Build an Enterprise‑Grade HDFS HA and YARN Scheduler from Scratch
This guide walks you through designing and deploying a highly available HDFS architecture with dual NameNodes, ZooKeeper‑based failover, and a tuned YARN resource scheduler, covering detailed configuration files, failover testing, performance tuning, monitoring, automated health checks, capacity planning, and best‑practice checklists for production‑grade big‑data platforms.
Introduction
Classic HDFS deployments suffer from a single‑point‑of‑failure NameNode. This article presents a production‑tested high‑availability (HA) design and YARN scheduling optimisations that raised platform availability from 99.5 % to 99.99 % and cut annual downtime from 43 h to under one hour.
1. HDFS High‑Availability Architecture
1.1 Why the traditional HDFS design fails
The original master‑slave layout contains a single NameNode that manages the namespace and client access, making it a critical SPOF.
1.2 Full HA design
      ┌─────────────────┐
      │    ZooKeeper    │
      │  Cluster (3‑5)  │
      └────────┬────────┘
               │
┌──────────────┴─────────────┐
│  Active NN  ◄─►  Standby NN│
│  10.0.1.10        10.0.1.11│
└─────────────┬──────────────┘
              │
        ┌─────▼─────┐
        │JournalNode│
        │  Cluster  │
        └─────┬─────┘
              │
        ┌─────▼─────┐
        │ DataNodes │
        │ (N nodes) │
        └───────────┘

Core components:
Dual NameNode (Active/Standby) – real‑time metadata sync.
JournalNode quorum – ensures edit‑log durability.
ZooKeeper ensemble – monitors health and triggers automatic failover.
ZKFC process – performs the actual switch.
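The division of roles above can be verified programmatically. A minimal sketch, assuming the standard `Hadoop:service=NameNode,name=NameNodeStatus` JMX bean exposed on the NameNode web UI port (9870 in recent Hadoop releases); sample payloads stand in for live HTTP calls so the parsing logic is self-contained:

```python
#!/usr/bin/env python3
"""Classify NameNode HA roles from JMX output (illustrative sketch)."""
import json

def ha_roles(jmx_by_nn):
    """Map each NameNode id to its reported HA state ('active'/'standby')."""
    roles = {}
    for nn_id, payload in jmx_by_nn.items():
        doc = json.loads(payload)
        state = "unknown"
        for bean in doc.get("beans", []):
            # Bean name per the NameNodeStatus MBean; verify on your version.
            if bean.get("name", "").endswith("name=NameNodeStatus"):
                state = bean.get("State", "unknown")
        roles[nn_id] = state
    return roles

# Sample payloads as returned by http://<nn>:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus
nn1 = json.dumps({"beans": [{"name": "Hadoop:service=NameNode,name=NameNodeStatus", "State": "active"}]})
nn2 = json.dumps({"beans": [{"name": "Hadoop:service=NameNode,name=NameNodeStatus", "State": "standby"}]})
print(ha_roles({"nn1": nn1, "nn2": nn2}))  # → {'nn1': 'active', 'nn2': 'standby'}
```

Exactly one NameNode should report `active`; zero or two active nodes is an alert condition in its own right.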
1.3 Practical configuration
Key XML snippets for hdfs-site.xml:
<configuration>
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>node1:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>node2:8020</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node1:8485;node2:8485;node3:8485/mycluster</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>
Key XML snippets for core-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>node1:2181,node2:2181,node3:2181</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/data/hadoop/tmp</value>
</property>
</configuration>
1.4 Failover testing and tuning
Two test scenarios:
Manual transition: hdfs haadmin -transitionToStandby nn1 then hdfs haadmin -transitionToActive nn2.
Simulated failure: kill the active NameNode process (e.g., jps | grep NameNode | awk '{print $1}' | xargs kill -9) or block its RPC port with iptables, then monitor ZKFC logs ( tail -f /var/log/hadoop/hdfs/hadoop-hdfs-zkfc-*.log).
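Failover tests are easier to compare across runs when recovery time is measured the same way each time. A small sketch that polls `hdfs haadmin -getServiceState` until an active NameNode reappears; the `poll` and `sleep` parameters are hypothetical injection points (not part of Hadoop) so the logic can be exercised without a live cluster:

```python
#!/usr/bin/env python3
"""Measure how long an HA failover takes (sketch)."""
import subprocess
import time

def get_state(nn_id):
    """Poll a NameNode; returns 'active'/'standby', or None on error."""
    r = subprocess.run(["hdfs", "haadmin", "-getServiceState", nn_id],
                       capture_output=True, text=True)
    return r.stdout.strip() if r.returncode == 0 else None

def wait_for_active(nn_ids, poll=get_state, timeout_s=120, interval_s=0.5,
                    sleep=time.sleep):
    """Return (active_nn, polls_waited) once any NameNode reports active."""
    for attempt in range(int(timeout_s / interval_s)):
        for nn in nn_ids:
            if poll(nn) == "active":
                return nn, attempt
        sleep(interval_s)
    raise TimeoutError("no NameNode became active within the timeout")
```

Start the stopwatch the moment you kill the active NameNode, then call `wait_for_active(["nn1", "nn2"])`; the elapsed time is your observed failover window to compare against the ZKFC tuning below.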
Key tuning parameters (add to hdfs-site.xml):
<property>
<name>dfs.ha.zkfc.health-monitor.rpc-timeout.ms</name>
<value>5000</value> <!-- reduce from default 45000 -->
</property>
<property>
<name>dfs.qjournal.write-txns.timeout.ms</name>
<value>60000</value> <!-- increase for unstable networks -->
</property>
<property>
<name>dfs.client.failover.max.attempts</name>
<value>15</value>
</property>
<property>
<name>dfs.client.failover.sleep.base.millis</name>
<value>500</value>
</property>
2. YARN Resource Scheduling Optimisation
2.1 Core challenges
Low CPU utilisation – overall CPU usage < 40 % while users complain of insufficient resources.
Long waiting times for small jobs – short queries wait for hours behind large batch jobs.
Imbalanced queue usage – some queues idle, others overloaded.
Priority chaos – urgent jobs cannot obtain resources promptly.
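These symptoms can be quantified before touching the scheduler. A sketch that computes utilisation from the ResourceManager's `/ws/v1/cluster/metrics` REST payload (field names follow the RM REST API; verify them against your Hadoop version) — the sample JSON below is illustrative:

```python
#!/usr/bin/env python3
"""Summarise cluster utilisation from RM metrics (sketch)."""

def utilization(metrics):
    """Return vcore/memory utilisation (%) and the pending-app count."""
    m = metrics["clusterMetrics"]
    return {
        "vcore_pct": 100.0 * m["allocatedVirtualCores"] / m["totalVirtualCores"],
        "mem_pct": 100.0 * m["allocatedMB"] / m["totalMB"],
        "pending_apps": m["appsPending"],
    }

# Sample payload; in production fetch http://<rm>:8088/ws/v1/cluster/metrics
sample = {"clusterMetrics": {"allocatedVirtualCores": 120, "totalVirtualCores": 320,
                             "allocatedMB": 410000, "totalMB": 1048576,
                             "appsPending": 37}}
u = utilization(sample)
# Low vcore utilisation alongside a deep pending queue points at scheduler
# configuration, not genuine resource exhaustion.
print(f"vcores {u['vcore_pct']:.1f}%  memory {u['mem_pct']:.1f}%  pending {u['pending_apps']}")
```

A cluster sitting below 40 % vcore utilisation while dozens of applications wait is the classic signature of the queue misconfiguration addressed in the rest of this section.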
2.2 Scheduler comparison
Scheduler | Suitable scenario | Advantages | Disadvantages
---------------|----------------------------------|--------------------------------|--------------------------
FIFO Scheduler | Test or single‑user env | Simple, no overhead | Low utilisation, no multi‑tenant support
Capacity Scheduler | Multi‑tenant, need isolation | Guarantees resources, elastic sharing | Complex config, needs planning
Fair Scheduler | Dynamic load, fair sharing | Auto‑balancing, flexible config | May cause resource fragmentation

In most production clusters the Capacity Scheduler is preferred (≈90 % of cases) because it provides strong isolation and guaranteed capacity.
2.3 Capacity Scheduler best practice
Example queue hierarchy for a financial services platform:
root (100%)
├─ production (70%)
│ ├─ realtime (30%) # critical low‑latency jobs
│ ├─ batch (25%) # scheduled reports
│ └─ adhoc (15%) # on‑demand queries
├─ development (20%)
│ ├─ dev‑team1 (10%)
│ └─ dev‑team2 (10%)
└─ maintenance (10%)    # ops tasks

Corresponding capacity-scheduler.xml (excerpt):
<configuration>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>production,development,maintenance</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.capacity</name>
<value>70</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.queues</name>
<value>realtime,batch,adhoc</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.realtime.capacity</name>
<value>30</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.realtime.maximum-capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.realtime.user-limit-factor</name>
<value>2</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.realtime.priority</name>
<value>1000</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.realtime.disable-preemption</name>
<value>false</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.batch.maximum-application-lifetime</name>
<value>86400</value> <!-- 24 h -->
</property>
<property>
<name>yarn.scheduler.capacity.root.production.adhoc.maximum-applications</name>
<value>100</value>
</property>
</configuration>
2.4 Advanced scheduling strategies
1. Node‑label based scheduling
# Create node labels
yarn rmadmin -addToClusterNodeLabels "GPU,SSD,MEMORY"
# Assign labels to nodes
yarn rmadmin -replaceLabelsOnNode "node1=GPU node2=GPU"
yarn rmadmin -replaceLabelsOnNode "node3=SSD node4=SSD"
yarn rmadmin -replaceLabelsOnNode "node5=MEMORY node6=MEMORY"
# Queue configuration to restrict a queue to SSD nodes
<property>
<name>yarn.scheduler.capacity.root.production.realtime.accessible-node-labels</name>
<value>SSD</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.realtime.accessible-node-labels.SSD.capacity</name>
<value>100</value>
</property>
2. Dynamic resource pools
A prototype Python daemon that periodically inspects cluster load and adjusts queue capacities based on time‑of‑day and utilisation thresholds.
#!/usr/bin/env python3
"""Prototype: adjust queue capacities by time of day and load."""
import subprocess
from datetime import datetime

class DynamicResourceManager:
    def __init__(self):
        self.peak_hours = [(8, 12), (14, 18)]      # business peak
        self.off_peak_hours = [(0, 6), (22, 24)]

    def get_current_load(self):
        """Return per-queue utilisation (%) as a dict."""
        result = subprocess.run(["yarn", "node", "-list"],
                                capture_output=True, text=True)
        return self.parse_cluster_metrics(result.stdout)

    def parse_cluster_metrics(self, raw):
        # Placeholder: extract queue utilisation from the command output
        # (or, better, from the RM REST API /ws/v1/cluster/scheduler).
        return {}

    def adjust_queue_capacity(self, queue, capacity):
        # Update capacity-scheduler.xml (implementation omitted), then
        # ask the ResourceManager to reload the queue definitions.
        subprocess.run(["yarn", "rmadmin", "-refreshQueues"])

    def is_peak_hour(self, hour):
        return any(start <= hour < end for start, end in self.peak_hours)

    def auto_scale(self):
        hour = datetime.now().hour
        load = self.get_current_load()
        if self.is_peak_hour(hour):
            if load.get('realtime', 0) > 80:
                self.adjust_queue_capacity('root.production.realtime', 50)
                self.adjust_queue_capacity('root.production.batch', 15)
        else:
            self.adjust_queue_capacity('root.production.realtime', 20)
            self.adjust_queue_capacity('root.production.batch', 40)

if __name__ == "__main__":
    DynamicResourceManager().auto_scale()

3. Monitoring and Fault Diagnosis
3.1 Full‑stack monitoring layers
┌─────────────────────────────────────┐
│ Application layer (job latency, │
│ success rate) │
├─────────────────────────────────────┤
│ Service layer (NN/RM/DN/NM health)│
├─────────────────────────────────────┤
│ System layer (CPU, memory, disk) │
├─────────────────────────────────────┤
│ Infrastructure layer (rack, power)│
└─────────────────────────────────────┘

3.2 Common fault playbooks
NameNode in safe mode
# Check safe mode status
hdfs dfsadmin -safemode get
# Diagnose cause
hdfs dfsadmin -report
# Common fixes
# 1. Insufficient block replicas
hdfs fsck / -blocks -files -locations
# 2. DataNode not fully started – wait a few minutes
# 3. Disk space shortage – clean logs
find /var/log/hadoop -name "*.log.*" -mtime +7 -delete
# Force exit safe mode (use with caution)
hdfs dfsadmin -safemode leave

YARN application stuck
# List applications
yarn application -list
# Inspect a specific application
yarn application -status application_1234567890_0001
# View logs
yarn logs -applicationId application_1234567890_0001
# Kill the application if needed
yarn application -kill application_1234567890_0001
# Refresh queues after termination
yarn rmadmin -refreshQueues

3.3 Performance tuning
HDFS tuning parameters
<property>
<name>dfs.client.write.packet.size</name>
<value>131072</value> <!-- default 65536 -->
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>20</value> <!-- adjust to number of CPU cores -->
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>8192</value> <!-- default 4096 -->
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value> <!-- enable short‑circuit reads -->
</property>
YARN tuning parameters
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>65536</value> <!-- set according to node RAM -->
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>32</value> <!-- set according to CPU cores -->
</property>
<property>
<name>yarn.resourcemanager.scheduler.client.thread-count</name>
<value>50</value> <!-- increase for high concurrency -->
</property>
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
4. Automation Toolchain
4.1 Health‑check Bash script (excerpt)
#!/bin/bash
LOG_FILE="/var/log/hdfs_health_check_$(date +%Y%m%d).log"

log_info()  { echo "[$(date '+%Y-%m-%d %H:%M:%S')] INFO: $1"  | tee -a "$LOG_FILE"; }
log_error() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $1" | tee -a "$LOG_FILE"; }

check_namenode() {
    log_info "Checking NameNode status..."
    ACTIVE_NN=""
    for nn in nn1 nn2; do
        if state=$(hdfs haadmin -getServiceState "$nn" 2>/dev/null); then
            log_info "NameNode $nn is $state"
            [ "$state" = "active" ] && ACTIVE_NN=$nn
        else
            log_error "Failed to get status for NameNode $nn"
        fi
    done
    [ -z "$ACTIVE_NN" ] && log_error "No active NameNode found!"
}

check_hdfs_capacity() {
    log_info "Checking HDFS capacity..."
    # head -1 keeps the cluster summary line; -report also prints per-node lines
    used_percent=$(hdfs dfsadmin -report | grep "DFS Used%" | head -1 | awk '{print $3}' | tr -d '%')
    used_percent=${used_percent%.*}   # drop decimals: (( )) needs integers
    if (( used_percent > 85 )); then
        log_error "HDFS usage critical: ${used_percent}%"
    elif (( used_percent > 70 )); then
        log_info "HDFS usage warning: ${used_percent}%"
    else
        log_info "HDFS usage normal: ${used_percent}%"
    fi
}

check_corrupt_blocks() {
    log_info "Checking for corrupt blocks..."
    corrupt=$(hdfs dfsadmin -report | grep "Blocks with corrupt replicas" | awk '{print $5}')
    if (( corrupt > 0 )); then
        log_error "Found $corrupt corrupt blocks"
        # fsck -delete removes the affected files; review the report first
        hdfs fsck / -delete
    else
        log_info "No corrupt blocks found"
    fi
}

main() {
    log_info "Starting HDFS health check..."
    check_namenode
    check_hdfs_capacity
    check_corrupt_blocks
    log_info "Health check completed"
}
main

4.2 Capacity planning with Python (linear regression)
#!/usr/bin/env python3
import pandas as pd
from sklearn.linear_model import LinearRegression
from datetime import datetime, timedelta

class HDFSCapacityPredictor:
    def __init__(self):
        self.model = LinearRegression()
        self.history_data = []   # list of collect_metrics() samples

    def collect_metrics(self):
        """Collect one HDFS usage sample (placeholder)."""
        return {'timestamp': datetime.now(),
                'used_bytes': self.get_hdfs_used(),
                'total_bytes': self.get_hdfs_total()}

    def train_model(self):
        """Fit used bytes vs. elapsed days; return growth rate in GB/day."""
        if len(self.history_data) < 2:
            raise ValueError("need at least two history samples")
        df = pd.DataFrame(self.history_data)
        df['days'] = (df['timestamp'] - df['timestamp'].min()).dt.days
        self.model.fit(df[['days']].values, df['used_bytes'].values)
        return self.model.coef_[0] / (1024 ** 3)

    def predict_capacity_exhaustion(self):
        current_used = self.get_hdfs_used()
        total = self.get_hdfs_total()
        daily_gb = self.train_model()
        if daily_gb <= 0:
            return None   # flat or shrinking usage: no exhaustion date
        days = (total - current_used) / (daily_gb * 1024 ** 3)
        return {'days_remaining': int(days),
                'exhaustion_date': (datetime.now() + timedelta(days=days)).strftime('%Y-%m-%d'),
                'daily_growth_gb': daily_gb}

    def get_hdfs_used(self):
        return 0   # replace with real metric collection

    def get_hdfs_total(self):
        return 0

if __name__ == "__main__":
    p = HDFSCapacityPredictor()
    # In production, load ~90 days of samples into p.history_data first.
    result = p.predict_capacity_exhaustion()
    if result:
        print(f"Daily growth: {result['daily_growth_gb']:.2f} GB")
        print(f"Days remaining: {result['days_remaining']}")
        print(f"Exhaustion date: {result['exhaustion_date']}")

5. Production Best‑Practice Summary
5.1 Architectural principles
High‑availability first – sacrifice some performance to guarantee service continuity.
Monitoring first – operating a system without monitoring is flying blind.
Automation – eliminate manual steps to reduce human error.
Capacity redundancy – keep at least 30 % spare resources.
Canary releases – validate every change in a test environment before production.
5.2 Daily operational checklist
NameNode active/standby status normal.
HDFS usage below 80 %.
No corrupt or missing blocks.
All DataNodes online.
YARN ResourceManager healthy.
Queue resource usage balanced.
No long‑pending tasks.
System logs free of critical errors.
Recent backups available.
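The checklist above lends itself to automation. A sketch that encodes each item as a predicate over a status snapshot; the snapshot keys and threshold values are assumptions for illustration, to be wired to real collection commands (`hdfs dfsadmin -report`, `yarn application -list`, and so on):

```python
#!/usr/bin/env python3
"""Encode the daily operational checklist as pass/fail checks (sketch)."""

# Hypothetical snapshot keys and thresholds; adapt to your collectors.
CHECKS = [
    ("active NameNode present", lambda s: s["active_nn"] is not None),
    ("HDFS usage below 80%",    lambda s: s["hdfs_used_pct"] < 80),
    ("no corrupt blocks",       lambda s: s["corrupt_blocks"] == 0),
    ("all DataNodes online",    lambda s: s["dead_datanodes"] == 0),
    ("no long-pending apps",    lambda s: s["pending_apps_over_1h"] == 0),
]

def run_checklist(snapshot):
    """Return a list of (check name, passed) tuples."""
    return [(name, bool(fn(snapshot))) for name, fn in CHECKS]

# Example snapshot: one dead DataNode should surface as a failure.
snapshot = {"active_nn": "nn1", "hdfs_used_pct": 63.2, "corrupt_blocks": 0,
            "dead_datanodes": 1, "pending_apps_over_1h": 0}
for name, ok in run_checklist(snapshot):
    print(("PASS" if ok else "FAIL"), name)
```

Run it from cron each morning and alert on any FAIL line; the point is that every checklist item has an unambiguous, scriptable definition.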
5.3 RTO/RPO targets
Fault Type | RTO (target) | RPO (target) | Implementation
-------------------|--------------|--------------|----------------------------
NameNode failure | < 1 minute | 0 | HA automatic failover
DataNode failure | < 10 minutes | 0 | Automatic replica recovery
Rack failure | < 30 minutes | 0 | Rack‑aware replica placement
Datacenter failure | < 4 hours | < 1 hour | Cross‑datacenter disaster recovery

Raymond Ops
Linux ops automation, cloud-native, Kubernetes, SRE, DevOps, Python, Golang and related tech discussions.