Unveiling Kafka’s Controller: Architecture, Election, and Monitoring Deep Dive
This article provides a comprehensive technical analysis of Kafka’s Controller component, covering its background, core responsibilities, data storage, election process, version‑specific improvements, monitoring techniques, and key source‑code excerpts to help engineers understand and manage Kafka clusters effectively.
Background
The controller is a core component of Apache Kafka that, together with ZooKeeper, manages and coordinates the entire cluster. Only one broker acts as the controller at any time; if it fails or stalls, producers and consumers can be affected, so monitoring its state, elections, and logs is essential.
What Is the Controller?
The controller's primary role is to manage the cluster with ZooKeeper's help. Any broker can become the controller, but at any moment there is exactly one active controller.
Data Stored by the Controller
All topic information, including partition leaders and ISR sets.
All broker information, such as live brokers and brokers that are shutting down.
Partitions involved in operational tasks, including those undergoing preferred‑leader election or reassignment.
This metadata is also persisted in ZooKeeper; when the controller starts it reads the data from ZooKeeper and caches it locally. Broker metadata updates are pushed by the controller rather than pulled from ZooKeeper.
Controller Responsibilities
The controller's duties can be grouped into five main categories: topic management, partition reassignment, preferred-leader election, cluster member management (broker join/leave), and providing data services to other brokers. The specific requests, scripts, and ZooKeeper paths involved are:
UpdateMetadataRequest : Broadcasts metadata changes (e.g., leader changes) to all alive brokers.
CreateTopics : Creates a topic by writing a znode under /brokers/topics in ZooKeeper, which triggers the creation logic.
DeleteTopics : Deletes a topic via a znode under /admin/delete_topics, with the controller executing the actual deletion.
Partition Reassignment : Performed by the kafka-reassign-partitions script, which writes to /admin/reassign_partitions and lets the controller apply the new assignment.
Preferred Leader Election : Can be triggered automatically (auto.leader.rebalance.enable=true), in which case the controller runs a periodic check itself, or manually via the kafka-preferred-replica-election script, which writes to /admin/preferred_replica_election.
Cluster Expansion : When a new broker registers under /brokers/ids, the controller performs service discovery.
Broker Crash Handling : The controller watches ZooKeeper for broker session expirations and promptly elects new leaders for affected partitions.
Controlled Shutdown : Handles graceful broker exits via a ControlledShutdownRequest.
Controller Leader Election : Uses a monotonically increasing epoch number stored in /controller_epoch to resolve split‑brain scenarios.
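As a rough illustration of the partition reassignment item above, the sketch below builds the kind of JSON plan that kafka-reassign-partitions reads. The helper name reassignment_json, the topic name orders, the file name plan.json, and the ZkConnect variable are all made up for this example; KAFKA_PATH follows the monitoring command shown later in this article.

```shell
# reassignment_json TOPIC PARTITION REPLICAS
# Prints a single-partition reassignment plan as one line of JSON.
# REPLICAS is a comma-separated list of broker IDs such as "1,2,3".
# (Hypothetical helper, for illustration only.)
reassignment_json() {
  printf '{"version":1,"partitions":[{"topic":"%s","partition":%s,"replicas":[%s]}]}\n' \
    "$1" "$2" "$3"
}

# Example use against a real cluster (left commented out; ZkConnect is assumed):
# reassignment_json orders 0 1,2,3 > plan.json
# "${KAFKA_PATH}/bin/kafka-reassign-partitions.sh" --zookeeper "${ZkConnect}" \
#   --reassignment-json-file plan.json --execute
```

The script only writes the plan to ZooKeeper; moving the replicas is still the controller's job.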
How a Broker Becomes Controller and Avoids Split‑Brain
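The first broker to create the ephemeral /controller znode becomes the controller. The election relies on ZooKeeper's conditional create (the create fails if the znode already exists) and on an ever-increasing epoch number. When the controller's ZooKeeper session ends, the ephemeral znode is deleted, the watches that the other brokers hold on /controller fire, and those brokers race to create the znode again. Split-brain is prevented by comparing epoch numbers: controller requests carry the sender's epoch, and brokers ignore any request whose epoch is lower than the highest one they have seen, so a stale controller cannot override the current one.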
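The epoch comparison can be sketched in a few lines of shell. This is only a model of the idea, not Kafka's broker code: highest_seen_epoch and accept_controller_request are hypothetical names, and a real broker performs the equivalent check inside its request handling.

```shell
# Highest controller epoch this (simulated) broker has seen so far.
highest_seen_epoch=0

# accept_controller_request EPOCH
# Returns 0 and remembers EPOCH when it is at least the highest epoch seen;
# returns 1 when the request comes from a controller with an older epoch.
accept_controller_request() {
  if [ "$1" -lt "$highest_seen_epoch" ]; then
    return 1   # stale controller: ignore the request
  fi
  highest_seen_epoch=$1
  return 0
}

# A request from the controller elected with epoch 5 is accepted...
accept_controller_request 5 && echo "epoch 5 accepted"
# ...and a late request from the previous controller (epoch 4) is ignored.
accept_controller_request 4 || echo "epoch 4 rejected as stale"
```

The function reports its result through the exit status rather than through output, so that calling it never runs in a subshell and the remembered epoch is not lost.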
Version‑Specific Improvements
Kafka 0.11 introduced asynchronous ZooKeeper writes, boosting write performance by roughly ten times. Before Kafka 2.2, controller requests shared a request queue with client requests, so they could be delayed behind data traffic, which in turn made stale-controller (split-brain) situations more likely. Kafka 2.2 separates controller traffic from data traffic (see SocketServer.scala), improving isolation.
Monitoring the Controller
Exactly one controller should be active at a time, so both the active-controller count and the history of controller changes are worth monitoring. Kafka's bundled JmxTool is enough for lightweight monitoring:
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://"${BrokerIP}":"${JMXPort}"/jmxrmi \
  --object-name kafka.controller:type=KafkaController,name=ActiveControllerCount \
  --date-format "yyyy-MM-dd_HH:mm" \
  --reporting-interval -1 | grep -v type
Additional JMX metrics such as OfflinePartitionsCount, ZooKeeper_SessionState, and MessagesInPerSec can be queried similarly.
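One way to act on this metric is to add up ActiveControllerCount across all brokers and alert when the total is not exactly 1. The sketch below assumes the per-broker values have already been collected into a file with one "<broker>,<count>" line per broker; that file layout and the function name check_active_controller_count are assumptions for this example, not something JmxTool produces by itself.

```shell
# check_active_controller_count FILE
# FILE is assumed to hold one "<broker>,<ActiveControllerCount>" line per broker.
# Prints a status line and returns 1 unless the counts add up to exactly 1.
check_active_controller_count() {
  # Sum the last comma-separated field of every line; "+ 0" yields 0 for an empty file.
  total=$(awk -F ',' '{ sum += $NF } END { print sum + 0 }' "$1")
  if [ "$total" -eq 1 ]; then
    echo "OK: exactly one active controller"
  else
    echo "ALERT: ${total} active controllers reported"
    return 1
  fi
}
```

A total of 0 means no broker currently considers itself the controller, while a total above 1 points to a possible split-brain; both cases return a non-zero status so that a calling script can raise an alert.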
function inter_controller_history() {
    # First run: no history file yet, so record the current controller line.
    if [ ! -f "${clusterID}_controller_history" ]; then
        awk '/,1$/ {print $0}' "${clusterID}_controller" >> "${clusterID}_controller_history"
    else
        # Lines ending in ",1" are taken to be the broker reporting itself as active controller.
        nowController=$(awk -F ',' '/,1$/ {print $1}' "${clusterID}_controller")
        LastTimeController=$(tail -n 1 "${clusterID}_controller_history" | awk -F ',' '/,1$/ {print $1}')
        # Record the change and send a warning only when the controller has moved.
        if [ "${nowController}" != "${LastTimeController}" ]; then
            awk '/,1$/ {print $0}' "${clusterID}_controller" >> "${clusterID}_controller_history"
            msg="${msg_tmp} clusterID:${clusterID} ${ClusterNameCN} Controller From ${LastTimeController} to ${nowController}"
            echo "$msg" >> "$log_file_name"
            send_warning
        fi
    fi
}
Controller Source Code Overview (Kafka 2.2)
Key classes and their responsibilities:
KafkaController : Main controller class, holds configuration, ZooKeeper client, metrics, and context.
ControllerEventManager : Manages controller events and the event‑processing thread.
TopicDeletionManager : Handles safe topic deletion.
ReplicaStateMachine and PartitionStateMachine : Maintain replica and partition states.
Various ZooKeeper handlers (broker, topic, partition, ISR, etc.) for reacting to cluster changes.
The startup flow registers a ZooKeeper state-change handler, enqueues a Startup event, and starts the event manager thread.
def startup() = {
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name = StateChangeHandlers.ControllerHandler
    // After a new ZooKeeper session is established, re-register the broker and rerun the election.
    override def afterInitializingSession(): Unit = {
      eventManager.put(RegisterBrokerAndReelect)
    }
    // Before the session is re-created, drop queued events and process Expire first.
    override def beforeInitializingSession(): Unit = {
      val expireEvent = new Expire
      eventManager.clearAndPut(expireEvent)
      expireEvent.waitUntilProcessingStarted()
    }
  })
  eventManager.put(Startup)
  eventManager.start()
}
The election logic attempts to create the ephemeral /controller node and increment the epoch; on success, the broker becomes the active controller.
private def elect(): Unit = {
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }
  try {
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
    controllerContext.epoch = epoch
    controllerContext.epochZkVersion = epochZkVersion
    activeControllerId = config.brokerId
    info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} and epoch zk version is now ${controllerContext.epochZkVersion}")
    onControllerFailover()
  } catch {
    case e: ControllerMovedException =>
      maybeResign()
      if (activeControllerId != -1) debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
      else warn("A controller has been elected but just resigned, this will result in another round of election", e)
    case t: Throwable =>
      error(s"Error while electing or becoming controller on broker ${config.brokerId}. Trigger controller movement immediately", t)
      triggerControllerMove()
  }
}
After election, onControllerFailover registers all ZooKeeper watchers, initializes the controller context, sends an UpdateMetadataRequest, starts the replica and partition state machines, and schedules background tasks such as auto-leader rebalance and token expiry checks.
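To cross-check which broker won the election, the /controller znode can be read directly. The sketch below assumes the znode payload is the usual JSON object with a brokerid field and extracts that ID from text on standard input; controller_broker_id and ZkConnect are hypothetical names chosen for this example.

```shell
# controller_broker_id
# Reads text on standard input (for example the output of a "get /controller"
# call) and prints the numeric value of the first "brokerid" field it finds.
# Lines without such a field, like client log output, are skipped.
controller_broker_id() {
  sed -n 's/.*"brokerid"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Example use against a real cluster (left commented out; ZkConnect is assumed):
# "${KAFKA_PATH}/bin/zookeeper-shell.sh" "${ZkConnect}" get /controller | controller_broker_id
```

Comparing this ID with the broker that reports ActiveControllerCount = 1 over JMX gives a simple consistency check between ZooKeeper's view and the brokers' view.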
Key Takeaways
The controller is the single point of coordination in a Kafka cluster. Understanding its election mechanism, responsibilities, and monitoring is crucial for reliable operation, especially in large deployments where controller performance can become a bottleneck.
Tencent Cloud Developer
