
Master Kafka Production: High‑Availability Cluster Deployment & Ops Best Practices

This comprehensive guide walks operations engineers through designing, deploying, and managing a high‑availability Kafka production cluster, covering automated ZooKeeper and Kafka installation scripts, performance tuning for producers and consumers, monitoring with Prometheus and Grafana, and automated health checks and recovery procedures.

MaGe Linux Operations

Kafka Production Environment Solution: High‑Availability Cluster Deployment and Ops Practice

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────────────┐
│                     Kafka Production Architecture                         │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐                         │
│  │  Producer1  │   │  Producer2  │   │  Producer3  │                         │
│  └─────────────┘   └─────────────┘   └─────────────┘                         │
│          │               │               │                                 │
│          └─────────────────┼─────────────────┘                                 │
│                           │                                               │
│  ┌─────────────────────────────────────────────────────────────────────────┐ │
│  │                     Kafka Cluster                                   │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │ │
│  │  │   Broker1   │  │   Broker2   │  │   Broker3   │  │   Broker4   │ │ │
│  │  │192.168.1.11 │  │192.168.1.12 │  │192.168.1.13 │  │192.168.1.14 │ │ │
│  │  │   Port:9092 │  │   Port:9092 │  │   Port:9092 │  │   Port:9092 │ │ │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │ │
│  └─────────────────────────────────────────────────────────────────────────┘ │
│                           │                                               │
│  ┌─────────────────────────────────────────────────────────────────────────┐ │
│  │                     ZooKeeper Cluster                                 │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                │ │
│  │  │     ZK1     │  │     ZK2     │  │     ZK3     │                │ │
│  │  │192.168.1.21 │  │192.168.1.22 │  │192.168.1.23 │                │ │
│  │  │  Port:2181  │  │  Port:2181  │  │  Port:2181  │                │ │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                │ │
│  └─────────────────────────────────────────────────────────────────────────┘ │
│                           │                                               │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐                         │
│  │  Consumer1  │   │  Consumer2  │   │  Consumer3  │                         │
│  │ (Group A)   │   │ (Group B)   │   │ (Group C)   │                         │
│  └─────────────┘   └─────────────┘   └─────────────┘                         │
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐ │
│  │                     Monitoring System                                 │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                │ │
│  │  │ Prometheus │  │   Grafana   │  │   Kafka     │                │ │
│  │  │   Metrics   │  │ Dashboard   │  │   Manager   │                │ │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                │ │
│  └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘

Introduction

Apache Kafka is a distributed streaming platform that serves as the core messaging middleware in many modern big-data architectures. Written from an operations engineer's perspective, this article covers deployment, configuration tuning, and monitoring for Kafka in production, with scripts and code examples that teams can adapt to build stable, efficient clusters.

1. Automated Kafka Cluster Deployment

1.1 ZooKeeper Cluster Deployment Script

#!/bin/bash
# ZooKeeper cluster automated deployment script
set -e

ZK_VERSION="3.8.1"
ZK_NODES=("192.168.1.21" "192.168.1.22" "192.168.1.23")
ZK_DATA_DIR="/data/zookeeper"
ZK_LOG_DIR="/logs/zookeeper"

# Create the ZooKeeper user (skipped if it already exists, so set -e does not abort a re-run)
id zookeeper &>/dev/null || useradd -r -s /bin/false zookeeper

install_zookeeper() {
    cd /tmp
    wget https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz
    tar -xzf apache-zookeeper-${ZK_VERSION}-bin.tar.gz
    mv apache-zookeeper-${ZK_VERSION}-bin /opt/zookeeper
    chown -R zookeeper:zookeeper /opt/zookeeper
}

configure_zookeeper() {
    local node_id=$1
    local node_ip=$2
    mkdir -p ${ZK_DATA_DIR} ${ZK_LOG_DIR}
    chown -R zookeeper:zookeeper ${ZK_DATA_DIR} ${ZK_LOG_DIR}
    echo ${node_id} > ${ZK_DATA_DIR}/myid
    cat > /opt/zookeeper/conf/zoo.cfg <<EOF
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=${ZK_DATA_DIR}
    dataLogDir=${ZK_LOG_DIR}
    clientPort=2181
    maxClientCnxns=60
    server.1=192.168.1.21:2888:3888
    server.2=192.168.1.22:2888:3888
    server.3=192.168.1.23:2888:3888
    autopurge.snapRetainCount=10
    autopurge.purgeInterval=1
EOF
}

start_zookeeper() {
    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Apache ZooKeeper server
    Documentation=http://zookeeper.apache.org
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target

    [Service]
    Type=forking
    User=zookeeper
    Group=zookeeper
    Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
    ExecStart=/opt/zookeeper/bin/zkServer.sh start
    ExecStop=/opt/zookeeper/bin/zkServer.sh stop
    ExecReload=/opt/zookeeper/bin/zkServer.sh restart
    TimeoutSec=30
    Restart=on-failure

    [Install]
    WantedBy=multi-user.target
EOF
    systemctl daemon-reload
    systemctl enable zookeeper
    systemctl start zookeeper
}

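# Both arguments are required: the node ID written to myid and this node's IP
if [ $# -ne 2 ]; then
    echo "Usage: $0 <node_id> <node_ip>" >&2
    exit 1
fi

install_zookeeper
configure_zookeeper "$1" "$2"
start_zookeeper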

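ZooKeeper is Kafka's coordination service. An ensemble stays available only while a majority of its nodes are up, so it is normally deployed with an odd number of nodes: three nodes tolerate one failure, and a fourth node adds no extra fault tolerance. Scripting the installation keeps the deployment of every node fast and identical.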
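The majority rule behind the odd-node recommendation can be checked with a few lines of arithmetic. The following is a minimal sketch; the function names are illustrative and not part of ZooKeeper.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    if ensemble_size < 1:
        raise ValueError("ensemble_size must be at least 1")
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """Nodes that can fail while a majority is still reachable."""
    return ensemble_size - quorum_size(ensemble_size)


if __name__ == "__main__":
    for nodes in range(1, 8):
        print(f"{nodes} nodes: quorum={quorum_size(nodes)}, "
              f"tolerated failures={tolerated_failures(nodes)}")
```

Three and four nodes both tolerate a single failure, which is why the three-node ensemble in the diagram is a sensible minimum and five is the usual next step.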

1.2 Kafka Cluster Deployment Configuration

#!/bin/bash
# Kafka cluster deployment script
KAFKA_VERSION="2.8.2"
KAFKA_NODES=("192.168.1.11" "192.168.1.12" "192.168.1.13" "192.168.1.14")
KAFKA_DATA_DIR="/data/kafka"
KAFKA_LOG_DIR="/logs/kafka"

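install_kafka() {
    cd /tmp
    # Build the URL from KAFKA_VERSION so a version bump only touches one variable
    wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz
    tar -xzf kafka_2.13-${KAFKA_VERSION}.tgz
    mv kafka_2.13-${KAFKA_VERSION} /opt/kafka
    # Create the kafka user only if it does not exist yet
    id kafka &>/dev/null || useradd -r -s /bin/false kafka
    chown -R kafka:kafka /opt/kafka
    # Create the data and log directories here, after the kafka user exists,
    # so the chown cannot fail on a fresh host
    mkdir -p ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
    chown -R kafka:kafka ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
}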

generate_kafka_config() {
    local broker_id=$1
    local node_ip=$2
    cat > /opt/kafka/config/server.properties <<EOF
    broker.id=${broker_id}
    listeners=PLAINTEXT://${node_ip}:9092
    advertised.listeners=PLAINTEXT://${node_ip}:9092
    num.network.threads=8
    num.io.threads=16
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=${KAFKA_DATA_DIR}
    num.partitions=3
    num.recovery.threads.per.data.dir=2
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181/kafka
    zookeeper.connection.timeout.ms=18000
    replica.fetch.max.bytes=1048576
    message.max.bytes=1000000
    replica.socket.timeout.ms=30000
    replica.socket.receive.buffer.bytes=65536
    replica.fetch.wait.max.ms=500
    replica.high.watermark.checkpoint.interval.ms=5000
    fetch.purgatory.purge.interval.requests=1000
    producer.purgatory.purge.interval.requests=1000
    delete.topic.enable=true
EOF
}

create_kafka_service() {
    # JVM options are environment variables read by Kafka's start scripts, not
    # broker properties, so they are set in the unit file rather than in
    # server.properties. LOG_DIR points the broker's application logs at KAFKA_LOG_DIR.
    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Apache Kafka server (broker)
    Documentation=http://kafka.apache.org/documentation.html
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target zookeeper.service

    [Service]
    Type=simple
    User=kafka
    Group=kafka
    Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
    Environment="KAFKA_HEAP_OPTS=-Xmx6G -Xms6G"
    Environment="KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
    Environment=LOG_DIR=${KAFKA_LOG_DIR}
    ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    ExecStop=/opt/kafka/bin/kafka-server-stop.sh
    TimeoutSec=30
    Restart=on-failure

    [Install]
    WantedBy=multi-user.target
EOF
    systemctl daemon-reload
    systemctl enable kafka
    systemctl start kafka
}

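# Both arguments are required: the broker ID and this node's IP
if [ $# -ne 2 ]; then
    echo "Usage: $0 <broker_id> <node_ip>" >&2
    exit 1
fi

install_kafka
generate_kafka_config "$1" "$2"
create_kafka_service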
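The broker configuration above pairs transaction.state.log.replication.factor=3 with transaction.state.log.min.isr=2. The same relationship governs ordinary topics through the replication factor and the min.insync.replicas setting, which the script does not set explicitly. The sketch below, with illustrative function names, shows how many broker failures a partition survives while producers using acks=all can still write.

```python
def tolerated_broker_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas that can be lost while acks=all writes are still accepted."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min_insync_replicas must be between 1 and replication_factor")
    return replication_factor - min_insync_replicas


if __name__ == "__main__":
    # (replication factor, min in-sync replicas) pairs to compare
    for rf, min_isr in [(3, 2), (3, 3), (2, 1)]:
        failures = tolerated_broker_failures(rf, min_isr)
        print(f"RF={rf}, min ISR={min_isr}: writes survive {failures} broker failure(s)")
```

With a replication factor of 3 and a minimum ISR of 2, one broker can be down without rejecting writes, and an acknowledged write still exists on at least two replicas.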

2. Production Environment Performance Optimization

2.1 Producer Performance Tuning

#!/usr/bin/env python3
# Kafka producer performance optimization configuration
from kafka import KafkaProducer
import json
import time
from concurrent.futures import ThreadPoolExecutor

class OptimizedKafkaProducer:
    def __init__(self, bootstrap_servers, topic):
        self.topic = topic
        self.producer = KafkaProducer(
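            bootstrap_servers=bootstrap_servers,
            acks='all',                 # wait for all in-sync replicas (kafka-python defaults to acks=1)
            batch_size=16384,           # maximum bytes per partition batch
            linger_ms=10,               # wait up to 10 ms for a batch to fill
            buffer_memory=33554432,     # 32 MiB for records waiting to be sent
            compression_type='snappy',  # requires the python-snappy package
            max_in_flight_requests_per_connection=5,  # values above 1 allow reordering on retry
            retries=3,
            retry_backoff_ms=100,
            request_timeout_ms=30000,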
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8')
        )

    def send_message_sync(self, key, value):
        """Synchronously send a message"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            record_metadata = future.get(timeout=10)
            return {'topic': record_metadata.topic, 'partition': record_metadata.partition, 'offset': record_metadata.offset}
        except Exception as e:
            print(f"Send failed: {e}")
            return None

    def send_message_async(self, key, value, callback=None):
        """Asynchronously send a message"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            if callback:
                future.add_callback(callback)
            return future
        except Exception as e:
            print(f"Send failed: {e}")
            return None

    def batch_send_performance_test(self, message_count=100000):
        """Batch send performance test"""
        failures = []
        start_time = time.time()
        # producer.send() is already asynchronous: it queues the record and a
        # background thread batches and sends it, so no thread pool is needed.
        for i in range(message_count):
            message = {'id': i, 'timestamp': time.time(), 'data': f'test_message_{i}', 'source': 'performance_test'}
            future = self.producer.send(self.topic, key=str(i), value=message)
            future.add_errback(failures.append)  # record each delivery error
        # flush() blocks until every queued record has been sent or has failed
        self.producer.flush()
        duration = time.time() - start_time
        delivered = message_count - len(failures)
        throughput = delivered / duration
        print(f"Sent {delivered} of {message_count} messages ({len(failures)} failed)")
        print(f"Total time: {duration:.2f} seconds")
        print(f"Throughput: {throughput:.2f} messages/second")

    def close(self):
        self.producer.close()

if __name__ == "__main__":
    producer = OptimizedKafkaProducer(
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092'],
        topic='performance_test'
    )
    producer.batch_send_performance_test(50000)
    producer.close()
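As a rough way to reason about the batch_size and buffer_memory values used above, the sketch below estimates how many records fit in one batch and how many full batches the send buffer can hold. The 200-byte average record size is an assumed figure for illustration, and the estimate ignores compression and per-record overhead.

```python
def records_per_batch(batch_size_bytes: int, avg_record_bytes: int) -> int:
    """Approximate number of records that fit in one partition batch."""
    if avg_record_bytes <= 0:
        raise ValueError("avg_record_bytes must be positive")
    # A batch always carries at least one record, even an oversized one
    return max(1, batch_size_bytes // avg_record_bytes)


def batches_in_buffer(buffer_memory_bytes: int, batch_size_bytes: int) -> int:
    """Number of full batches the producer's send buffer can hold."""
    if batch_size_bytes <= 0:
        raise ValueError("batch_size_bytes must be positive")
    return buffer_memory_bytes // batch_size_bytes


if __name__ == "__main__":
    batch_size = 16384          # batch_size from the producer configuration
    buffer_memory = 33554432    # buffer_memory from the producer configuration
    avg_record = 200            # assumed average serialized record size in bytes
    print("records per batch:", records_per_batch(batch_size, avg_record))
    print("batches in buffer:", batches_in_buffer(buffer_memory, batch_size))
```

If measured records are much larger than the assumption, raising batch_size is usually the first knob to try; linger_ms only helps when batches are not already filling up.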

2.2 Consumer Performance Optimization

#!/usr/bin/env python3
# Kafka consumer performance optimization configuration
from kafka import KafkaConsumer
import json
import time
from concurrent.futures import ThreadPoolExecutor

class OptimizedKafkaConsumer:
    def __init__(self, topics, group_id, bootstrap_servers):
        self.topics = topics
        self.group_id = group_id
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            fetch_min_bytes=1024,
            fetch_max_wait_ms=500,
            max_poll_records=500,
            max_poll_interval_ms=300000,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None
        )

    def consume_messages_batch(self, batch_size=100, timeout=5000):
        """Batch consume messages"""
        message_batch = []
        try:
            message_pack = self.consumer.poll(timeout_ms=timeout)
            for topic_partition, messages in message_pack.items():
                for message in messages:
                    message_batch.append({
                        'topic': message.topic,
                        'partition': message.partition,
                        'offset': message.offset,
                        'key': message.key,
                        'value': message.value,
                        'timestamp': message.timestamp
                    })
                    if len(message_batch) >= batch_size:
                        self.process_message_batch(message_batch)
                        message_batch = []
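            if message_batch:
                self.process_message_batch(message_batch)
            # Commit only when the poll returned messages. Note that
            # process_message_batch logs failures instead of raising, so the
            # offsets of failed messages are committed as well.
            if message_pack:
                self.consumer.commit()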
        except Exception as e:
            print(f"Consume exception: {e}")

    def process_message_batch(self, messages):
        """Process a batch of messages"""
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(self.process_single_message, msg) for msg in messages]
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Process exception: {e}")

    def process_single_message(self, message):
        """Process a single message"""
        try:
            time.sleep(0.001)  # simulate business logic
            print(f"Processing message: Topic={message['topic']}, Partition={message['partition']}, Offset={message['offset']}")
        except Exception as e:
            print(f"Single message exception: {e}")

    def start_consuming(self):
        """Start consuming messages"""
        print(f"Starting consumption for topics: {self.topics}")
        try:
            while True:
                self.consume_messages_batch()
        except KeyboardInterrupt:
            print("Stopping consumption")
        finally:
            self.consumer.close()

if __name__ == "__main__":
    consumer = OptimizedKafkaConsumer(
        topics=['performance_test'],
        group_id='performance_consumer_group',
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092']
    )
    consumer.start_consuming()
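The consumer's timeout settings depend on each other: Kafka's documentation recommends a heartbeat interval of no more than one third of the session timeout, and a full poll batch must be processed within max_poll_interval_ms or the consumer is removed from the group. The sketch below checks both conditions. The function name and the per-record processing time are assumptions for illustration, and the estimate conservatively treats processing as serial.

```python
def check_consumer_timing(session_timeout_ms, heartbeat_interval_ms,
                          max_poll_interval_ms, max_poll_records, per_record_ms):
    """Return a list of warnings for risky consumer timing settings."""
    warnings = []
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append(
            "heartbeat_interval_ms should be at most one third of session_timeout_ms")
    # Worst case: a full batch is processed one record at a time
    worst_case_ms = max_poll_records * per_record_ms
    if worst_case_ms >= max_poll_interval_ms:
        warnings.append(
            f"a full batch may take {worst_case_ms} ms, "
            f"which reaches max_poll_interval_ms={max_poll_interval_ms}")
    return warnings


if __name__ == "__main__":
    # Values from the consumer configuration, with 1 ms per record as in the
    # simulated business logic
    for warning in check_consumer_timing(30000, 10000, 300000, 500, 1):
        print("WARNING:", warning)
```

The values used in this article pass both checks; slow handlers are the usual reason to lower max_poll_records or raise max_poll_interval_ms.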

3. Monitoring and Automated Operations

3.1 Kafka Cluster Monitoring Script

#!/bin/bash
# Kafka cluster monitoring script
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
ALERT_EMAIL="ops-alerts@example.com"  # placeholder address; replace with your alert mailbox
LOG_FILE="/var/log/kafka_monitor.log"

check_kafka_cluster() {
    echo "$(date): Checking Kafka cluster status" >> $LOG_FILE
    broker_list=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server $KAFKA_BROKERS 2>/dev/null | grep -c "id:")
    if [ "$broker_list" -lt 3 ]; then
        echo "ALERT: Insufficient Kafka brokers: $broker_list" | mail -s "Kafka Cluster Alert" $ALERT_EMAIL
        echo "$(date): ALERT - Insufficient brokers: $broker_list" >> $LOG_FILE
    fi
}

check_topic_health() {
    echo "$(date): Checking topic health" >> $LOG_FILE
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --list)
    for topic in $topics; do
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic)
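        # A partition without a leader is printed as "Leader: none" by current tools and as "Leader: -1" by older ones
        offline_partitions=$(echo "$topic_desc" | grep -cE "Leader: (none|-1)")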
        if [ "$offline_partitions" -gt 0 ]; then
            echo "ALERT: Topic $topic has $offline_partitions offline partitions" | mail -s "Kafka Topic Alert" $ALERT_EMAIL
            echo "$(date): ALERT - Topic $topic offline partitions: $offline_partitions" >> $LOG_FILE
        fi
    done
}

check_consumer_lag() {
    echo "$(date): Checking consumer group lag" >> $LOG_FILE
    consumer_groups=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS --list)
    for group in $consumer_groups; do
        group_desc=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS --describe --group $group)
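        # Locate the LAG column from the header row instead of assuming a fixed
        # position, then keep the largest numeric value ("-" entries are skipped)
        max_lag=$(echo "$group_desc" | awk '
            { for (i = 1; i <= NF; i++) if ($i == "LAG") { col = i; next } }
            col && $col ~ /^[0-9]+$/ && $col + 0 > max { max = $col + 0 }
            END { if (col) print max + 0 }')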
        if [ -n "$max_lag" ] && [ "$max_lag" -gt 10000 ]; then
            echo "ALERT: Consumer group $group max lag: $max_lag" | mail -s "Kafka Consumer Lag Alert" $ALERT_EMAIL
            echo "$(date): ALERT - Consumer group $group lag too high: $max_lag" >> $LOG_FILE
        fi
    done
}

collect_metrics() {
    echo "$(date): Collecting Kafka performance metrics" >> $LOG_FILE
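    for broker in 192.168.1.11 192.168.1.12 192.168.1.13 192.168.1.14; do
        # Ask systemd for the broker's main PID; pgrep -f kafka can match several processes
        kafka_pid=$(ssh "$broker" "systemctl show -p MainPID --value kafka")
        if [ -n "$kafka_pid" ] && [ "$kafka_pid" != "0" ]; then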
            memory_usage=$(ssh $broker "ps -p $kafka_pid -o %mem --no-headers")
            echo "$(date): Broker $broker memory usage: $memory_usage%" >> $LOG_FILE
            cpu_usage=$(ssh $broker "ps -p $kafka_pid -o %cpu --no-headers")
            echo "$(date): Broker $broker CPU usage: $cpu_usage%" >> $LOG_FILE
        fi
    done
}

while true; do
    check_kafka_cluster
    check_topic_health
    check_consumer_lag
    collect_metrics
    sleep 300
done
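The lag check depends on picking the right column out of the kafka-consumer-groups.sh output, and the column layout has changed between Kafka versions. A minimal Python sketch of the same parsing, which finds the LAG column from the header row, is shown below. The sample text is made-up output used only to exercise the parser.

```python
def max_lag(describe_output: str):
    """Return the largest numeric LAG value, or None if none is found."""
    lag_col = None
    largest = None
    for line in describe_output.splitlines():
        fields = line.split()
        if "LAG" in fields:
            # Header row: remember where the LAG column sits
            lag_col = fields.index("LAG")
            continue
        if lag_col is None or len(fields) <= lag_col:
            continue
        value = fields[lag_col]
        if value.isdigit():  # skips "-" for partitions without a committed offset
            lag = int(value)
            largest = lag if largest is None else max(largest, lag)
    return largest


# Hypothetical sample in the layout printed by recent Kafka versions
SAMPLE = """
GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST       CLIENT-ID
g1     t      0          100             150             50     c-1          /10.0.0.1  c
g1     t      1          -               20              -      -            -          -
g1     t      2          5               12005           12000  c-2          /10.0.0.2  c
"""

if __name__ == "__main__":
    print("max lag:", max_lag(SAMPLE))
```

Returning None when no LAG column is present lets the caller distinguish "no data" from "zero lag" before comparing against an alert threshold.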

3.2 Automated Operations Scripts

#!/usr/bin/env python3
# Kafka automated operations script
import subprocess, json, smtplib
from email.mime.text import MIMEText
from datetime import datetime
import logging

class KafkaOperations:
    def __init__(self, kafka_home, brokers):
        self.kafka_home = kafka_home
        self.brokers = brokers
        self.logger = self.setup_logger()

    def setup_logger(self):
        """Set up logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/kafka_operations.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    def create_topic(self, topic_name, partitions=3, replication_factor=2):
        """Create a Kafka topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--create",
                "--topic", topic_name,
                "--partitions", str(partitions),
                "--replication-factor", str(replication_factor)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Successfully created topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to create topic: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Create topic exception: {e}")
            return False

    def delete_topic(self, topic_name):
        """Delete a Kafka topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--delete",
                "--topic", topic_name
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Successfully deleted topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to delete topic: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Delete topic exception: {e}")
            return False

    def increase_partitions(self, topic_name, new_partition_count):
        """Increase partition count for a topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--alter",
                "--topic", topic_name,
                "--partitions", str(new_partition_count)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Successfully increased partitions for {topic_name} to {new_partition_count}")
                return True
            else:
                self.logger.error(f"Failed to increase partitions: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Increase partitions exception: {e}")
            return False

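    def rebalance_partitions(self, topic_name, broker_list="0,1,2,3"):
        """Rebalance partitions for a topic across the given broker IDs"""
        try:
            tool = f"{self.kafka_home}/bin/kafka-reassign-partitions.sh"
            topics_file = f"/tmp/topics-{topic_name}.json"
            reassignment_file = f"/tmp/reassignment-{topic_name}.json"
            # Tell the tool which topic to move
            with open(topics_file, 'w') as f:
                json.dump({"version": 1, "topics": [{"topic": topic_name}]}, f)
            generate = subprocess.run(
                [tool, "--bootstrap-server", self.brokers,
                 "--topics-to-move-json-file", topics_file,
                 "--broker-list", broker_list, "--generate"],
                capture_output=True, text=True)
            if generate.returncode != 0:
                self.logger.error(f"Failed to generate reassignment plan: {generate.stderr}")
                return False
            # --generate prints the current assignment first and the proposed plan
            # after a "Proposed partition reassignment" heading; keep the proposed JSON
            lines = generate.stdout.splitlines()
            heading = next(i for i, line in enumerate(lines)
                           if line.startswith("Proposed partition reassignment"))
            plan = next(line for line in lines[heading + 1:] if line.strip().startswith("{"))
            json.loads(plan)  # fail early if the plan is not valid JSON
            with open(reassignment_file, 'w') as f:
                f.write(plan)
            self.logger.info(f"Starting rebalance for topic: {topic_name}")
            execute = subprocess.run(
                [tool, "--bootstrap-server", self.brokers,
                 "--reassignment-json-file", reassignment_file, "--execute"],
                capture_output=True, text=True)
            if execute.returncode != 0:
                self.logger.error(f"Failed to execute reassignment: {execute.stderr}")
                return False
            return True
        except Exception as e:
            self.logger.error(f"Rebalance exception: {e}")
            return False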

    def backup_consumer_offsets(self, group_id):
        """Backup consumer group offsets"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", self.brokers,
                "--describe",
                "--group", group_id
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                backup_file = f"/backup/consumer_offsets_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                with open(backup_file, 'w') as f:
                    f.write(result.stdout)
                self.logger.info(f"Successfully backed up offsets for group {group_id} to {backup_file}")
                return True
            else:
                self.logger.error(f"Backup offsets failed: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Backup offsets exception: {e}")
            return False

if __name__ == "__main__":
    kafka_ops = KafkaOperations(kafka_home="/opt/kafka", brokers="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092")
    kafka_ops.create_topic("test_topic", partitions=6, replication_factor=3)
    kafka_ops.increase_partitions("test_topic", 12)
    kafka_ops.backup_consumer_offsets("test_consumer_group")
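Partition rebalancing relies on kafka-reassign-partitions.sh, whose --generate mode prints the current assignment followed by a proposed one. Only the proposed plan should be passed to --execute. The sketch below shows one way to pull it out of the tool's output; the helper name is illustrative and the sample text is a hand-written stand-in for real output.

```python
import json


def extract_proposed_plan(generate_output: str) -> dict:
    """Return the proposed reassignment JSON printed by --generate."""
    lines = generate_output.splitlines()
    for index, line in enumerate(lines):
        if line.startswith("Proposed partition reassignment"):
            # The plan is the first JSON line after the heading
            for candidate in lines[index + 1:]:
                if candidate.strip().startswith("{"):
                    return json.loads(candidate)
    raise ValueError("no proposed reassignment found in tool output")


# Hand-written sample in the layout the tool prints
SAMPLE_OUTPUT = """Current partition replica assignment
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
"""

if __name__ == "__main__":
    plan = extract_proposed_plan(SAMPLE_OUTPUT)
    for partition in plan["partitions"]:
        print(partition["topic"], partition["partition"], "->", partition["replicas"])
```

Saving the current assignment alongside the proposed plan is worthwhile too, since it is the rollback file if the move has to be reverted.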

4. High Availability and Fault Recovery

4.1 Cluster Health Check

#!/bin/bash
# Kafka cluster health check and auto-recovery
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"

check_and_fix_isr() {
    echo "Checking ISR issues..."
    # --under-replicated-partitions lists only partitions whose ISR is smaller than their replica set
    under_replicated=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --under-replicated-partitions)
    if [ -n "$under_replicated" ]; then
        echo "Under-replicated partitions found:"
        echo "$under_replicated"
        # Followers rejoin the ISR on their own once they catch up. The preferred
        # leader election below only moves leadership back to the preferred
        # replicas so that load is spread evenly again after a broker restart.
        ${KAFKA_HOME}/bin/kafka-leader-election.sh --bootstrap-server $KAFKA_BROKERS --election-type preferred --all-topic-partitions
    fi
}

auto_recovery() {
    echo "Executing automatic fault recovery..."
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13 192.168.1.14; do
        if ! ssh $broker "systemctl is-active kafka" > /dev/null 2>&1; then
            echo "Restarting broker: $broker"
            ssh $broker "systemctl restart kafka"
            sleep 30
        fi
    done
    check_and_fix_isr
    validate_cluster_state
}

validate_cluster_state() {
    echo "Validating cluster state..."
    local expected_brokers=4  # the cluster in this article has four brokers
    online_brokers=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server $KAFKA_BROKERS 2>/dev/null | grep -c "id:")
    if [ "$online_brokers" -ge "$expected_brokers" ]; then
        echo "Cluster is healthy, all brokers online"
    else
        echo "Cluster recovery failed, online brokers: $online_brokers"
        return 1
    fi
}

auto_recovery
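The ISR check compares each partition's in-sync replica list with its full replica list. The sketch below does the same comparison in Python on the text printed by kafka-topics.sh --describe. The function name and the sample lines are illustrative assumptions, not output captured from a real cluster.

```python
import re

# Matches partition rows such as:
#   Topic: orders  Partition: 1  Leader: 2  Replicas: 2,3,0  Isr: 2
PARTITION_ROW = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>\S+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)")


def under_replicated(describe_output: str):
    """Return (topic, partition, isr_count, replica_count) for short ISRs."""
    result = []
    for line in describe_output.splitlines():
        match = PARTITION_ROW.search(line)
        if not match:
            continue  # summary rows and blank lines are ignored
        replicas = match["replicas"].split(",")
        isr = [broker for broker in match["isr"].split(",") if broker]
        if len(isr) < len(replicas):
            result.append((match["topic"], int(match["partition"]),
                           len(isr), len(replicas)))
    return result


# Hypothetical describe output for a two-partition topic
SAMPLE = (
    "Topic: orders\tPartitionCount: 2\tReplicationFactor: 3\tConfigs: \n"
    "\tTopic: orders\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3\n"
    "\tTopic: orders\tPartition: 1\tLeader: 2\tReplicas: 2,3,0\tIsr: 2\n"
)

if __name__ == "__main__":
    for topic, partition, isr_count, replica_count in under_replicated(SAMPLE):
        print(f"{topic}-{partition}: {isr_count}/{replica_count} replicas in sync")
```

A short ISR that persists after all brokers are back usually points to a slow or stuck follower rather than a leadership problem, so it is worth alerting on separately from leader elections.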

Conclusion

Running Kafka in production involves cluster architecture design, performance tuning, monitoring, and automated operations. The scripts and configurations presented here give operations engineers a starting point for a stable, efficient cluster. Tune them for your own workloads, and keep monitoring the system so that the messaging service stays reliable.
