
How Kafka Implements Transactions: Inside the TC Service and Producer Workflow

This article provides a comprehensive walkthrough of Kafka's transaction mechanism, covering the transaction coordinator, producer initialization, partition handling, commit and abort processes, state management, high‑availability design, timeout handling, and relevant source code snippets.

21CTO

Kafka Transaction Implementation Principles

Kafka transactions are widely used in stream processing to make a read-process-write cycle atomic: the records a job consumes, the records it produces, and the consumer offsets it commits either all take effect together or, if an error occurs, are all aborted. This article explains how Kafka implements transactions, beginning with the overall transaction flow diagram.

Transaction Flow

The transaction flow relies on a special internal topic, __transaction_state, that persists transaction metadata. Each partition of this topic is responsible for a subset of transactions: a transaction is assigned to a partition by hashing its transactional ID and taking the result modulo the number of partitions.
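The partition assignment can be sketched in a few lines. This is a minimal sketch, assuming the coordinator hashes the transactional ID with Java's String.hashCode() and takes a non-negative remainder, which is our understanding of how TransactionStateManager.partitionFor behaves; the class name TxnPartitionSketch is hypothetical.

```java
final class TxnPartitionSketch {
    // Non-negative absolute value; Integer.MIN_VALUE has no positive counterpart, so it maps to 0.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Picks the __transaction_state partition that owns this transactional ID.
    static int partitionFor(String transactionalId, int partitionCount) {
        return abs(transactionalId.hashCode()) % partitionCount;
    }
}
```

Because the result depends only on the ID and the partition count, every broker computes the same owner, which is also why the partition count of the transaction topic should not be changed once transactions are in use.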

Finding TC Service Address

The producer first sends a FindCoordinator request to any broker in the cluster to discover the address of its Transaction Coordinator (TC) service. The __transaction_state topic has multiple partitions (50 by default, set by transaction.state.log.num.partitions); the broker that currently leads the partition owning the transactional ID acts as the TC for that transaction, and its address is returned to the producer.
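Answering this lookup only requires the partition mapping plus the broker's view of partition leaders. The sketch below assumes a plain map from __transaction_state partition number to leader broker ID; CoordinatorLookupSketch and coordinatorFor are hypothetical names, not Kafka APIs.

```java
import java.util.Map;
import java.util.OptionalInt;

final class CoordinatorLookupSketch {
    // Same hash-and-modulo placement used for transactional IDs on __transaction_state.
    static int partitionFor(String transactionalId, int partitionCount) {
        int h = transactionalId.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % partitionCount;
    }

    // Returns the broker acting as TC, or empty if the owning partition has no leader right now
    // (a client would then retry the lookup).
    static OptionalInt coordinatorFor(String transactionalId, int partitionCount,
                                      Map<Integer, Integer> leaderByPartition) {
        Integer leader = leaderByPartition.get(partitionFor(transactionalId, partitionCount));
        return leader == null ? OptionalInt.empty() : OptionalInt.of(leader);
    }
}
```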

Transaction Initialization

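To use transactions, a producer must be configured with a unique transactional ID (transactional.id). Because the ID stays the same across restarts, the TC can recognize a restarted producer: it completes or aborts any transaction the previous instance left unfinished and fences that instance off. Transaction support builds on idempotence, which requires a producer ID. Before starting its first transaction (in initTransactions()), the producer requests a producer ID and epoch from the TC service, which persists the mapping to the transaction topic.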
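The effect of this initialization on a transactional ID can be modeled in memory. This is a simplified, hypothetical model (ProducerIdRegistrySketch, ProducerIdAndEpoch): a new transactional ID receives a fresh producer ID with epoch 0, while a returning ID keeps its producer ID and has its epoch bumped, so requests still carrying the old epoch can be rejected. It ignores persistence, epoch overflow, real producer ID allocation, and the handling of an unfinished transaction.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerIdRegistrySketch {
    static final class ProducerIdAndEpoch {
        final long producerId;
        final short epoch;
        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, ProducerIdAndEpoch> byTransactionalId = new HashMap<>();
    private long nextProducerId = 1000L; // arbitrary starting value for the sketch

    // First call for an ID allocates a producer ID; every later call bumps the epoch.
    ProducerIdAndEpoch initProducerId(String transactionalId) {
        ProducerIdAndEpoch current = byTransactionalId.get(transactionalId);
        ProducerIdAndEpoch next = (current == null)
                ? new ProducerIdAndEpoch(nextProducerId++, (short) 0)
                : new ProducerIdAndEpoch(current.producerId, (short) (current.epoch + 1));
        byTransactionalId.put(transactionalId, next);
        return next;
    }

    // Only the latest epoch is accepted; an older instance with the same ID is fenced.
    boolean isCurrent(String transactionalId, long producerId, short epoch) {
        ProducerIdAndEpoch current = byTransactionalId.get(transactionalId);
        return current != null && current.producerId == producerId && current.epoch == epoch;
    }
}
```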

Sending Messages

After receiving the producer ID, the producer can send messages. Before writing to a partition for the first time in a transaction, the producer registers that partition with the TC service (an AddPartitionsToTxn request), and the TC persists the updated partition list to the transaction topic. Record batches written inside a transaction carry a flag marking them as transactional.
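The transactional flag lives in the attributes field of the record batch header. A minimal decoding sketch, assuming the v2 batch layout in which bits 0 to 2 hold the compression codec, bit 4 is the transactional flag, and bit 5 is the control flag used by transaction markers; BatchAttributesSketch is a hypothetical helper, not a Kafka class.

```java
final class BatchAttributesSketch {
    static final int COMPRESSION_MASK = 0x07;   // bits 0-2: compression codec
    static final int TRANSACTIONAL_MASK = 0x10; // bit 4: batch belongs to a transaction
    static final int CONTROL_MASK = 0x20;       // bit 5: control batch (e.g. a commit/abort marker)

    static int compressionCodec(short attributes) { return attributes & COMPRESSION_MASK; }

    static boolean isTransactional(short attributes) { return (attributes & TRANSACTIONAL_MASK) != 0; }

    static boolean isControl(short attributes) { return (attributes & CONTROL_MASK) != 0; }

    // Attributes for an ordinary transactional data batch (not a control batch).
    static short transactionalBatch(int compressionCodec) {
        return (short) ((compressionCodec & COMPRESSION_MASK) | TRANSACTIONAL_MASK);
    }
}
```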

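Consumer offsets can also be committed atomically with the transaction. The producer first sends the TC service an AddOffsetsToTxn request naming the consumer group; the TC adds the group's __consumer_offsets partition to the transaction and persists that change. The producer then sends a TxnOffsetCommit request carrying the partition offsets directly to the group's GroupCoordinator, which holds them as pending until the transaction completes.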
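Committing offsets inside a transaction therefore adds one more partition to it: the __consumer_offsets partition that stores the group's offsets. A minimal sketch, assuming the group is placed on __consumer_offsets with the same hash-and-modulo scheme as transactional IDs and the default 50 partitions (offsets.topic.num.partitions); OffsetsInTxnSketch and addOffsetsToTxn are hypothetical names, and partitions are represented as plain "topic-partition" strings.

```java
import java.util.Set;

final class OffsetsInTxnSketch {
    // Partition of __consumer_offsets that holds this group's committed offsets.
    static int offsetsPartitionFor(String groupId, int partitionCount) {
        int h = groupId.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % partitionCount;
    }

    // Models AddOffsetsToTxn on the TC side: the group's offsets partition joins the set of
    // partitions that will later receive a commit or abort marker.
    static void addOffsetsToTxn(Set<String> txnPartitions, String groupId, int partitionCount) {
        txnPartitions.add("__consumer_offsets-" + offsetsPartitionFor(groupId, partitionCount));
    }
}
```

Since that partition receives a marker like any other, offsets committed through a transaction only become visible if the transaction commits.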

Commit Request

When the producer decides the transaction can be committed, it sends a commit request (EndTxn) to the TC service and waits for the response. Before doing so, it flushes all outstanding records and makes sure every earlier request has been sent and acknowledged.
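The ordering rule amounts to "wait for every outstanding send, then end the transaction". The sketch below models it with CompletableFuture; CommitOrderingSketch, send() and commit() are hypothetical stand-ins for the producer's internal record buffering and EndTxn request, not KafkaProducer methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CommitOrderingSketch {
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    // Registers an asynchronous send; the future completes when the broker acknowledges it.
    CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        inFlight.add(ack);
        ack.thenRun(() -> log.add("acked " + record));
        return ack;
    }

    // Blocks until every outstanding send is acknowledged, then issues the commit.
    // If any send failed, join() throws and no commit is sent (the application should abort).
    void commit() {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
        log.add("EndTxn(COMMIT)");
    }
}
```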

Commit Request Persistence

The TC service first persists the commit decision to the transaction topic by moving the transaction to the PrepareCommit state; once that write succeeds, it responds to the producer. It then generates a commit marker request for each involved partition and queues them for sending.
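Marker requests can be batched by destination, since one broker may lead several of the transaction's partitions. The sketch below assumes a simple map from partition name to leader broker ID and groups a transaction's partitions into one request per broker, appended to a per-broker queue; our understanding is that TransactionMarkerChannelManager keeps per-broker queues in a similar spirit, but MarkerQueuesSketch and its members are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

final class MarkerQueuesSketch {
    // One marker request per broker, listing every transaction partition that broker leads.
    static final class MarkerRequest {
        final String transactionalId;
        final int brokerId;
        final boolean commit;
        final List<String> partitions = new ArrayList<>();
        MarkerRequest(String transactionalId, int brokerId, boolean commit) {
            this.transactionalId = transactionalId;
            this.brokerId = brokerId;
            this.commit = commit;
        }
    }

    final Map<Integer, Queue<MarkerRequest>> queuePerBroker = new TreeMap<>();

    // Called after the PrepareCommit/PrepareAbort record has been persisted.
    void enqueueMarkers(String transactionalId, Collection<String> txnPartitions,
                        Map<String, Integer> leaderOf, boolean commit) {
        Map<Integer, MarkerRequest> byBroker = new TreeMap<>();
        for (String tp : txnPartitions) {
            int broker = leaderOf.get(tp); // assumes every partition has a known leader
            byBroker.computeIfAbsent(broker, b -> new MarkerRequest(transactionalId, b, commit))
                    .partitions.add(tp);
        }
        for (MarkerRequest req : byBroker.values()) {
            queuePerBroker.computeIfAbsent(req.brokerId, b -> new ArrayDeque<>()).add(req);
        }
    }
}
```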

Sending Transaction Result to Partitions

A background thread continuously takes requests from the queue and sends them to the leaders of the involved partitions. Each partition leader appends the result as a commit marker (a control record) and acknowledges the TC service. Once all partitions have acknowledged, the TC service persists a final CompleteCommit record to the transaction topic, and the transaction is finished.
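The completion rule (write the final record only after every partition has acknowledged) can be modeled with a queue drained by a background thread. A simplified, hypothetical sketch: in MarkerSenderSketch a partition "acknowledges" as soon as the worker takes its request, and persisting to __transaction_state is reduced to appending to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class MarkerSenderSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Set<String> pending = new HashSet<>();
    final List<String> txnLog = Collections.synchronizedList(new ArrayList<>());

    // Called after PrepareCommit is persisted: one queued marker per partition.
    synchronized void startCommit(List<String> partitions) {
        txnLog.add("PrepareCommit");
        pending.addAll(partitions);
        queue.addAll(partitions);
    }

    // Invoked when a partition leader acknowledges its marker.
    private synchronized void onAck(String partition) {
        if (pending.remove(partition) && pending.isEmpty()) {
            txnLog.add("CompleteCommit"); // every partition has its marker: finish the transaction
        }
    }

    // Background worker; it stops after markersToSend markers so the sketch terminates.
    Thread startWorker(int markersToSend) {
        Thread t = new Thread(() -> {
            try {
                for (int i = 0; i < markersToSend; i++) {
                    onAck(queue.take()); // sketch: send and acknowledgement are instantaneous
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }
}
```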

Client Side Mechanics

Example Code

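import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;

Properties pp = new Properties();
pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
KafkaProducer<String, String> producer =
    new KafkaProducer<>(pp, new StringSerializer(), new StringSerializer());
producer.initTransactions(); // obtains the producer ID and epoch from the TC

Properties cp = new Properties();
cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
cp.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are committed by the transaction
KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(cp, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singleton("inputTopic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("outputTopic_1", record.key(), record.value()));
        producer.send(new ProducerRecord<>("outputTopic_2", record.key(), record.value()));
        // commit the offset of the next record to read, i.e. last processed + 1
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}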

The Producer interface defines methods such as initTransactions(), beginTransaction(), sendOffsetsToTransaction(), commitTransaction(), and abortTransaction(). The KafkaProducer class implements this interface and delegates most of the transactional bookkeeping to its internal TransactionManager.

TransactionManager

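The TransactionManager tracks which partitions have been added to the current transaction. It maintains three sets: newPartitionsInTransaction (partitions seen but not yet registered), pendingPartitionsInTransaction (partitions whose AddPartitionsToTxn request is in flight), and partitionsInTransaction (partitions the TC has confirmed). maybeAddPartitionToTransaction() is called for every record sent, so each partition is registered with the TC before its records are sent.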

public synchronized void maybeAddPartitionToTransaction(TopicPartition tp) {
    failIfNotReadyForSend();
    if (isPartitionAdded(tp) || isPartitionPendingAdd(tp)) return;
    log.debug("Begin adding new partition {} to transaction", tp);
    newPartitionsInTransaction.add(tp);
}
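The hand-off between the three sets can be sketched as follows. This is a simplified model of our reading of TransactionManager, not its code: partitions move from new to pending when an AddPartitionsToTxn request is built, and from pending to confirmed once the TC replies. PartitionSetsSketch and every method except maybeAddPartitionToTransaction are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class PartitionSetsSketch {
    final Set<String> newPartitionsInTransaction = new HashSet<>();
    final Set<String> pendingPartitionsInTransaction = new HashSet<>();
    final Set<String> partitionsInTransaction = new HashSet<>();

    // Same idea as maybeAddPartitionToTransaction(): skip partitions already added or in flight.
    void maybeAddPartitionToTransaction(String tp) {
        if (partitionsInTransaction.contains(tp) || pendingPartitionsInTransaction.contains(tp)) return;
        newPartitionsInTransaction.add(tp);
    }

    // Hypothetical: drain the new set into one AddPartitionsToTxn request.
    Set<String> buildAddPartitionsRequest() {
        Set<String> batch = new HashSet<>(newPartitionsInTransaction);
        pendingPartitionsInTransaction.addAll(batch);
        newPartitionsInTransaction.clear();
        return batch;
    }

    // Hypothetical: the TC persisted the partitions, so their records may now be sent.
    void onAddPartitionsSuccess(Set<String> batch) {
        pendingPartitionsInTransaction.removeAll(batch);
        partitionsInTransaction.addAll(batch);
    }

    boolean canSendTo(String tp) { return partitionsInTransaction.contains(tp); }
}
```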

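When the application calls commitTransaction() or abortTransaction(), KafkaProducer invokes TransactionManager.beginCommit() or beginAbort(). These move the internal state to COMMITTING_TRANSACTION or ABORTING_TRANSACTION and enqueue an EndTxnRequest that is sent to the TC service.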
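These client-side state changes can be summarized as a transition table. A sketch assuming a subset of TransactionManager's states (READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR); the enum ClientTxnState, the class ClientTxnStateMachineSketch, and the exact transition rules are a simplified model, and the real class has more states and checks.

```java
import java.util.EnumSet;
import java.util.Set;

enum ClientTxnState {
    READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR;

    // States from which a transition into this state is allowed (simplified).
    Set<ClientTxnState> validSources() {
        switch (this) {
            case READY: return EnumSet.of(COMMITTING_TRANSACTION, ABORTING_TRANSACTION);
            case IN_TRANSACTION: return EnumSet.of(READY);
            case COMMITTING_TRANSACTION: return EnumSet.of(IN_TRANSACTION);
            case ABORTING_TRANSACTION: return EnumSet.of(IN_TRANSACTION, ABORTABLE_ERROR);
            case ABORTABLE_ERROR: return EnumSet.of(IN_TRANSACTION, COMMITTING_TRANSACTION);
            default: throw new AssertionError(this);
        }
    }
}

final class ClientTxnStateMachineSketch {
    ClientTxnState state = ClientTxnState.READY;

    void transitionTo(ClientTxnState target) {
        if (!target.validSources().contains(state)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + target);
        }
        state = target;
    }

    void beginTransaction() { transitionTo(ClientTxnState.IN_TRANSACTION); }
    // In the real client these two also enqueue the EndTxn request (omitted here).
    void beginCommit() { transitionTo(ClientTxnState.COMMITTING_TRANSACTION); }
    void beginAbort() { transitionTo(ClientTxnState.ABORTING_TRANSACTION); }
    // Called when the EndTxn response arrives.
    void completeTransaction() { transitionTo(ClientTxnState.READY); }
}
```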

Server Side Mechanics

The server side consists of several components. TransactionStateManager maintains transaction metadata in memory and persists changes to the transaction topic. It provides methods like appendTransactionToLog() to write metadata records.

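def appendTransactionToLog(
    transactionalId: String,
    coordinatorEpoch: Int,
    newMetadata: TxnTransitMetadata,
    responseCallback: Errors => Unit): Unit = {
    // Each state change becomes one record keyed by the transactional ID in __transaction_state.
    val timestamp = time.milliseconds()
    val keyBytes = TransactionLog.keyToBytes(transactionalId)
    val valueBytes = TransactionLog.valueToBytes(newMetadata)
    val records = MemoryRecords.withRecords(
        TransactionLog.EnforcedCompressionType,
        new SimpleRecord(timestamp, keyBytes, valueBytes))
    val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))

    // appendRecords reports a status per partition; reduce it to the single Errors value the caller expects.
    // (Simplified: the full implementation also maps storage errors to coordinator errors
    // and updates the in-memory metadata cache here.)
    def updateCacheCallback(responseStatus: collection.Map[TopicPartition, PartitionResponse]): Unit =
        responseCallback(responseStatus(topicPartition).error)

    replicaManager.appendRecords(
        newMetadata.txnTimeoutMs.toLong,      // how long to wait for the replicated write
        TransactionLog.EnforcedRequiredAcks,  // acks=-1: all in-sync replicas must have the record
        internalTopicsAllowed = true,
        isFromClient = false,
        Map(topicPartition -> records),
        updateCacheCallback,
        delayedProduceLock = Some(stateLock.readLock))
}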

TransactionCoordinator handles the core requests. handleInitProducerId allocates a new producer ID for an unknown transactional ID, or reuses the existing one with a bumped epoch; handleAddPartitionsToTransaction persists the transaction's partition list; handleEndTransaction validates the request, moves the transaction to PrepareCommit or PrepareAbort, persists the change, and then hands the involved partitions to the TransactionMarkerChannelManager, which sends the markers.
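handleEndTransaction is essentially a guarded state transition. A simplified sketch, assuming coordinator-side states corresponding to Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit and CompleteAbort, and reducing error handling to two illustrative outcomes (a stale epoch and an invalid state); CoordinatorTxnState, EndTxnResult and EndTxnSketch are hypothetical names, not Kafka's.

```java
enum CoordinatorTxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

enum EndTxnResult { OK, PRODUCER_FENCED, INVALID_TXN_STATE }

final class EndTxnSketch {
    CoordinatorTxnState state = CoordinatorTxnState.EMPTY;
    final short producerEpoch;

    EndTxnSketch(short producerEpoch) { this.producerEpoch = producerEpoch; }

    // Models handleAddPartitionsToTransaction starting (or extending) a transaction.
    void addPartitions() {
        if (state == CoordinatorTxnState.PREPARE_COMMIT || state == CoordinatorTxnState.PREPARE_ABORT) {
            throw new IllegalStateException("Previous transaction is still completing");
        }
        state = CoordinatorTxnState.ONGOING;
    }

    EndTxnResult endTransaction(short requestEpoch, boolean commit) {
        if (requestEpoch != producerEpoch) return EndTxnResult.PRODUCER_FENCED; // stale producer instance
        if (state != CoordinatorTxnState.ONGOING) return EndTxnResult.INVALID_TXN_STATE;
        // The coordinator persists this transition before handing partitions to the marker manager.
        state = commit ? CoordinatorTxnState.PREPARE_COMMIT : CoordinatorTxnState.PREPARE_ABORT;
        return EndTxnResult.OK;
    }

    // Called once every involved partition has acknowledged its marker.
    void markersWritten() {
        if (state == CoordinatorTxnState.PREPARE_COMMIT) state = CoordinatorTxnState.COMPLETE_COMMIT;
        else if (state == CoordinatorTxnState.PREPARE_ABORT) state = CoordinatorTxnState.COMPLETE_ABORT;
        else throw new IllegalStateException("No markers pending in state " + state);
    }
}
```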

Transaction Marker Flow

The TransactionMarkerChannelManager creates a DelayedTxnMarker for each transaction being completed. Each successful partition response removes that partition from txnMetadata.topicPartitions, and the delayed operation completes once the set is empty. Its timeout is TimeUnit.DAYS.toMillis(365 * 100), roughly 100 years, so in practice it never expires, and onExpiration() treats expiry as an impossible state. On completion, the callback finalizes the transaction.

private class DelayedTxnMarker(
    txnMetadata: TransactionMetadata,
    completionCallback: Errors => Unit,
    lock: Lock) extends DelayedOperation(
        TimeUnit.DAYS.toMillis(365 * 100), Some(lock)) {
    override def tryComplete(): Boolean = {
        txnMetadata.inLock {
            if (txnMetadata.topicPartitions.isEmpty) {
                forceComplete()
            } else false
        }
    }
    override def onExpiration(): Unit = {
        throw new IllegalStateException(
            s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
    }
    override def onComplete(): Unit = completionCallback(Errors.NONE)
}

High Availability and Timeout Handling

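Every broker runs a TC service, and each one is responsible for the __transaction_state partitions it currently leads. If a broker fails, leadership of its partitions moves to a follower replica; the new leader loads the partition's transaction metadata from the log and takes over, including finishing transactions left in PrepareCommit or PrepareAbort. The TC service also periodically checks ongoing transactions (every transaction.abort.timed.out.transaction.cleanup.interval.ms, 10 seconds by default); if a transaction exceeds its configured timeout (the producer's transaction.timeout.ms), the service aborts it, persists the abort state, and notifies the involved partitions.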
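The timeout check is a periodic scan over in-memory metadata. A minimal sketch, assuming each ongoing transaction records its start time and its timeout; TimeoutScanSketch and OngoingTxn are hypothetical, and, as we understand it, the real coordinator also fences the producer by bumping its epoch and persists PrepareAbort before writing abort markers.

```java
import java.util.ArrayList;
import java.util.List;

final class TimeoutScanSketch {
    static final class OngoingTxn {
        final String transactionalId;
        final long startTimestampMs;
        final long timeoutMs;
        OngoingTxn(String transactionalId, long startTimestampMs, long timeoutMs) {
            this.transactionalId = transactionalId;
            this.startTimestampMs = startTimestampMs;
            this.timeoutMs = timeoutMs;
        }
    }

    // Returns the transactional IDs whose transactions have outlived their timeout at nowMs.
    static List<String> timedOut(List<OngoingTxn> ongoing, long nowMs) {
        List<String> expired = new ArrayList<>();
        for (OngoingTxn txn : ongoing) {
            if (txn.startTimestampMs + txn.timeoutMs < nowMs) {
                expired.add(txn.transactionalId); // candidate for a coordinator-initiated abort
            }
        }
        return expired;
    }
}
```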

Summary

Kafka’s transaction implementation relies on a dedicated transaction coordinator service, a persistent transaction topic, and a state machine that tracks transaction lifecycles. Producers interact with the TC service to obtain producer IDs, add partitions, and commit or abort transactions. The server side persists state changes, manages high availability through partition leaders, and uses delayed operations to ensure all partition acknowledgments are received before finalizing a transaction.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

21CTO (21CTO.com) offers developers community, training, and services, making it your go‑to learning and service platform.
