How Kafka Implements Transactions: Inside the TC Service and Producer Workflow
This article provides a comprehensive walkthrough of Kafka's transaction mechanism, covering the transaction coordinator, producer initialization, partition handling, commit and abort processes, state management, high‑availability design, timeout handling, and relevant source code snippets.
Kafka Transaction Implementation Principles
Kafka transactions are widely used in stream processing to make read-process-write cycles atomic: input is consumed, processed, and written out as a single unit, and the whole unit can be aborted (rolled back) if an error occurs. This article explains how Kafka implements transactions, starting with the overall transaction flow.
Transaction Flow
The transaction flow revolves around an internal topic, __transaction_state, which persists transaction metadata. Each partition of this topic is responsible for a subset of transactions; the partition that owns a given transaction is chosen by hashing its transaction ID and taking the result modulo the number of partitions.
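The partition lookup fits in a few lines of Java. This is a minimal sketch, assuming the same formula as Kafka's broker-side partitionFor() (absolute value of the ID's hashCode() modulo the partition count); the class name TxnPartitioner is hypothetical.

```java
// Hypothetical helper class; the formula mirrors Kafka's partitionFor().
final class TxnPartitioner {
    // Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Index of the __transaction_state partition that owns transactionalId.
    static int partitionFor(String transactionalId, int partitionCount) {
        return abs(transactionalId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        int partitionCount = 50; // default transaction.state.log.num.partitions
        String id = "my-transactional-id";
        System.out.println(id + " -> __transaction_state-" + partitionFor(id, partitionCount));
    }
}
```

Because the result depends on the partition count, transaction.state.log.num.partitions should not be changed once transactions are in use: existing IDs would hash to different partitions.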
Finding the TC Service Address
The producer first sends a FindCoordinator request to any broker in the Kafka cluster to discover the address of its Transaction Coordinator (TC) service. The transaction topic __transaction_state has multiple partitions (50 by default, set by transaction.state.log.num.partitions); the broker that leads the partition owning the transaction ID acts as the TC, and its address is returned to the producer.
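Coordinator discovery therefore reduces to a leader lookup. The sketch below is hypothetical: CoordinatorLookup and the leaders map (partition number to broker ID) stand in for the broker's metadata cache and are not Kafka APIs.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of coordinator discovery. `leaders` stands in for the
// metadata a broker consults: __transaction_state partition -> leader broker ID.
final class CoordinatorLookup {
    static int partitionFor(String transactionalId, int partitionCount) {
        int h = transactionalId.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % partitionCount;
    }

    // Any broker can answer: it maps the ID to a partition and returns that
    // partition's current leader, which acts as the transaction coordinator.
    // Empty means no leader is known right now (the client would retry).
    static Optional<Integer> findCoordinator(String transactionalId,
                                             Map<Integer, Integer> leaders,
                                             int partitionCount) {
        return Optional.ofNullable(leaders.get(partitionFor(transactionalId, partitionCount)));
    }
}
```

If the leader of that partition moves to another broker, the same lookup returns the new broker, which is how a producer finds its TC again after a failover.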
Transaction Initialization
To use transactions, a producer must be configured with a unique transaction ID (the transactional.id setting). The ID identifies the same logical producer across restarts, so that after a restart the TC can resolve any transaction the previous instance left unfinished. Transaction support builds on idempotence, which requires a producer ID. Before starting any transaction, the producer requests a producer ID from the TC service; the TC returns it together with an epoch and persists the mapping to the transaction topic.
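The producer ID is paired with an epoch, and the transaction ID is what ties both to one logical producer across restarts. The simplified model below (ProducerIdRegistry and its methods are hypothetical names, not Kafka code) shows the effect: the same transaction ID keeps its producer ID but receives a higher epoch each time initTransactions() runs, so requests from an older instance can be rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of InitProducerId on the TC (not Kafka's code).
final class ProducerIdRegistry {
    static final class Identity {
        final long producerId;
        final short epoch;

        Identity(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, Identity> byTransactionalId = new HashMap<>();
    private long nextProducerId = 1000L; // arbitrary starting point for the sketch

    // Handles initTransactions(): a new ID gets a fresh producer ID at epoch 0;
    // a known ID keeps its producer ID and gets the next epoch.
    // (The real TC also persists the result and resolves any unfinished transaction.)
    Identity initProducerId(String transactionalId) {
        Identity current = byTransactionalId.get(transactionalId);
        Identity next = (current == null)
                ? new Identity(nextProducerId++, (short) 0)
                : new Identity(current.producerId, (short) (current.epoch + 1));
        byTransactionalId.put(transactionalId, next);
        return next;
    }

    // Requests stamped with an older epoch come from a fenced ("zombie") instance.
    boolean isFenced(String transactionalId, short requestEpoch) {
        Identity current = byTransactionalId.get(transactionalId);
        return current != null && requestEpoch < current.epoch;
    }
}
```

In Kafka, a fenced producer typically learns about it through a ProducerFencedException and must be closed.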
Sending Messages
After receiving the producer ID, the producer can send messages. Before it first writes to a partition within a transaction, the producer registers that partition with the TC service (an AddPartitionsToTxn request), and the TC persists the updated partition list to the transaction topic. The messages themselves are written to the ordinary topic partitions and carry a flag marking them as part of a transaction.
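The transactional flag lives in the attributes field of a record batch (message format v2). The sketch below builds and decodes that field; the mask values follow the Kafka protocol documentation (bit 4 marks a transactional batch, bit 5 a control batch), while the class and method names are illustrative.

```java
// Illustrative decoder for the v2 record-batch attributes bit field.
// Bits 0-2: compression, bit 3: timestamp type, bit 4: transactional, bit 5: control.
final class BatchAttributes {
    static final short TRANSACTIONAL_FLAG_MASK = 0x10; // bit 4
    static final short CONTROL_FLAG_MASK = 0x20;       // bit 5

    // Builds attributes for an uncompressed, CreateTime batch.
    static short attributes(boolean transactional, boolean control) {
        short attrs = 0;
        if (transactional) attrs |= TRANSACTIONAL_FLAG_MASK;
        if (control) attrs |= CONTROL_FLAG_MASK;
        return attrs;
    }

    static boolean isTransactional(short attrs) {
        return (attrs & TRANSACTIONAL_FLAG_MASK) != 0;
    }

    static boolean isControl(short attrs) {
        return (attrs & CONTROL_FLAG_MASK) != 0;
    }
}
```

The commit and abort markers written at the end of a transaction are batches with both bits set.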
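The producer can also commit consumer offsets atomically with the transaction. It first sends an AddOffsetsToTxn request carrying the consumer group ID to the TC service, which adds the group's partition of the __consumer_offsets topic to the transaction and persists this to the transaction topic. The producer then sends a TxnOffsetCommit request with the actual offsets directly to the GroupCoordinator; those offsets take effect only if the transaction commits.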
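In the actual protocol, AddOffsetsToTxn carries the group ID but no offsets, and the TC derives which partition of __consumer_offsets belongs in the transaction. A minimal sketch, assuming the group-to-partition mapping Kafka uses for group coordination (abs(hashCode) modulo offsets.topic.num.partitions, 50 by default); OffsetsPartition is a hypothetical helper.

```java
// Hypothetical helper: which __consumer_offsets partition a consumer group maps to.
final class OffsetsPartition {
    static int partitionForGroup(String groupId, int offsetsTopicPartitions) {
        int h = groupId.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % offsetsTopicPartitions;
    }

    // The partition the TC adds to the transaction when it handles AddOffsetsToTxn.
    static String partitionAddedToTransaction(String groupId, int offsetsTopicPartitions) {
        return "__consumer_offsets-" + partitionForGroup(groupId, offsetsTopicPartitions);
    }
}
```

Because that partition is part of the transaction, it receives the same commit or abort marker as the data partitions, which is what ties the offset commit to the outcome of the transaction.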
Commit Request
When the producer decides the transaction can be committed, it first makes sure that every message sent in the transaction has been successfully sent and acknowledged. Only then does it send a commit request (EndTxn) to the TC service and wait for the response.
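The ordering rule (no commit request until every send is acknowledged) can be modeled with futures. This is a hypothetical sketch: CompletableFuture stands in for the producer's in-flight batches, and CommitBarrier is not a Kafka class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of "flush before commit" (not Kafka's implementation).
final class CommitBarrier {
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    private final List<String> sentRequests = new ArrayList<>();

    // Records a produce request; the returned future is completed when the broker acks it.
    CompletableFuture<Void> send(String destination) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        inFlight.add(ack);
        sentRequests.add("Produce(" + destination + ")");
        return ack;
    }

    // EndTxn(COMMIT) is recorded only after all outstanding acks have arrived.
    CompletableFuture<Void> commit() {
        return CompletableFuture
                .allOf(inFlight.toArray(new CompletableFuture[0]))
                .thenRun(() -> sentRequests.add("EndTxn(COMMIT)"));
    }

    List<String> sentRequests() {
        return sentRequests;
    }
}
```

If any acknowledgement fails, the combined future fails and EndTxn(COMMIT) is never recorded; a real application would then abort the transaction.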
Commit Request Persistence
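The TC service first writes the commit decision (the PrepareCommit state) to the transaction topic and, once that write succeeds, responds to the producer immediately, without waiting for the partitions to be updated. It then generates a commit-marker request for each involved partition and queues it for sending.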
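The order of these steps matters: once PrepareCommit is in the log, a coordinator that takes over after a failure can finish the commit from the log alone, so the producer does not need to wait for the markers. The simplified model below records that order; EndTxnHandler and its fields are hypothetical, and the real TC performs the log append asynchronously and replies from its callback.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the TC handling EndTxn(COMMIT).
final class EndTxnHandler {
    final List<String> transactionLog = new ArrayList<>();                 // stand-in for __transaction_state
    final List<String> actions = new ArrayList<>();                        // order of observable steps
    final Map<Integer, List<String>> markerQueues = new LinkedHashMap<>(); // leader broker -> partitions

    void handleCommit(String transactionalId, Map<String, Integer> partitionLeaders) {
        // 1. Make the decision durable.
        transactionLog.add(transactionalId + " PrepareCommit");
        actions.add("persist PrepareCommit");
        // 2. Reply right away: markers can be re-sent from the log after a failover.
        actions.add("respond to producer");
        // 3. One commit marker per partition, queued for the broker leading that partition.
        for (Map.Entry<String, Integer> e : partitionLeaders.entrySet()) {
            markerQueues.computeIfAbsent(e.getValue(), broker -> new ArrayList<>()).add(e.getKey());
        }
        actions.add("queue markers");
    }
}
```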
Sending Transaction Result to Partitions
A background thread continuously takes requests from the queue and sends them, as WriteTxnMarkers requests, to the brokers that lead the involved partitions. Each partition leader appends the result to its log as a commit marker (a special control record) and acknowledges the TC service. Once all partitions have acknowledged, the TC service writes a final CompleteCommit record to the transaction topic, completing the transaction.
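The completion rule (write the final record only after the last partition acknowledges) can be sketched as follows. MarkerTracker is a hypothetical class; Kafka implements the same idea with its DelayedTxnMarker operation, which completes once the transaction's partition set is empty.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of marker-acknowledgement tracking for one committing transaction.
final class MarkerTracker {
    private final Set<String> pendingPartitions;
    private final List<String> transactionLog; // stand-in for __transaction_state
    private boolean completed = false;

    MarkerTracker(Set<String> partitions, List<String> transactionLog) {
        this.pendingPartitions = new HashSet<>(partitions);
        this.transactionLog = transactionLog;
        maybeComplete(); // a transaction with no partitions is done immediately
    }

    // Called when a partition leader confirms it has written the commit marker.
    synchronized void onMarkerAcknowledged(String partition) {
        pendingPartitions.remove(partition);
        maybeComplete();
    }

    synchronized boolean isCompleted() {
        return completed;
    }

    // Writes the final record exactly once, after the last acknowledgement.
    private synchronized void maybeComplete() {
        if (!completed && pendingPartitions.isEmpty()) {
            completed = true;
            transactionLog.add("CompleteCommit");
        }
    }
}
```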
Client Side Mechanics
Example Code
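```java
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer =
    new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
producer.initTransactions(); // obtain a producer ID and fence older instances

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("isolation.level", "read_committed"); // skip aborted and still-open transactions
consumerProps.put("enable.auto.commit", "false");       // offsets are committed via the transaction
KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singleton("inputTopic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("outputTopic_1", record.key(), record.value()));
        producer.send(new ProducerRecord<>("outputTopic_2", record.key(), record.value()));
        // Commit the offset of the next record to read, not of this one.
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}
```

The Producer interface defines methods such as initTransactions(), beginTransaction(), sendOffsetsToTransaction(), commitTransaction(), and abortTransaction(). The KafkaProducer class implements this interface and delegates most of the transactional work to the client-side TransactionManager.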
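These methods must be called in a fixed order. The simplified model below shows the order the client enforces; the state names follow Kafka's client-side TransactionManager, but the real state machine has additional states (including error states), and TxnCallOrder itself is hypothetical.

```java
import java.util.EnumSet;

// Hypothetical, simplified model of the call order enforced by the transactional Producer API.
// The real TransactionManager also has INITIALIZING, ABORTABLE_ERROR, FATAL_ERROR, etc.
final class TxnCallOrder {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION }

    private State state = State.UNINITIALIZED;

    private void transition(EnumSet<State> allowedFrom, State target) {
        if (!allowedFrom.contains(state)) {
            throw new IllegalStateException("Invalid transition from " + state + " to " + target);
        }
        state = target;
    }

    void initTransactions() { transition(EnumSet.of(State.UNINITIALIZED), State.READY); }

    void beginTransaction() { transition(EnumSet.of(State.READY), State.IN_TRANSACTION); }

    void commitTransaction() {
        transition(EnumSet.of(State.IN_TRANSACTION), State.COMMITTING_TRANSACTION);
        onEndTxnCompleted(); // in reality: after the TC answers the EndTxn request
    }

    void abortTransaction() {
        transition(EnumSet.of(State.IN_TRANSACTION), State.ABORTING_TRANSACTION);
        onEndTxnCompleted();
    }

    private void onEndTxnCompleted() {
        transition(EnumSet.of(State.COMMITTING_TRANSACTION, State.ABORTING_TRANSACTION), State.READY);
    }

    State state() { return state; }
}
```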
TransactionManager
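The TransactionManager tracks which partitions belong to the current transaction using three sets: newPartitionsInTransaction (partitions seen but not yet sent to the TC), pendingPartitionsInTransaction (partitions in an AddPartitionsToTxn request still awaiting a response), and partitionsInTransaction (partitions the TC has acknowledged). maybeAddPartitionToTransaction() is called when a record is sent; it queues a new partition so that the partition is registered with the TC before any data for it is sent.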
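```java
public synchronized void maybeAddPartitionToTransaction(TopicPartition tp) {
    // Throws if the producer is in an error state or no transaction is in progress.
    failIfNotReadyForSend();
    // Nothing to do if the partition is already registered or its registration is in flight.
    if (isPartitionAdded(tp) || isPartitionPendingAdd(tp)) return;
    log.debug("Begin adding new partition {} to transaction", tp);
    // Queued here; the Sender thread includes it in the next AddPartitionsToTxn request.
    newPartitionsInTransaction.add(tp);
}
```

When the application calls commitTransaction() or abortTransaction(), KafkaProducer calls the TransactionManager's beginCommit() or beginAbort(). These methods change the TransactionManager's internal state and create an EndTxnRequest, which is sent to the TC service.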
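The movement of a partition through the TransactionManager's three sets can be sketched as follows. Only the three set names come from Kafka; the other methods, and the use of strings instead of TopicPartition, are simplifications for illustration, with no locking, retries, or error handling.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of partition registration on the producer side.
final class PartitionRegistry {
    final Set<String> newPartitionsInTransaction = new HashSet<>();     // seen, not yet requested
    final Set<String> pendingPartitionsInTransaction = new HashSet<>(); // AddPartitionsToTxn in flight
    final Set<String> partitionsInTransaction = new HashSet<>();        // acknowledged by the TC

    void maybeAddPartitionToTransaction(String tp) {
        if (partitionsInTransaction.contains(tp) || pendingPartitionsInTransaction.contains(tp)) return;
        newPartitionsInTransaction.add(tp);
    }

    // The background sender builds one AddPartitionsToTxn request from the new partitions.
    Set<String> drainForAddPartitionsRequest() {
        Set<String> batch = new HashSet<>(newPartitionsInTransaction);
        pendingPartitionsInTransaction.addAll(batch);
        newPartitionsInTransaction.clear();
        return batch;
    }

    // On a successful response the partitions become part of the transaction.
    void onAddPartitionsResponse(Set<String> batch) {
        pendingPartitionsInTransaction.removeAll(batch);
        partitionsInTransaction.addAll(batch);
    }

    // Data for a partition may only be sent once the TC knows about it.
    boolean isSendToPartitionAllowed(String tp) {
        return partitionsInTransaction.contains(tp);
    }
}
```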
Server Side Mechanics
The server side consists of several components. TransactionStateManager maintains transaction metadata in memory and persists changes to the transaction topic. It provides methods like appendTransactionToLog() to write metadata records.
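```scala
def appendTransactionToLog(
    transactionalId: String,
    coordinatorEpoch: Int,
    newMetadata: TxnTransitMetadata,
    responseCallback: Errors => Unit): Unit = {
  // Key = transactional ID, value = serialized metadata. The topic is compacted,
  // so the latest record for an ID represents its current state.
  val keyBytes = TransactionLog.keyToBytes(transactionalId)
  val valueBytes = TransactionLog.valueToBytes(newMetadata)
  val timestamp = time.milliseconds()
  val records = MemoryRecords.withRecords(
    TransactionLog.EnforcedCompressionType,
    new SimpleRecord(timestamp, keyBytes, valueBytes))
  // Write to the __transaction_state partition that owns this transactional ID.
  val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))

  // appendRecords reports a result per partition. The real callback also translates
  // some errors and updates the cached TransactionMetadata; both steps are elided here.
  def updateCacheCallback(responseStatus: collection.Map[TopicPartition, PartitionResponse]): Unit =
    responseCallback(responseStatus(topicPartition).error)

  replicaManager.appendRecords(
    newMetadata.txnTimeoutMs.toLong,      // timeout for the replicated write
    TransactionLog.EnforcedRequiredAcks,  // acks = -1: wait for all in-sync replicas
    internalTopicsAllowed = true,
    isFromClient = false,
    Map(topicPartition -> records),
    updateCacheCallback,
    delayedProduceLock = Some(stateLock.readLock))
}
```

TransactionCoordinator handles the core requests: handleInitProducerId allocates a new producer ID or returns the existing one for the transaction ID (with a bumped epoch); handleAddPartitionsToTransaction persists the updated partition list; and handleEndTransaction moves the transaction to PrepareCommit or PrepareAbort, persists the change, and triggers the TransactionMarkerChannelManager to send markers to the involved partitions.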
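The transitions handleEndTransaction relies on form a small state machine. This sketch borrows the names of Kafka's coordinator-side transaction states but omits Dead and PrepareEpochFence, and its table of allowed predecessors is a simplification rather than Kafka's exact rules; TxnStateMachine is a hypothetical class.

```java
import java.util.EnumSet;
import java.util.Map;

// Simplified sketch of the TC's per-transaction state machine (not Kafka's exact table).
final class TxnStateMachine {
    enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    // Target state -> states it may be entered from.
    static final Map<TxnState, EnumSet<TxnState>> VALID_PREVIOUS = Map.of(
        TxnState.EMPTY, EnumSet.of(TxnState.EMPTY, TxnState.COMPLETE_COMMIT, TxnState.COMPLETE_ABORT),
        TxnState.ONGOING, EnumSet.of(TxnState.EMPTY, TxnState.ONGOING,
                                     TxnState.COMPLETE_COMMIT, TxnState.COMPLETE_ABORT),
        TxnState.PREPARE_COMMIT, EnumSet.of(TxnState.ONGOING),
        TxnState.PREPARE_ABORT, EnumSet.of(TxnState.ONGOING),
        TxnState.COMPLETE_COMMIT, EnumSet.of(TxnState.PREPARE_COMMIT),
        TxnState.COMPLETE_ABORT, EnumSet.of(TxnState.PREPARE_ABORT));

    static boolean isValidTransition(TxnState from, TxnState to) {
        return VALID_PREVIOUS.get(to).contains(from);
    }
}
```

Once PrepareCommit has been written, the only valid next state is CompleteCommit, so a coordinator that reloads the log after a failover knows it must finish sending commit markers.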
Transaction Marker Flow
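The TransactionMarkerChannelManager registers a DelayedTxnMarker for each transaction whose markers it sends. As each partition acknowledges its marker, that partition is removed from the transaction metadata, and the delayed operation completes once no partitions remain. Its timeout is TimeUnit.DAYS.toMillis(365 * 100), roughly 100 years, so in practice it never expires; onExpiration() throws because reaching it would indicate a bug. When the operation completes, its callback finalizes the transaction.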
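```scala
// Delayed operation that completes once every partition has acknowledged its marker.
private class DelayedTxnMarker(
    txnMetadata: TransactionMetadata,
    completionCallback: Errors => Unit,
    lock: Lock) extends DelayedOperation(
  TimeUnit.DAYS.toMillis(365 * 100), // about 100 years: effectively never expires
  Some(lock)) {

  // Partitions are removed from txnMetadata as their marker acknowledgements arrive.
  override def tryComplete(): Boolean = {
    txnMetadata.inLock {
      if (txnMetadata.topicPartitions.isEmpty) {
        forceComplete()
      } else false
    }
  }

  // A timeout would indicate a bug, so fail loudly.
  override def onExpiration(): Unit = {
    throw new IllegalStateException(
      s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
  }

  // Runs once after forceComplete(); the callback finalizes the transaction.
  override def onComplete(): Unit = completionCallback(Errors.NONE)
}
```

High Availability and Timeout Handling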
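Every broker runs a TC service, and each TC is responsible for the transaction-topic partitions whose leader it hosts. If that broker fails, a follower replica of the partition is elected leader; its broker loads the partition's transaction metadata into memory and takes over as the TC for those transaction IDs, which keeps the service highly available. The TC also periodically checks ongoing transactions: if a transaction has run longer than its configured timeout (the producer's transaction.timeout.ms), the TC aborts it, persists the abort state, and sends abort markers to the involved partitions.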
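The periodic timeout check can be sketched as a scan over in-memory transaction metadata. The rule (abort an Ongoing transaction whose start time plus timeout lies in the past) follows the description above; TimeoutScanner, Txn, and abortTimedOut are hypothetical names, and the real TC additionally persists the abort, bumps the producer epoch, and sends abort markers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the TC's periodic transaction-timeout scan.
final class TimeoutScanner {
    enum State { ONGOING, PREPARE_COMMIT, PREPARE_ABORT }

    static final class Txn {
        State state;
        final long startMs;
        final long timeoutMs; // from the producer's transaction.timeout.ms

        Txn(State state, long startMs, long timeoutMs) {
            this.state = state;
            this.startMs = startMs;
            this.timeoutMs = timeoutMs;
        }
    }

    // Moves timed-out Ongoing transactions to PrepareAbort and returns their IDs.
    // Transactions already preparing a commit or abort are left alone.
    static List<String> abortTimedOut(Map<String, Txn> txns, long nowMs) {
        List<String> aborted = new ArrayList<>();
        for (Map.Entry<String, Txn> e : txns.entrySet()) {
            Txn t = e.getValue();
            if (t.state == State.ONGOING && t.startMs + t.timeoutMs < nowMs) {
                t.state = State.PREPARE_ABORT;
                aborted.add(e.getKey());
            }
        }
        return aborted;
    }
}
```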
Summary
Kafka’s transaction implementation relies on a dedicated transaction coordinator service, a persistent transaction topic, and a state machine that tracks transaction lifecycles. Producers interact with the TC service to obtain producer IDs, add partitions, and commit or abort transactions. The server side persists state changes, manages high availability through partition leaders, and uses delayed operations to ensure all partition acknowledgments are received before finalizing a transaction.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
21CTO
21CTO (21CTO.com) offers developers community, training, and services, making it your go‑to learning and service platform.
