Master Kafka Production: High‑Availability Cluster Deployment & Ops Best Practices
This comprehensive guide walks operations engineers through designing, deploying, and managing a high‑availability Kafka production cluster, covering automated ZooKeeper and Kafka installation scripts, performance tuning for producers and consumers, monitoring with Prometheus and Grafana, and automated health checks and recovery procedures.
Kafka Production Environment Solution: High‑Availability Cluster Deployment and Ops Practice
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│                        Kafka Production Architecture                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│     ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                       │
│     │  Producer1  │  │  Producer2  │  │  Producer3  │                       │
│     └─────────────┘  └─────────────┘  └─────────────┘                       │
│            │                │                │                              │
│            └────────────────┼────────────────┘                              │
│                             │                                               │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                             Kafka Cluster                             │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │  │
│  │  │   Broker1   │  │   Broker2   │  │   Broker3   │  │   Broker4   │   │  │
│  │  │192.168.1.11 │  │192.168.1.12 │  │192.168.1.13 │  │192.168.1.14 │   │  │
│  │  │  Port:9092  │  │  Port:9092  │  │  Port:9092  │  │  Port:9092  │   │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘   │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                             │                                               │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                           ZooKeeper Cluster                           │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │  │
│  │  │     ZK1     │  │     ZK2     │  │     ZK3     │                    │  │
│  │  │192.168.1.21 │  │192.168.1.22 │  │192.168.1.23 │                    │  │
│  │  │  Port:2181  │  │  Port:2181  │  │  Port:2181  │                    │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                    │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                             │                                               │
│     ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                       │
│     │  Consumer1  │  │  Consumer2  │  │  Consumer3  │                       │
│     │  (Group A)  │  │  (Group B)  │  │  (Group C)  │                       │
│     └─────────────┘  └─────────────┘  └─────────────┘                       │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                           Monitoring System                           │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │  │
│  │  │ Prometheus  │  │   Grafana   │  │    Kafka    │                    │  │
│  │  │   Metrics   │  │  Dashboard  │  │   Manager   │                    │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                    │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘
Introduction
Apache Kafka is a distributed streaming platform that serves as core messaging middleware in modern big‑data architectures. Written from an operations engineer’s perspective, this article covers deployment, configuration tuning, and monitoring for Kafka in production, with scripts and code snippets to help teams build stable and efficient clusters.
1. Automated Kafka Cluster Deployment
1.1 ZooKeeper Cluster Deployment Script
#!/bin/bash
# ZooKeeper cluster automated deployment script
set -e
ZK_VERSION="3.8.1"
ZK_NODES=("192.168.1.21" "192.168.1.22" "192.168.1.23")
ZK_DATA_DIR="/data/zookeeper"
ZK_LOG_DIR="/logs/zookeeper"
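# Create the ZooKeeper user only if it is missing, so the script can be re-run under set -e
id -u zookeeper >/dev/null 2>&1 || useradd -r -s /bin/false zookeeper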
install_zookeeper() {
cd /tmp
wget https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz
tar -xzf apache-zookeeper-${ZK_VERSION}-bin.tar.gz
mv apache-zookeeper-${ZK_VERSION}-bin /opt/zookeeper
chown -R zookeeper:zookeeper /opt/zookeeper
}
configure_zookeeper() {
local node_id=$1
local node_ip=$2
mkdir -p ${ZK_DATA_DIR} ${ZK_LOG_DIR}
chown -R zookeeper:zookeeper ${ZK_DATA_DIR} ${ZK_LOG_DIR}
echo ${node_id} > ${ZK_DATA_DIR}/myid
cat > /opt/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_DATA_DIR}
dataLogDir=${ZK_LOG_DIR}
clientPort=2181
maxClientCnxns=60
server.1=192.168.1.21:2888:3888
server.2=192.168.1.22:2888:3888
server.3=192.168.1.23:2888:3888
autopurge.snapRetainCount=10
autopurge.purgeInterval=1
EOF
}
start_zookeeper() {
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Apache ZooKeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=forking
User=zookeeper
Group=zookeeper
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper
}
install_zookeeper
configure_zookeeper $1 $2
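start_zookeeper
ZooKeeper, Kafka’s coordination service, should run on an odd number of nodes: the ensemble stays available only while a majority of its nodes is up, so three nodes tolerate one failure and a fourth node adds no extra tolerance. Run the script on each node with that node's ID and IP as arguments; scripting the install keeps the ZooKeeper environment standardized and quick to deploy.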
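The majority rule behind the odd-node recommendation can be checked with a few lines of arithmetic. This is a minimal sketch; the function name `zk_quorum` is hypothetical and not part of ZooKeeper or of the deployment script.

```python
from typing import Tuple


def zk_quorum(ensemble_size: int) -> Tuple[int, int]:
    """Return (nodes needed for a majority, node failures tolerated)."""
    if ensemble_size < 1:
        raise ValueError("ensemble_size must be at least 1")
    majority = ensemble_size // 2 + 1
    return majority, ensemble_size - majority


if __name__ == "__main__":
    # Compare the 3-node ensemble used in this article with 4 and 5 nodes
    for n in (3, 4, 5):
        majority, tolerated = zk_quorum(n)
        print(f"{n} nodes: majority={majority}, tolerated failures={tolerated}")
```

Going from three to four nodes raises the majority from two to three but still tolerates only one failure, which is why ensembles are normally sized at three or five.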
1.2 Kafka Cluster Deployment Configuration
#!/bin/bash
# Kafka cluster deployment script
KAFKA_VERSION="2.8.2"
KAFKA_NODES=("192.168.1.11" "192.168.1.12" "192.168.1.13" "192.168.1.14")
KAFKA_DATA_DIR="/data/kafka"
KAFKA_LOG_DIR="/logs/kafka"
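install_kafka() {
cd /tmp
wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz
tar -xzf kafka_2.13-${KAFKA_VERSION}.tgz
mv kafka_2.13-${KAFKA_VERSION} /opt/kafka
# Create the kafka user only if it is missing, so the script can be re-run
id -u kafka >/dev/null 2>&1 || useradd -r -s /bin/false kafka
chown -R kafka:kafka /opt/kafka
# Create the data and log directories here, after the kafka user exists
mkdir -p ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
chown -R kafka:kafka ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
}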
generate_kafka_config() {
local broker_id=$1
local node_ip=$2
cat > /opt/kafka/config/server.properties <<EOF
broker.id=${broker_id}
listeners=PLAINTEXT://${node_ip}:9092
advertised.listeners=PLAINTEXT://${node_ip}:9092
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=${KAFKA_DATA_DIR}
num.partitions=3
num.recovery.threads.per.data.dir=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181/kafka
zookeeper.connection.timeout.ms=18000
replica.fetch.max.bytes=1048576
message.max.bytes=1000000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000
delete.topic.enable=true
EOF
}
create_kafka_service() {
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
# JVM options are environment variables read by the Kafka start scripts, not server.properties keys
Environment="KAFKA_HEAP_OPTS=-Xmx6G -Xms6G"
Environment="KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
# Send broker application logs to the directory created by install_kafka
Environment="LOG_DIR=${KAFKA_LOG_DIR}"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
}
install_kafka
generate_kafka_config $1 $2
create_kafka_service
2. Production Environment Performance Optimization
2.1 Producer Performance Tuning
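The producer in this section sets batch_size=16384 and linger_ms=10. A batch is sent when it fills or when the linger time expires, whichever comes first. The sketch below estimates which limit applies for a given load. The function `estimate_batching` and its inputs (a fixed message size and a steady per-partition message rate) are hypothetical simplifications that ignore compression and record overhead.

```python
from typing import Tuple


def estimate_batching(batch_size: int, linger_ms: float,
                      msg_bytes: int, msgs_per_sec: float) -> Tuple[float, float]:
    """Return (approximate wait before a batch is sent in ms, messages per batch)."""
    # Time needed to fill one batch at the given byte rate
    fill_ms = batch_size / (msg_bytes * msgs_per_sec) * 1000.0
    # The batch goes out when it is full or when linger_ms expires
    wait_ms = min(fill_ms, float(linger_ms))
    msgs_per_batch = wait_ms / 1000.0 * msgs_per_sec
    return wait_ms, msgs_per_batch


if __name__ == "__main__":
    # Assumed loads: 200-byte messages at 1,000 and 50,000 messages per second
    for rate in (1000, 50000):
        wait_ms, per_batch = estimate_batching(16384, 10, 200, rate)
        print(f"{rate} msg/s: wait about {wait_ms:.2f} ms, about {per_batch:.0f} messages per batch")
```

At the lower rate the linger time is the limit, so raising linger_ms would produce fuller batches. At the higher rate the batch fills first, so a larger batch_size is the setting to experiment with.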
#!/usr/bin/env python3
# Kafka producer performance optimization configuration (kafka-python client)
import json
import time
from concurrent.futures import ThreadPoolExecutor

from kafka import KafkaProducer


class OptimizedKafkaProducer:
    def __init__(self, bootstrap_servers, topic):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            batch_size=16384,           # max bytes buffered per partition batch
            linger_ms=10,               # wait up to 10 ms for a batch to fill
            buffer_memory=33554432,     # 32 MiB total send buffer
            compression_type='snappy',  # needs the python-snappy package installed
            # With retries enabled, more than one in-flight request can reorder messages
            max_in_flight_requests_per_connection=5,
            retries=3,
            retry_backoff_ms=100,
            request_timeout_ms=30000,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8')
        )

    def send_message_sync(self, key, value):
        """Synchronously send a message"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            # Block until the broker acknowledges the record
            record_metadata = future.get(timeout=10)
            return {'topic': record_metadata.topic, 'partition': record_metadata.partition, 'offset': record_metadata.offset}
        except Exception as e:
            print(f"Send failed: {e}")
            return None

    def send_message_async(self, key, value, callback=None):
        """Asynchronously send a message"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            if callback:
                future.add_callback(callback)
            # Report delivery failures, which would otherwise pass silently
            future.add_errback(lambda exc: print(f"Send failed: {exc}"))
            return future
        except Exception as e:
            print(f"Send failed: {e}")
            return None

    def batch_send_performance_test(self, message_count=100000):
        """Batch send performance test"""
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for i in range(message_count):
                message = {'id': i, 'timestamp': time.time(), 'data': f'test_message_{i}', 'source': 'performance_test'}
                future = executor.submit(self.send_message_async, str(i), message)
                futures.append(future)
            # Wait until every message has been handed to the producer
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Message send exception: {e}")
        # Block until all buffered records have been sent to the brokers
        self.producer.flush()
        duration = time.time() - start_time
        throughput = message_count / duration
        print(f"Sent {message_count} messages")
        print(f"Total time: {duration:.2f} seconds")
        print(f"Throughput: {throughput:.2f} messages/second")

    def close(self):
        self.producer.close()


if __name__ == "__main__":
    producer = OptimizedKafkaProducer(
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092'],
        topic='performance_test'
    )
    producer.batch_send_performance_test(50000)
    producer.close()
2.2 Consumer Performance Optimization
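The consumer in this section sets max_poll_records=500, max_poll_interval_ms=300000, session_timeout_ms=30000, and heartbeat_interval_ms=10000. Two relationships between these values are worth checking before tuning them. The consumer must call poll() again within max_poll_interval_ms or it is removed from the group, and Kafka's documentation suggests a heartbeat interval of no more than one third of the session timeout. The helper below is a hypothetical sketch of both checks, assuming records are processed one at a time.

```python
from typing import Tuple


def consumer_timing_report(max_poll_records: int, max_poll_interval_ms: int,
                           session_timeout_ms: int,
                           heartbeat_interval_ms: int) -> Tuple[float, bool]:
    """Return (processing budget per record in ms, heartbeat guideline satisfied)."""
    # Average time each record may take if a full batch is processed sequentially
    per_record_budget_ms = max_poll_interval_ms / max_poll_records
    # Guideline: heartbeat interval at most one third of the session timeout
    heartbeat_ok = heartbeat_interval_ms * 3 <= session_timeout_ms
    return per_record_budget_ms, heartbeat_ok


if __name__ == "__main__":
    budget_ms, heartbeat_ok = consumer_timing_report(500, 300000, 30000, 10000)
    print(f"Per-record budget: {budget_ms:.0f} ms, heartbeat guideline met: {heartbeat_ok}")
```

With the values used here, each record has an average budget of 600 ms under sequential processing. Handlers slower than that call for a smaller max_poll_records or a larger max_poll_interval_ms.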
#!/usr/bin/env python3
# Kafka consumer performance optimization configuration (kafka-python client)
import json
import time
from concurrent.futures import ThreadPoolExecutor

from kafka import KafkaConsumer


class OptimizedKafkaConsumer:
    def __init__(self, topics, group_id, bootstrap_servers):
        self.topics = topics
        self.group_id = group_id
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            fetch_min_bytes=1024,
            fetch_max_wait_ms=500,
            max_poll_records=500,
            max_poll_interval_ms=300000,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # offsets are committed manually after processing
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None
        )

    def consume_messages_batch(self, batch_size=100, timeout=5000):
        """Batch consume messages"""
        message_batch = []
        try:
            message_pack = self.consumer.poll(timeout_ms=timeout)
            for topic_partition, messages in message_pack.items():
                for message in messages:
                    message_batch.append({
                        'topic': message.topic,
                        'partition': message.partition,
                        'offset': message.offset,
                        'key': message.key,
                        'value': message.value,
                        'timestamp': message.timestamp
                    })
                    if len(message_batch) >= batch_size:
                        self.process_message_batch(message_batch)
                        message_batch = []
            # Process whatever is left over from the last partial batch
            if message_batch:
                self.process_message_batch(message_batch)
            # Commit only after processing, and only if this poll returned records
            if message_pack:
                self.consumer.commit()
        except Exception as e:
            print(f"Consume exception: {e}")

    def process_message_batch(self, messages):
        """Process a batch of messages"""
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(self.process_single_message, msg) for msg in messages]
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Process exception: {e}")

    def process_single_message(self, message):
        """Process a single message"""
        try:
            time.sleep(0.001)  # simulate business logic
            print(f"Processing message: Topic={message['topic']}, Partition={message['partition']}, Offset={message['offset']}")
        except Exception as e:
            print(f"Single message exception: {e}")

    def start_consuming(self):
        """Start consuming messages"""
        print(f"Starting consumption for topics: {self.topics}")
        try:
            while True:
                self.consume_messages_batch()
        except KeyboardInterrupt:
            print("Stopping consumption")
        finally:
            self.consumer.close()


if __name__ == "__main__":
    consumer = OptimizedKafkaConsumer(
        topics=['performance_test'],
        group_id='performance_consumer_group',
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092']
    )
    consumer.start_consuming()
3. Monitoring and Automated Operations
3.1 Kafka Cluster Monitoring Script
#!/bin/bash
# Kafka cluster monitoring script
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
ALERT_EMAIL="[email protected]"
LOG_FILE="/var/log/kafka_monitor.log"
check_kafka_cluster() {
echo "$(date): Checking Kafka cluster status" >> $LOG_FILE
broker_list=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server $KAFKA_BROKERS 2>/dev/null | grep -c "id:")
if [ "$broker_list" -lt 3 ]; then
echo "ALERT: Insufficient Kafka brokers: $broker_list" | mail -s "Kafka Cluster Alert" $ALERT_EMAIL
echo "$(date): ALERT - Insufficient brokers: $broker_list" >> $LOG_FILE
fi
}
check_topic_health() {
echo "$(date): Checking topic health" >> $LOG_FILE
topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --list)
for topic in $topics; do
topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic)
offline_partitions=$(echo "$topic_desc" | grep -c "Leader: -1")
if [ "$offline_partitions" -gt 0 ]; then
echo "ALERT: Topic $topic has $offline_partitions offline partitions" | mail -s "Kafka Topic Alert" $ALERT_EMAIL
echo "$(date): ALERT - Topic $topic offline partitions: $offline_partitions" >> $LOG_FILE
fi
done
}
check_consumer_lag() {
echo "$(date): Checking consumer group lag" >> $LOG_FILE
consumer_groups=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS --list)
for group in $consumer_groups; do
group_desc=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS --describe --group $group)
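# Locate the LAG column from the header row, since its position differs between Kafka versions
max_lag=$(echo "$group_desc" | awk '!col { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next } $col ~ /^[0-9]+$/ && $col + 0 > max + 0 { max = $col } END { if (max != "") print max }')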
if [ -n "$max_lag" ] && [ "$max_lag" -gt 10000 ]; then
echo "ALERT: Consumer group $group max lag: $max_lag" | mail -s "Kafka Consumer Lag Alert" $ALERT_EMAIL
echo "$(date): ALERT - Consumer group $group lag too high: $max_lag" >> $LOG_FILE
fi
done
}
collect_metrics() {
echo "$(date): Collecting Kafka performance metrics" >> $LOG_FILE
for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
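# Match the broker main class and keep one PID, so ps receives a single process ID
kafka_pid=$(ssh $broker "pgrep -f 'kafka\.Kafka' | head -n 1")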
if [ -n "$kafka_pid" ]; then
memory_usage=$(ssh $broker "ps -p $kafka_pid -o %mem --no-headers")
echo "$(date): Broker $broker memory usage: $memory_usage%" >> $LOG_FILE
cpu_usage=$(ssh $broker "ps -p $kafka_pid -o %cpu --no-headers")
echo "$(date): Broker $broker CPU usage: $cpu_usage%" >> $LOG_FILE
fi
done
}
while true; do
check_kafka_cluster
check_topic_health
check_consumer_lag
collect_metrics
sleep 300
done
3.2 Automated Operations Scripts
#!/usr/bin/env python3
# Kafka automated operations script
import subprocess, json, smtplib
from email.mime.text import MIMEText
from datetime import datetime
import logging


class KafkaOperations:
    def __init__(self, kafka_home, brokers):
        self.kafka_home = kafka_home
        self.brokers = brokers
        self.logger = self.setup_logger()

    def setup_logger(self):
        """Set up logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/kafka_operations.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    def create_topic(self, topic_name, partitions=3, replication_factor=2):
        """Create a Kafka topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--create",
                "--topic", topic_name,
                "--partitions", str(partitions),
                "--replication-factor", str(replication_factor)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Successfully created topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to create topic: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Create topic exception: {e}")
            return False

    def delete_topic(self, topic_name):
        """Delete a Kafka topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--delete",
                "--topic", topic_name
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Successfully deleted topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to delete topic: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Delete topic exception: {e}")
            return False

    def increase_partitions(self, topic_name, new_partition_count):
        """Increase partition count for a topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--alter",
                "--topic", topic_name,
                "--partitions", str(new_partition_count)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Successfully increased partitions for {topic_name} to {new_partition_count}")
                return True
            else:
                self.logger.error(f"Failed to increase partitions: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Increase partitions exception: {e}")
            return False

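    def rebalance_partitions(self, topic_name, broker_list="0,1,2,3"):
        """Rebalance a topic's partitions across the given broker IDs"""
        try:
            topics_file = f"/tmp/topics-{topic_name}.json"
            reassignment_file = f"/tmp/reassignment-{topic_name}.json"
            # Tell the tool which topic to generate a plan for
            with open(topics_file, 'w') as f:
                json.dump({"version": 1, "topics": [{"topic": topic_name}]}, f)
            cmd_generate = [
                f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                "--bootstrap-server", self.brokers,
                "--topics-to-move-json-file", topics_file,
                "--broker-list", broker_list,
                "--generate"
            ]
            generated = subprocess.run(cmd_generate, capture_output=True, text=True)
            if generated.returncode != 0:
                self.logger.error(f"Failed to generate reassignment plan: {generated.stderr}")
                return False
            # The tool prints the current assignment first and the proposed plan last,
            # so the last JSON line is taken as the plan to execute
            json_lines = [line for line in generated.stdout.splitlines() if line.startswith("{")]
            if not json_lines:
                self.logger.error("No reassignment plan found in tool output")
                return False
            with open(reassignment_file, 'w') as f:
                f.write(json_lines[-1])
            cmd_execute = [
                f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                "--bootstrap-server", self.brokers,
                "--reassignment-json-file", reassignment_file,
                "--execute"
            ]
            self.logger.info(f"Starting rebalance for topic: {topic_name}")
            executed = subprocess.run(cmd_execute, capture_output=True, text=True)
            if executed.returncode != 0:
                self.logger.error(f"Failed to execute reassignment: {executed.stderr}")
                return False
            return True
        except Exception as e:
            self.logger.error(f"Rebalance exception: {e}")
            return False
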
    def backup_consumer_offsets(self, group_id):
        """Backup consumer group offsets"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", self.brokers,
                "--describe",
                "--group", group_id
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                backup_file = f"/backup/consumer_offsets_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                with open(backup_file, 'w') as f:
                    f.write(result.stdout)
                self.logger.info(f"Successfully backed up offsets for group {group_id} to {backup_file}")
                return True
            else:
                self.logger.error(f"Backup offsets failed: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Backup offsets exception: {e}")
            return False


if __name__ == "__main__":
    kafka_ops = KafkaOperations(kafka_home="/opt/kafka", brokers="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092")
    kafka_ops.create_topic("test_topic", partitions=6, replication_factor=3)
    kafka_ops.increase_partitions("test_topic", 12)
    kafka_ops.backup_consumer_offsets("test_consumer_group")
4. High Availability and Fault Recovery
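The broker configuration in section 1.2 gives the internal topics a replication factor of 3, with a minimum ISR of 2 for the transaction log. The sketch below works out what such a combination tolerates. It assumes producers use acks=all, which the producer example in this article does not set, and the helper name `replication_tolerance` is hypothetical. Regular topics fall back to the broker default of min.insync.replicas=1 unless it is set explicitly.

```python
from typing import Dict


def replication_tolerance(replication_factor: int, min_insync_replicas: int) -> Dict[str, int]:
    """Summarize fault tolerance for a topic written with acks=all."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min_insync_replicas must be between 1 and replication_factor")
    return {
        # Replicas that can be offline while acks=all writes are still accepted
        "failures_with_writes_available": replication_factor - min_insync_replicas,
        # An acknowledged record exists on at least this many replicas
        "min_copies_of_acked_record": min_insync_replicas,
        # Replica losses an acknowledged record is guaranteed to survive
        "guaranteed_loss_tolerance": min_insync_replicas - 1,
    }


if __name__ == "__main__":
    for rf, min_isr in ((3, 2), (3, 1)):
        print(rf, min_isr, replication_tolerance(rf, min_isr))
```

With a replication factor of 3 and a minimum ISR of 2, one broker can fail without blocking writes, and every acknowledged record survives the loss of one replica.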
4.1 Cluster Health Check
#!/bin/bash
# Kafka cluster health check and auto-recovery
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
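check_and_fix_isr() {
echo "Checking ISR issues..."
local needs_election=""
topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --list)
for topic in $topics; do
topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic)
# On each partition line, compare the number of replicas with the number of in-sync replicas
isr_issues=$(echo "$topic_desc" | awk '{
r = ""; s = "";
for (i = 1; i < NF; i++) {
if ($i == "Replicas:") r = $(i + 1);
if ($i == "Isr:") s = $(i + 1);
}
if (r != "" && split(s, a, ",") < split(r, b, ",")) print "ISR shortage";
}')
if [ -n "$isr_issues" ]; then
echo "Topic $topic has partitions whose ISR is smaller than the replica set"
needs_election=1
fi
done
# A preferred-leader election does not repair the ISR; followers rejoin once they catch up.
# It only moves leadership back to the preferred replicas after brokers return.
if [ -n "$needs_election" ]; then
${KAFKA_HOME}/bin/kafka-leader-election.sh --bootstrap-server $KAFKA_BROKERS --election-type preferred --all-topic-partitions
fi
}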
auto_recovery() {
echo "Executing automatic fault recovery..."
for broker in 192.168.1.11 192.168.1.12 192.168.1.13 192.168.1.14; do
if ! ssh $broker "systemctl is-active kafka" > /dev/null 2>&1; then
echo "Restarting broker: $broker"
ssh $broker "systemctl restart kafka"
sleep 30
fi
done
check_and_fix_isr
validate_cluster_state
}
validate_cluster_state() {
echo "Validating cluster state..."
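# kafka-broker-api-versions.sh reports every broker in the cluster, not only those in the bootstrap list
local expected_brokers=4
online_brokers=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server $KAFKA_BROKERS 2>/dev/null | grep -c "id:")
if [ "$online_brokers" -ge "$expected_brokers" ]; then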
echo "Cluster is healthy, all brokers online"
else
echo "Cluster recovery failed, online brokers: $online_brokers"
return 1
fi
}
auto_recovery
Conclusion
Deploying Kafka in a production environment involves multiple critical aspects: cluster architecture design, performance parameter tuning, monitoring system construction, and automated operational procedures. By following the solutions presented, operations engineers can build a stable, high‑efficiency Kafka cluster, tailoring optimizations to specific business scenarios and continuously monitoring and improving system performance to ensure reliable message‑queue services.
MaGe Linux Operations