
Mastering Kafka: Concepts, Installation, Optimization, and Security

This comprehensive guide covers Kafka's core concepts, design principles, installation steps, configuration tweaks, performance optimizations, permission management, common operational commands, cluster scaling, log retention settings, and monitoring scripts to help you build and maintain a robust Kafka ecosystem.

dbaplus Community

Kafka Overview

Kafka is a distributed publish‑subscribe messaging system originally developed by LinkedIn and now an Apache project. It provides a partitioned, replicated, and durable log service designed for high‑throughput streaming data, allowing seamless horizontal scaling without downtime.

Design and Persistence

Random I/O is slow on traditional disks, so Kafka relies on sequential, append-only writes to achieve high throughput. Data is first written to the OS page cache (memory) and later flushed to disk based on configurable thresholds (e.g., after N messages or M seconds), which trades durability against speed.
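The threshold-based flush described above can be sketched in a few lines of Python. This is only an illustration of the "N messages or M milliseconds" rule, not Kafka's implementation; the class name FlushPolicy and its parameters are hypothetical.

```python
import time


class FlushPolicy:
    """Decide when buffered writes should be forced to disk.

    Mirrors the idea behind log.flush.interval.messages and
    log.flush.interval.ms; it is not Kafka's actual code.
    """

    def __init__(self, max_messages, max_interval_ms, now_ms=None):
        self.max_messages = max_messages
        self.max_interval_ms = max_interval_ms
        self.unflushed = 0
        self.last_flush_ms = self._now(now_ms)

    @staticmethod
    def _now(now_ms):
        # Allow callers to inject a clock value, which keeps the logic testable.
        return int(time.time() * 1000) if now_ms is None else now_ms

    def append(self, now_ms=None):
        """Record one message; return True if a flush was triggered."""
        now = self._now(now_ms)
        self.unflushed += 1
        too_many = self.unflushed >= self.max_messages
        too_old = now - self.last_flush_ms >= self.max_interval_ms
        if too_many or too_old:
            # A real log would fsync the segment file at this point.
            self.unflushed = 0
            self.last_flush_ms = now
            return True
        return False
```

Whichever threshold is reached first triggers the flush, and both counters reset afterwards.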

Installation and Configuration

After extracting Kafka, edit /opt/beh/core/kafka/config/server.properties to set essential parameters such as broker.id (unique per broker), listeners (host.name on older releases), and log.dirs. Example:

broker.id=2
listeners=SASL_PLAINTEXT://:9092
log.dirs=/var/kafka/logs
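Before starting a broker it can help to confirm that the file really defines the expected keys. The following is a minimal sketch, assuming a simple key=value layout; the list of required keys is a hypothetical choice for illustration, and a line such as #broker.id=2 counts as a comment, so that key would be reported as missing.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(props, required=("broker.id", "listeners", "log.dirs")):
    """Return the required keys (an assumed list) that are not set."""
    return [key for key in required if key not in props]
```

For a real check, read the file contents first, for example with open("/opt/beh/core/kafka/config/server.properties").read(), and pass the text to parse_properties.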

Allocate JVM memory through KAFKA_HEAP_OPTS in kafka-server-start.sh (e.g., -Xmx10g -Xms5g), then start each broker with:

nohup kafka-server-start.sh /opt/beh/core/kafka/config/server.properties &

Verify that every broker has registered by listing the /brokers/ids path in ZooKeeper (under the cluster's chroot, if one is configured).

Performance Optimization Settings

Key tuning parameters include:

num.replica.fetchers=4
log.flush.interval.messages=10000
log.flush.interval.ms=1000
replica.fetch.min.bytes=1
replica.fetch.max.bytes=20971520
socket.receive.buffer.bytes=131072
num.network.threads=7
num.io.threads=13

These settings control replica fetching, flush behavior, socket buffers, and thread pools to improve throughput and latency.
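The byte-valued settings above are easier to review in readable units. As a small worked example, the helper below (a hypothetical name, human_bytes) converts the two byte counts from the list into binary units.

```python
def human_bytes(n):
    """Format a byte count using binary units (KiB, MiB, GiB)."""
    units = ["B", "KiB", "MiB", "GiB"]
    value = float(n)
    for unit in units:
        if value < 1024 or unit == units[-1]:
            return "%g %s" % (value, unit)
        value /= 1024


# Values copied from the tuning list above.
settings = {
    "replica.fetch.max.bytes": 20971520,
    "socket.receive.buffer.bytes": 131072,
}
for name, raw in sorted(settings.items()):
    print("%s = %d (%s)" % (name, raw, human_bytes(raw)))
```

So replica.fetch.max.bytes is 20 MiB and socket.receive.buffer.bytes is 128 KiB.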

Permission Control (ACLs)

Enable SASL/PLAIN authentication by configuring server.properties:

listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Define users in JAAS files (e.g., kafka_server_jaas.conf) and client JAAS files for producers and consumers. Example ACL commands:

# Grant admin cluster actions
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --allow-principal User:admin --operation ClusterAction --cluster --add

# Grant producer write permission on a topic
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --allow-principal User:producer1 --producer --topic=topic-test1 --add

# Grant consumer read permission for all groups on a topic
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --allow-principal User:hadoop --consumer --topic=topic-test1 --group='*' --add

Use kafka-acls.sh --list with the same --authorizer-properties value to view the ACLs in effect; add --topic to narrow the output to one topic.
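When many principals need the same grants, the commands above can be generated rather than typed. The sketch below only assembles argument lists from the flags already shown in this section; the function names acl_command and as_shell are hypothetical, and nothing is executed.

```python
import shlex


def acl_command(principal, topic, role, group=None, zk="localhost:2181"):
    """Build a kafka-acls.sh argument list for a producer or consumer grant."""
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    args = [
        "kafka-acls.sh",
        "--authorizer-properties", "zookeeper.connect=" + zk,
        "--allow-principal", "User:" + principal,
        "--" + role,
        "--topic", topic,
    ]
    if role == "consumer":
        # Consumer grants in this section are paired with --group.
        args += ["--group", group or "*"]
    args.append("--add")
    return args


def as_shell(args):
    """Quote each argument so characters like * reach Kafka unexpanded."""
    return " ".join(shlex.quote(arg) for arg in args)
```

Passing the list form to subprocess avoids shell quoting entirely; as_shell is only for printing a command that can be pasted into a terminal.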

Common Operations

Create, list, describe, and delete topics:

# Create a topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic logstash-yarnnodelog \
  --replication-factor 3 --partitions 9 --zookeeper zkip:2181/kafka

# List topics (use the same ZooKeeper chroot as at creation time)
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper zkip:2181/kafka

# Describe a topic
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper zkip:2181/kafka --topic test20160807

# Delete a topic (older releases also need delete.topic.enable=true on the brokers)
$KAFKA_HOME/bin/kafka-topics.sh --delete --zookeeper zkip:2181/kafka --topic topic-test
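The --describe output can be scanned for partitions whose in-sync replica set is smaller than the assigned replica set. The parser below is a sketch under an assumption: the sample text reflects the commonly seen tab-separated layout, and the exact spacing differs between Kafka versions, so the pattern may need adjusting.

```python
import re

# Assumed layout of partition lines printed by kafka-topics.sh --describe.
SAMPLE = (
    "Topic: test20160807\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3\n"
    "Topic: test20160807\tPartition: 1\tLeader: 2\tReplicas: 2,3,1\tIsr: 2\n"
)

LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>\S+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def under_replicated(describe_output):
    """Return (topic, partition, missing_replica_ids) for lagging partitions."""
    result = []
    for line in describe_output.splitlines():
        match = LINE.search(line)
        if not match:
            continue  # summary lines and anything unrecognised
        replicas = [int(x) for x in match.group("replicas").split(",")]
        isr = [int(x) for x in match.group("isr").split(",") if x]
        missing = [r for r in replicas if r not in isr]
        if missing:
            result.append(
                (match.group("topic"), int(match.group("partition")), missing))
    return result


print(under_replicated(SAMPLE))
```

In practice the input would come from running the describe command and capturing its standard output.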

Start producer and consumer consoles:

# Producer
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list brokerip:9092 \
  --topic topic-test --producer.config=/opt/beh/core/kafka/config/producer.properties

# Consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server brokerip:9092 \
  --from-beginning --topic topic-test --consumer.config=/opt/beh/core/kafka/config/consumer.properties

Cluster Expansion and Rebalancing

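Use the partition reassignment tool to move partitions to new brokers. First list the topics to move in a JSON file (topics-to-move.json); the target brokers, 5 and 6 in this example, are supplied on the command line. Example JSON for topics foo1 and foo2: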

{"topics":[{"topic":"foo1"},{"topic":"foo2"}],"version":1}

Generate a reassignment plan:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

Save the proposed assignment printed by --generate as expand-cluster-reassignment.json, then execute the plan:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file expand-cluster-reassignment.json --execute

Check progress by rerunning the same command with --verify in place of --execute.
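Both JSON documents used by the tool are simple enough to build with a script. The sketch below generates the topics-to-move document and, purely for illustration, a naive round-robin assignment in the reassignment file format; the tool's own --generate step uses its own placement logic, and the function names here are hypothetical.

```python
import json


def topics_to_move(topics):
    """Build the document passed via --topics-to-move-json-file."""
    return {"topics": [{"topic": t} for t in topics], "version": 1}


def round_robin_plan(partition_counts, brokers, replication_factor):
    """Spread replicas over brokers in round-robin order (illustration only)."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor exceeds broker count")
    partitions = []
    slot = 0
    for topic in sorted(partition_counts):
        for partition in range(partition_counts[topic]):
            replicas = [brokers[(slot + i) % len(brokers)]
                        for i in range(replication_factor)]
            partitions.append(
                {"topic": topic, "partition": partition, "replicas": replicas})
            slot += 1
    return {"version": 1, "partitions": partitions}


print(json.dumps(topics_to_move(["foo1", "foo2"]), separators=(",", ":")))
```

Reviewing a generated plan by eye before running --execute remains a good habit, since a reassignment moves real data between brokers.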

Log Retention Settings

Control data lifespan with log.retention.bytes and log.retention.minutes. Either threshold triggers deletion:

# Size limit per partition: 1 GB
log.retention.bytes=1073741824
# Time limit: 1 day
log.retention.minutes=1440
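How the two limits interact can be sketched as follows. This is a simplified model under stated assumptions, not the broker's code: segments are given oldest first as (size_bytes, last_modified_ms) pairs, the last one is treated as the active segment and never selected, and a negative retention_bytes means no size limit.

```python
def segments_to_delete(segments, retention_bytes, retention_ms, now_ms):
    """Return indexes of closed segments that either limit would remove."""
    closed = segments[:-1]  # the final entry is the active segment
    doomed = set()

    # Time limit: walk from the oldest segment until one is young enough.
    for i, (_, modified) in enumerate(closed):
        if now_ms - modified > retention_ms:
            doomed.add(i)
        else:
            break

    # Size limit: drop oldest segments while the log stays at or above the cap.
    if retention_bytes >= 0:
        excess = sum(size for size, _ in segments) - retention_bytes
        for i, (size, _) in enumerate(closed):
            if excess - size >= 0:
                doomed.add(i)
                excess -= size
            else:
                break
    return sorted(doomed)
```

A segment is removed as soon as either rule selects it, which matches the "either threshold triggers deletion" behavior described above.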

Monitoring Scripts

Python script to check alive Kafka brokers via ZooKeeper:

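#!/usr/bin/python
# -*- coding: utf-8 -*-
# Nagios-style check: exit 0 if all brokers are registered, 2 otherwise.
import sys
import zookeeper  # legacy zkpython binding

EXPECTED_BROKERS = 16  # number of brokers in this cluster

zk = zookeeper.init("zkip1:2181")
# Each live broker registers an ephemeral node under /brokers/ids.
ids = zookeeper.get_children(zk, "/brokers/ids")
zookeeper.close(zk)

if len(ids) == EXPECTED_BROKERS:
    print("OK: all cb real-time kafka1 brokers are alive")
    sys.exit(0)
else:
    print("CRITICAL: cb real-time kafka1 has %d dead broker(s)" % (EXPECTED_BROKERS - len(ids)))
    sys.exit(2)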

Python script to monitor disk usage on Kafka nodes via SSH (Paramiko):

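#!/usr/bin/python
# -*- coding: utf-8 -*-
# Arguments: SSH user, SSH password, disk usage threshold in percent.
import sys
import paramiko

hosts = ['IP1', 'IP2']
user = sys.argv[1]
pwd = sys.argv[2]
threshold = int(sys.argv[3])  # integer percentage; also keeps the remote command safe
# Skip the df header and compare the Use% column numerically ($6+0 drops the % sign).
command = "df -TPh | awk 'NR>1 && $6+0 > %d {print $7}'" % threshold
error = ""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for host in hosts:
    ssh.connect(host, 22, user, pwd)
    stdin, stdout, stderr = ssh.exec_command(command)
    paths = stdout.readlines()
    ssh.close()  # release this host's connection before moving on
    if paths:
        error += "node " + host + ": " + ",".join([p.strip() for p in paths]) + ". "
if error:
    print("cb_rt_kafka data collection cluster: %s disk usage exceeds %d%%" % (error, threshold))
    sys.exit(2)
else:
    print("cb_rt_kafka data collection cluster is healthy")
    sys.exit(0)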
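The threshold comparison can also be done in Python instead of awk, which makes it easy to test without SSH access. The sketch below assumes df -TP style columns (filesystem, type, size, used, available, usage percent, mount point); the function name mounts_over and the sample output are hypothetical.

```python
def mounts_over(df_output, threshold):
    """Return mount points whose usage percentage exceeds threshold."""
    over = []
    for line in df_output.splitlines()[1:]:  # skip the header row
        fields = line.split(None, 6)
        if len(fields) < 7:
            continue
        try:
            percent = int(fields[5].rstrip("%"))
        except ValueError:
            continue  # pseudo filesystems may report "-" instead of a number
        if percent > threshold:
            over.append(fields[6])
    return over


# Hypothetical df output used only to exercise the parser.
SAMPLE_DF = (
    "Filesystem Type Size Used Avail Use% Mounted on\n"
    "/dev/sda1 ext4 50G 45G 5.0G 90% /\n"
    "/dev/sdb1 xfs 1.0T 100G 924G 9% /var/kafka/logs\n"
)
print(mounts_over(SAMPLE_DF, 80))
```

Comparing integers avoids the pitfall of string comparison, where a value such as 9% could sort above 80.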

Typical Troubleshooting

Replace failed disks and restart affected nodes.

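Set unclean.leader.election.enable=false so that an out-of-sync replica cannot become leader; this avoids the message loss and offset rollback that can lead to duplicate consumption.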

Increase the JVM heap to address out-of-memory errors, and raise the file descriptor limit (ulimit -n) to address “too many open files” errors.

Monitor replica sync status; adjust replica.lag.time.max.ms if followers fall behind.
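For the "too many open files" case, the current limit can be inspected from Python on Unix-like systems. The sketch below reports the limit of the process that runs it, so it is only meaningful when launched under the same user and environment as the broker; the target of 100000 is an assumed value, not a recommendation from this article.

```python
import resource


def nofile_status(required):
    """Return (soft, hard, ok) for the open-file limit of this process."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    unlimited = soft == resource.RLIM_INFINITY
    return soft, hard, unlimited or soft >= required


soft, hard, ok = nofile_status(100000)  # assumed target, adjust as needed
print("soft=%s hard=%s enough=%s" % (soft, hard, ok))
```

If the soft limit is too low, it can be raised up to the hard limit for the session with ulimit -n before starting the broker.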

By following these guidelines (understanding Kafka's architecture, applying proper configuration, securing access, and monitoring the cluster), you can deploy a stable, high-performance Kafka cluster suitable for large-scale data pipelines.
