
Automating Fault Recovery in 5,000‑Node Hadoop Clusters with Fabric & CM_API

This article explains how a large‑scale Hadoop environment can automatically detect common failures—such as swap usage, clock drift, agent crashes, role outages, and disk imbalance—and recover them using Prometheus alerts, Fabric/Paramiko remote execution, and Cloudera Manager APIs, complete with code examples and step‑by‑step commands.


Background

In large Hadoop clusters (several thousand nodes), routine inspection and fault recovery consume a disproportionate amount of operator time. As the node count grows, incidents such as swap usage, clock drift, Cloudera Manager (CM) agent failure, role crashes, and disk saturation become increasingly frequent.

Automated self‑healing architecture

The solution ingests alert data from Prometheus, evaluates the alert type, and triggers a corresponding remediation workflow. Remote‑execution tools such as Fabric (or Ansible) establish SSH connections to the affected hosts, run the required commands, and log the outcome. The remediation scripts are scheduled to execute automatically after an alert fires, eliminating manual intervention.
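
To make the flow concrete, here is a minimal sketch of the kind of webhook receiver the pipeline needs. This is not the article's production code: the port, the endpoint behavior, and the dispatch_recovery helper (sketched after the failure-type list below) are illustrative assumptions.

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class AlertWebhookHandler(BaseHTTPRequestHandler):
    """Receives Prometheus Alertmanager webhook POSTs."""
    def do_POST(self):
        length = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(length))
        # Alertmanager groups the firing alerts under the "alerts" key.
        for alert in payload.get('alerts', []):
            name = alert['labels'].get('alertname')   # e.g. HOST_MEMORY_SWAPPING
            host = alert['labels'].get('instance')    # affected host address
            # dispatch_recovery and the recovery object are sketched
            # after the failure-type list below.
            dispatch_recovery(name, [{'ipAddress': host}], recovery)
        self.send_response(200)
        self.end_headers()

if __name__ == '__main__':
    # Port 9095 is arbitrary; point Alertmanager's webhook_configs here.
    HTTPServer(('0.0.0.0', 9095), AlertWebhookHandler).serve_forever()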

Supported failure types

Swap partitions in use on compute nodes – automatically disable swap.

Clock drift on compute nodes – synchronize the system clock.

Cloudera Manager agent process stopped – restart the agent.

Failed disks – replace the disk and bring the node back online.

Role instances (e.g., NameNode, DataNode, ResourceManager, NodeManager) reported as BAD – restart the role.

Multiple disks on a node approaching capacity – invoke HDFS DiskBalancer at the disk level.

Cluster‑wide storage usage exceeding a threshold – invoke DiskBalancer at the node level.
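
Each failure type maps one-to-one to a handler. A sketch of the dispatch table used by the webhook receiver above, assuming the alert names mirror Cloudera Manager's health-check identifiers (they match the recover_* methods shown in the next section):

# Illustrative mapping from alert name to handler method name.
RECOVERY_HANDLERS = {
    'HOST_MEMORY_SWAPPING':           'recover_HOST_MEMORY_SWAPPING',
    'HOST_CLOCK_OFFSET':              'recover_HOST_CLOCK_OFFSET',
    'HOST_SCM_HEALTH':                'recover_HOST_SCM_HEALTH',
    'DATA_NODE_FREE_SPACE_REMAINING': 'recover_DATA_NODE_FREE_SPACE_REMAINING',
}

def dispatch_recovery(alert_name, host_list, recovery):
    # recovery is an instance of the class holding the recover_* methods.
    method_name = RECOVERY_HANDLERS.get(alert_name)
    if method_name is None:
        return  # no automated remediation registered for this alert
    getattr(recovery, method_name)(host_list)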

Implementation examples

1. Swap cleanup

from fabric import Connection

def recover_HOST_MEMORY_SWAPPING(self, host_list):
    # Release all swap space on every affected host; self.user and
    # self.password hold the SSH credentials for the cluster nodes.
    cmd = "swapoff -a"
    for host in host_list:
        conn = Connection(host['ipAddress'], port=22, user=self.user,
                          connect_timeout=360, forward_agent=True,
                          connect_kwargs={'password': self.password})
        conn.sudo(command=cmd, password=self.password, shell=False,
                  hide='stderr', encoding='utf-8', pty=True)
        conn.close()
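
Note that swapoff -a only frees swap until the next reboot. A hedged companion step, not part of the original workflow, would also comment out the swap entries in /etc/fstab so the node stays swap-free; the sed expression below is one common way to do that, callable inside the loop above after the swapoff:

def persist_swap_off(self, conn):
    # Assumption (not in the original workflow): comment out swap entries
    # in /etc/fstab so swap stays disabled after a reboot; keeps a .bak copy.
    cmd = r"sed -i.bak '/\sswap\s/s/^[^#]/#&/' /etc/fstab"
    conn.sudo(command=cmd, password=self.password, shell=False,
              hide='stderr', encoding='utf-8', pty=True)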

2. Restart Cloudera Manager agent

def recover_HOST_SCM_HEALTH(self, host_list):
    # Restart the Cloudera Manager agent via its init script
    # (path matches the CM 5.13.1 installation used in this cluster).
    cmd = "/opt/cm-5.13.1/etc/init.d/cloudera-scm-agent restart"
    for host in host_list:
        conn = Connection(host['ipAddress'], port=22, user=self.user,
                          connect_timeout=360, forward_agent=True,
                          connect_kwargs={'password': self.password})
        conn.sudo(command=cmd, password=self.password, shell=False,
                  hide='stderr', encoding='utf-8', pty=True)
        conn.close()
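
The architecture section mentions logging each outcome. Fabric's sudo returns an invoke Result, so a small wrapper can capture success or failure per host. A sketch, with an illustrative logger name; warn=True makes a non-zero exit return a Result instead of raising:

import logging

log = logging.getLogger('self_healing')   # logger name is an assumption

def run_and_log(conn, cmd, password):
    # Result.ok / Result.exited carry the outcome for the audit log.
    result = conn.sudo(command=cmd, password=password, shell=False,
                       hide='stderr', encoding='utf-8', pty=True, warn=True)
    if result.ok:
        log.info('%s: "%s" succeeded', conn.host, cmd)
    else:
        log.warning('%s: "%s" exited with code %s', conn.host, cmd, result.exited)
    return result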

3. Clock offset correction

Because ntpdate cannot run while ntpd holds UDP port 123, the workflow stops the daemon first, forces several immediate syncs against the NTP source (repeated to converge), restarts ntpd, and finally restarts the CM agent so its clock-offset health check re-evaluates. ntp_src is a site-specific placeholder for the NTP server.

def recover_HOST_CLOCK_OFFSET(self, host_list):
    cmds = [
        "systemctl stop ntpd",        # free UDP port 123 for ntpdate
        "ntpdate ntp_src",            # ntp_src: site-specific NTP server
        "sleep 1",
        "ntpdate ntp_src",
        "sleep 1",
        "ntpdate ntp_src",            # repeated syncs converge the clock
        "sleep 1",
        "systemctl start ntpd",
        "/opt/cm-5.13.1/etc/init.d/cloudera-scm-agent restart"  # re-run CM health checks
    ]
    for host in host_list:
        conn = Connection(host['ipAddress'], port=22, user=self.user,
                          connect_timeout=360, forward_agent=True,
                          connect_kwargs={'password': self.password})
        for c in cmds:
            conn.sudo(command=c, password=self.password, shell=False,
                      hide='stderr', encoding='utf-8', pty=True)
        conn.close()

4. Role instance recovery via CM API (CDH 5.13.1)

The CM API Python client can enumerate clusters, locate roles whose health summary is BAD, and issue restart commands. Documentation: https://cloudera.github.io/cm_api/docs/python-client-swagger/

import cm_client

# Authentication for the Swagger-based client; credentials and endpoint
# values below are placeholders for the real CM deployment.
cm_client.configuration.username = 'admin'
cm_client.configuration.password = 'admin'
api_host = 'http://cm-host.example.com'
port = '7180'
api_version = 'v18'   # CM 5.13 serves API v18; confirm via /api/version

api_url = f"{api_host}:{port}/api/{api_version}"
api_client = cm_client.ApiClient(api_url)

clusters_api = cm_client.ClustersResourceApi(api_client)
clusters = clusters_api.read_clusters(view='SUMMARY')
for cluster in clusters.items:
    if cluster.full_version.startswith('5.'):
        # Locate the HDFS service in this cluster.
        services_api = cm_client.ServicesResourceApi(api_client)
        services = services_api.read_services(cluster.name, view='FULL')
        hdfs_service = next(s for s in services.items if s.type == 'HDFS')
        # Collect DataNode roles whose health summary is BAD.
        roles_api = cm_client.RolesResourceApi(api_client)
        roles = roles_api.read_roles(cluster.name, hdfs_service.name, view='FULL')
        bad_dn_roles = [r.name for r in roles.items
                        if r.type == 'DATANODE' and r.health_summary == 'BAD']
        # Issue an asynchronous restart command for each bad role.
        cmd_api = cm_client.RoleCommandsResourceApi(api_client)
        role_names = cm_client.ApiRoleNameList(bad_dn_roles)
        restart_cmds = cmd_api.restart_command(cluster.name, hdfs_service.name,
                                               body=role_names)
        for cmd in restart_cmds.items:
            print(cmd.name, "(", cmd.id, cmd.active, cmd.success, ")")
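
The restart commands are asynchronous: CM returns command objects whose IDs can be polled until active becomes false. A minimal polling sketch; the function name and the 30-second interval are illustrative assumptions:

import time
import cm_client

def wait_for_commands(api_client, command_ids, interval=30):
    # Poll each asynchronous CM command until it is no longer active.
    commands_api = cm_client.CommandsResourceApi(api_client)
    pending = set(command_ids)
    while pending:
        for cmd_id in list(pending):
            cmd = commands_api.read_command(cmd_id)
            if not cmd.active:
                print(cmd.name, 'finished, success =', cmd.success)
                pending.discard(cmd_id)
        if pending:
            time.sleep(interval)

Calling wait_for_commands(api_client, [c.id for c in restart_cmds.items]) would block until every restart completes.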

5. DiskBalancer automation (HDFS)

When a node’s disks exceed a usage threshold (e.g., 90%), the HDFS DiskBalancer can rebalance data across that node’s disks. The workflow consists of three steps: create a plan, execute the plan, and query its progress.

# Create a balancing plan for the DataNode; the tool writes
# <hostname>.plan.json into the output directory (adjust bandwidth,
# output directory, and threshold as needed)
hdfs diskbalancer -plan lf-319-sq210 \
    -out ~/diskbalancer_$(date +%Y_%m_%d) \
    -thresholdPercentage 10

# Execute the generated plan
hdfs diskbalancer -execute /var/lib/hadoop-hdfs/diskbalancer_$(date +%Y_%m_%d)/lf-319-sq210.plan.json

# Query the status of the plan on that DataNode
hdfs diskbalancer -query lf-319-sq210

6. Remote execution of DiskBalancer via Fabric

def recover_DATA_NODE_FREE_SPACE_REMAINING(self, host_list):
    # Run DiskBalancer as the hdfs user: build a plan for this DataNode
    # ($HOSTNAME expands on the node itself), then execute it.
    cmd = (
        "su hdfs -c 'whoami; export JAVA_HOME=/opt/jdk; "
        "export CLASSPATH=.:$JAVA_HOME/lib:$CLASSPATH; "
        "export PATH=$JAVA_HOME/bin:$PATH; "
        "hdfs diskbalancer -plan $HOSTNAME "
        "-out ~/$(date +\'diskbalancer_%Y_%m_%d\') "
        "-thresholdPercentage 10; sleep 5; "
        "hdfs diskbalancer -execute /var/lib/hadoop-hdfs/$(date +\'diskbalancer_%Y_%m_%d\')/$HOSTNAME.plan.json'"
    )
    for host in host_list:
        conn = Connection(host['ipAddress'], port=22, user=self.user,
                          connect_timeout=360, forward_agent=True,
                          connect_kwargs={'password': self.password})
        conn.sudo(command=cmd, password=self.password, shell=False,
                  hide='stderr', encoding='utf-8', pty=True)
        conn.close()
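
For the last scenario in the list (cluster-wide usage above a threshold), balancing happens between nodes rather than between one node's disks, which is the job of the standard HDFS Balancer. A minimal sketch in the same style; the method name and the 10% threshold are illustrative assumptions:

def recover_CLUSTER_FREE_SPACE_REMAINING(self, host_list):
    # Hypothetical handler: launch the inter-node HDFS Balancer from a
    # single host; -threshold 10 aims to bring every DataNode within
    # 10% of the cluster-average utilization.
    cmd = "su hdfs -c 'hdfs balancer -threshold 10'"
    host = host_list[0]   # the Balancer runs once for the whole cluster
    conn = Connection(host['ipAddress'], port=22, user=self.user,
                      connect_timeout=360, forward_agent=True,
                      connect_kwargs={'password': self.password})
    conn.sudo(command=cmd, password=self.password, shell=False,
              hide='stderr', encoding='utf-8', pty=True)
    conn.close()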

Outcome

By automating the seven common failure scenarios, the platform reduces manual intervention, lowers operational cost, and prevents minor issues from escalating into major outages. The approach demonstrates a practical, production‑grade self‑healing mechanism for large‑scale Hadoop clusters.

Tags: Hadoop, Fabric, Cluster Automation, Big Data Operations, CM_API, DiskBalancer