Understanding Kafka Transactions: TC Service, Producer Flow, and Code Walkthrough
This article explains how Kafka implements transactions. It covers the role of the Transaction Coordinator (TC) service, the overall transaction flow, producer initialization, partition registration, offset commits, and the commit and abort processes, then walks through the client-side and server-side components with a Java usage example.
Transaction Flow Overview
Kafka transactions are widely used in stream processing to make a read-process-write cycle atomic: the output records and the consumed offsets are committed together, and if an error occurs the whole transaction is rolled back (aborted).
Transaction Coordinator (TC) Service
The TC service runs inside every Kafka broker and is referred to as the Transaction Coordinator. It persists transaction metadata in the internal __transaction_state topic. Each transactional producer is configured with a unique transactional.id, and the hash of that id determines which partition of __transaction_state stores its state.
Finding the TC Service Address
Before it can use transactions, a producer sends a FindCoordinator request to any broker in the cluster to learn the address of its TC. The __transaction_state topic has multiple partitions (50 by default, set by transaction.state.log.num.partitions); the broker that leads the partition selected by hashing the transactional.id hosts the TC for that producer.
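The coordinator lookup can be sketched in a few lines of Java. This assumes the broker computes the partition as the absolute value of the id's String.hashCode() modulo the partition count, with Integer.MIN_VALUE mapped to 0 the way Kafka's Utils.abs helper does; the class TxnCoordinatorLookup and its methods are illustrative names, not Kafka APIs.

```java
// Hypothetical sketch: map a transactional.id to a __transaction_state partition.
// Assumption: the broker uses abs(id.hashCode()) % partitionCount, with abs
// mapping Integer.MIN_VALUE to 0 (mirroring Kafka's Utils.abs helper).
final class TxnCoordinatorLookup {
    // Default of the broker setting transaction.state.log.num.partitions
    static final int DEFAULT_TXN_STATE_PARTITIONS = 50;

    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // The leader of the returned partition acts as the TC for this transactional.id.
    static int partitionFor(String transactionalId, int partitionCount) {
        return abs(transactionalId.hashCode()) % partitionCount;
    }
}
```

Because the result depends only on the id and the partition count, the same transactional.id always lands on the same partition, which is why the partition count of __transaction_state should not be changed after deployment.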
Transaction Initialization
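The producer must set a unique transactional.id. Before sending any transactional messages, it calls initTransactions(), which sends an InitProducerId request to the TC. The TC returns a producer id together with an epoch and persists them in __transaction_state. If the transactional.id is already known, the TC keeps the producer id but bumps the epoch, which fences off any older producer instance still using the same id.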
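A minimal, single-process sketch of the producer id and epoch bookkeeping follows. ProducerIdAllocator and its methods are hypothetical; real Kafka allocates producer ids in blocks, persists them, and handles epoch overflow, none of which is modeled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory model of InitProducerId handling on the coordinator.
final class ProducerIdAllocator {
    record ProducerIdAndEpoch(long producerId, short epoch) {}

    private long nextProducerId = 0;
    private final Map<String, ProducerIdAndEpoch> byTransactionalId = new HashMap<>();

    synchronized ProducerIdAndEpoch initProducerId(String transactionalId) {
        ProducerIdAndEpoch current = byTransactionalId.get(transactionalId);
        ProducerIdAndEpoch next = (current == null)
                ? new ProducerIdAndEpoch(nextProducerId++, (short) 0)            // first use: new id
                : new ProducerIdAndEpoch(current.producerId(),
                                         (short) (current.epoch() + 1));        // restart: same id, bumped epoch
        byTransactionalId.put(transactionalId, next); // Kafka would persist this to __transaction_state
        return next;
    }

    // A request carrying an older epoch comes from a "zombie" instance and is rejected.
    synchronized boolean isFenced(String transactionalId, short epoch) {
        ProducerIdAndEpoch current = byTransactionalId.get(transactionalId);
        return current != null && epoch < current.epoch();
    }
}
```

Calling initProducerId twice for the same id returns the same producer id with epoch 1, after which any request still carrying epoch 0 is treated as fenced.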
Sending Transactional Messages
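Once it has a producer id, the producer can send messages. The first time it writes to a partition within a transaction, it registers that partition with the TC through an AddPartitionsToTxn request, and the TC persists the partition list in __transaction_state. The messages themselves are written to the target partitions as transactional batches: consumers configured with isolation.level=read_committed see them only after the transaction commits, and never see them if it aborts.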
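The visibility rule can be illustrated with a toy model of one partition log. It assumes every record is transactional and approximates read_committed by returning records in offset order up to the first transaction that is still open (a stand-in for the last stable offset) and skipping records from aborted transactions. PartitionLogModel and its nested types are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition log (hypothetical). Every data record is assumed to be
// transactional; the coordinator later appends a COMMIT or ABORT marker for its producer.
final class PartitionLogModel {
    interface Entry {}
    record Data(long producerId, String value) implements Entry {}
    record Marker(long producerId, boolean commit) implements Entry {}

    private final List<Entry> log = new ArrayList<>();

    void append(Entry e) { log.add(e); }

    // read_uncommitted: every data record is visible as soon as it is appended.
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) if (e instanceof Data d) out.add(d.value());
        return out;
    }

    // read_committed (approximation): read in offset order, stop at the first record whose
    // transaction is still open (like the last stable offset), and skip aborted records.
    List<String> readCommitted() {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (!(log.get(i) instanceof Data d)) continue;
            Boolean outcome = outcomeAfter(i, d.producerId());
            if (outcome == null) break;      // transaction still open: nothing past here is stable
            if (outcome) out.add(d.value()); // committed records become visible
        }
        return out;
    }

    // First marker for this producer after position i: true = commit, false = abort, null = none yet.
    private Boolean outcomeAfter(int i, long producerId) {
        for (int j = i + 1; j < log.size(); j++) {
            if (log.get(j) instanceof Marker m && m.producerId() == producerId) return m.commit();
        }
        return null;
    }
}
```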
Offset Commit Request
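The producer method sendOffsetsToTransaction lets a producer commit consumer offsets atomically with its transactional messages. It works in two steps: the producer first sends an AddOffsetsToTxn request to the TC, which adds the consumer group's __consumer_offsets partition to the transaction; it then sends a TxnOffsetCommit request carrying the offsets to the group coordinator. The offsets are stored in __consumer_offsets, not in the transaction topic, and take effect only if the transaction commits.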
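The staging behavior of transactional offsets can be sketched the same way. TxnOffsetModel is a hypothetical class that keys offsets by "topic-partition" strings; it only shows that offsets sent inside a transaction stay invisible to the group until a commit marker arrives, and are discarded on abort.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how a group coordinator treats TxnOffsetCommit data.
final class TxnOffsetModel {
    private final Map<String, Long> committed = new HashMap<>();                  // visible to the group
    private final Map<Long, Map<String, Long>> pendingByProducer = new HashMap<>(); // staged per producer id

    // TxnOffsetCommit: offsets are staged under the producer id, not yet visible.
    void txnOffsetCommit(long producerId, Map<String, Long> offsets) {
        pendingByProducer.computeIfAbsent(producerId, k -> new HashMap<>()).putAll(offsets);
    }

    // Transaction marker on the offsets partition: apply or discard the staged offsets.
    void onMarker(long producerId, boolean commit) {
        Map<String, Long> staged = pendingByProducer.remove(producerId);
        if (commit && staged != null) committed.putAll(staged);
    }

    Long committedOffset(String topicPartition) { return committed.get(topicPartition); }
}
```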
Commit Request Persistence
When the producer calls commitTransaction(), it sends an EndTxn request to the TC. The TC first persists a PrepareCommit record to __transaction_state; from that point on, the outcome is decided. It then replies to the producer and sends a commit marker to every partition involved in the transaction, including the __consumer_offsets partition if offsets were sent. After all partitions acknowledge their markers, the TC persists a final CompleteCommit record.
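The ordering of the commit path can be sketched as follows. The txnLog list stands in for __transaction_state and the writeMarker predicate stands in for the marker requests sent to partition leaders; both are hypothetical simplifications, and a real coordinator retries failed markers instead of giving up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the coordinator's two-phase commit sequence.
final class CommitSequence {
    final List<String> txnLog = new ArrayList<>(); // stands in for __transaction_state

    boolean commit(String transactionalId, List<String> partitions, Predicate<String> writeMarker) {
        // Phase 1: once PrepareCommit is durable, the decision to commit is final.
        txnLog.add(transactionalId + " PrepareCommit " + partitions);
        // (Per the flow above, the producer receives its EndTxn response at this point.)
        // Phase 2: every partition in the transaction must acknowledge a COMMIT marker.
        List<String> remaining = new ArrayList<>(partitions);
        remaining.removeIf(writeMarker);        // drop partitions that acknowledged
        if (!remaining.isEmpty()) return false; // a real TC keeps retrying these
        txnLog.add(transactionalId + " CompleteCommit");
        return true;
    }
}
```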
Abort and Timeout Handling
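An abort initiated by the producer (abortTransaction()) follows the same sequence with a PrepareAbort record, abort markers, and a final CompleteAbort record. The TC also aborts transactions on its own: it periodically scans ongoing transactions (every transaction.abort.timed.out.transaction.cleanup.interval.ms) and aborts any that have stayed open longer than the producer's transaction.timeout.ms, for example because the producer crashed or lost network connectivity. For each one it updates the state, persists an abort record, and writes abort markers to the involved partitions.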
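A hypothetical sketch of the periodic timeout scan, assuming each ongoing transaction records its start time and its transaction.timeout.ms. In Kafka the coordinator also bumps the producer epoch when it aborts a timed-out transaction, so late writes from the stalled producer are fenced; that step is not modeled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the coordinator's periodic scan for expired transactions.
final class TxnTimeoutScanner {
    record OngoingTxn(String transactionalId, long startMs, long timeoutMs) {}

    private final Map<String, OngoingTxn> ongoing = new HashMap<>();

    void begin(String id, long nowMs, long timeoutMs) {
        ongoing.put(id, new OngoingTxn(id, nowMs, timeoutMs));
    }

    // Called periodically; removes and returns the expired ids so abort markers can be written.
    List<String> abortTimedOut(long nowMs) {
        List<String> aborted = new ArrayList<>();
        ongoing.values().removeIf(t -> {
            boolean expired = nowMs - t.startMs() > t.timeoutMs();
            if (expired) aborted.add(t.transactionalId());
            return expired;
        });
        aborted.sort(null); // natural order, for deterministic results
        return aborted;
    }
}
```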
Client‑Side API
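```java
// Excerpt of org.apache.kafka.clients.producer.Producer (transaction methods only)
public interface Producer<K, V> extends Closeable {
    // Initialize transactions (requests a producer id and epoch from the TC)
    void initTransactions();
    // Begin a new transaction
    void beginTransaction() throws ProducerFencedException;
    // Send consumer offsets to the transaction (group-id overload, deprecated in newer clients)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  String consumerGroupId) throws ProducerFencedException;
    // Send consumer offsets to the transaction (overload taking the consumer's group metadata)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;
    // Commit the transaction
    void commitTransaction() throws ProducerFencedException;
    // Abort the transaction
    void abortTransaction() throws ProducerFencedException;
}
```
Example usage: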
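```java
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;

KafkaProducer<String, String> producer = new KafkaProducer<>(Map.of(
        "bootstrap.servers", "localhost:9092",
        "transactional.id", "my-transactional-id"),
        new StringSerializer(), new StringSerializer());
producer.initTransactions(); // obtain producer id + epoch from the TC, fencing older instances
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(Map.of(
        "bootstrap.servers", "localhost:9092",
        "group.id", "my-group-id",
        "isolation.level", "read_committed", // skip data from aborted or still-open transactions
        "enable.auto.commit", "false"),      // offsets are committed through the transaction instead
        new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singleton("inputTopic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    if (records.isEmpty()) continue; // avoid empty transactions
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("outputTopic_1", record.key(), record.value()));
            producer.send(new ProducerRecord<>("outputTopic_2", record.key(), record.value()));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)); // next offset to read
        }
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction(); // simplified: fatal errors (e.g. fencing) need close(), and the consumer should seek back to the last committed offsets
    }
}
```
TransactionManager Internals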
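On the client, the TransactionManager tracks which partitions belong to the current transaction. It keeps three sets: new partitions that have been written to but not yet registered, pending partitions whose AddPartitionsToTxn request is in flight, and partitions the TC has already added to the transaction. It builds request objects such as AddPartitionsToTxnRequest and EndTxnRequest, and handlers such as AddPartitionsToTxnHandler and EndTxnHandler process the responses and update the transaction state.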
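The three-set bookkeeping can be sketched as a small state holder. TxnPartitionTracker and its method names are illustrative, not the TransactionManager's actual API, and partitions are plain "topic-partition" strings.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: partitions move from "new" (seen by send) to "pending"
// (AddPartitionsToTxn in flight) to "in transaction" (acknowledged by the TC).
final class TxnPartitionTracker {
    private final Set<String> newPartitions = new HashSet<>();
    private final Set<String> pendingPartitions = new HashSet<>();
    private final Set<String> partitionsInTxn = new HashSet<>();

    // Called on each send(); only partitions not yet known need registering.
    void maybeAdd(String tp) {
        if (!partitionsInTxn.contains(tp) && !pendingPartitions.contains(tp)) newPartitions.add(tp);
    }

    // Drain "new" into the body of one AddPartitionsToTxn request.
    Set<String> buildAddPartitionsRequest() {
        Set<String> batch = new HashSet<>(newPartitions);
        pendingPartitions.addAll(batch);
        newPartitions.clear();
        return batch;
    }

    // Response handler: on success the partitions join the transaction; on failure, retry later.
    void onAddPartitionsResponse(Set<String> batch, boolean success) {
        pendingPartitions.removeAll(batch);
        if (success) partitionsInTxn.addAll(batch); else newPartitions.addAll(batch);
    }

    // After EndTxn completes, the next transaction starts with no partitions.
    void onEndTxn() { partitionsInTxn.clear(); }

    boolean isInTransaction(String tp) { return partitionsInTxn.contains(tp); }
}
```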
Server‑Side State Management
On the broker, the TransactionStateManager holds a cache of TransactionMetadata objects keyed by transactional.id, persists every state change to the __transaction_state topic, and reloads the state from that topic when the broker restarts or becomes leader of one of its partitions. Methods such as appendTransactionToLog build a record from the transaction key and value and write it to the log.
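A single-partition sketch of the write-then-cache pattern and the reload path follows. TxnStateStore is hypothetical: the in-memory list stands in for a __transaction_state partition, whereas the real appendTransactionToLog appends asynchronously and updates the cached metadata in a completion callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-partition sketch of the state manager's cache and log.
final class TxnStateStore {
    record Metadata(String transactionalId, long producerId, String state) {}

    private final List<Metadata> log = new ArrayList<>();       // stands in for a __transaction_state partition
    private final Map<String, Metadata> cache = new HashMap<>(); // transactional.id -> latest metadata

    // Same idea as appendTransactionToLog: write the record first, then update the cache.
    void appendTransactionToLog(Metadata m) {
        log.add(m);
        cache.put(m.transactionalId(), m);
    }

    Metadata get(String transactionalId) { return cache.get(transactionalId); }

    // Loading after restart or failover: replaying the log leaves the newest record per id in the cache.
    static TxnStateStore loadFrom(List<Metadata> persisted) {
        TxnStateStore store = new TxnStateStore();
        for (Metadata m : persisted) store.appendTransactionToLog(m);
        return store;
    }

    List<Metadata> logSnapshot() { return List.copyOf(log); }
}
```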
TransactionCoordinator Logic
The TransactionCoordinator handles core requests:
initProducerId : allocates a new producer id or, for a known transactional.id, returns the existing producer id with a bumped epoch.
addPartitionsToTransaction : records the partitions involved in the transaction.
endTransaction : persists the commit/abort marker, sends transaction result messages to partitions via TransactionMarkerChannelManager, and finally writes a transaction‑complete record.
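The states a transaction moves through on the coordinator can be sketched as an enum with a transition table. The state names follow Kafka's TransactionState, but the table below is a reduced sketch (states such as Dead and PrepareEpochFence are omitted), not the broker's exact rules.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Reduced, hypothetical transition table for the coordinator's transaction states.
enum TxnState {
    EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT;

    private static final Map<TxnState, Set<TxnState>> NEXT = Map.of(
            EMPTY, EnumSet.of(EMPTY, ONGOING),                           // initProducerId again, or first partition added
            ONGOING, EnumSet.of(ONGOING, PREPARE_COMMIT, PREPARE_ABORT), // more partitions, or endTransaction
            PREPARE_COMMIT, EnumSet.of(COMPLETE_COMMIT),                 // all commit markers acknowledged
            PREPARE_ABORT, EnumSet.of(COMPLETE_ABORT),                   // all abort markers acknowledged
            COMPLETE_COMMIT, EnumSet.of(EMPTY, ONGOING),                 // initProducerId, or next transaction
            COMPLETE_ABORT, EnumSet.of(EMPTY, ONGOING));

    boolean canTransitionTo(TxnState target) {
        return NEXT.get(this).contains(target);
    }
}
```

Once a transaction reaches PrepareCommit, the only way forward is CompleteCommit, which is what makes the persisted prepare record a point of no return.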
Result messages are sent by a background thread that pulls requests from a queue. A DelayedTxnMarker ensures the operation completes only after all partition acknowledgments are received.
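The queue, background thread, and wait-for-all-acknowledgments pattern can be sketched with standard java.util.concurrent types. MarkerSender is hypothetical: a CountDownLatch stands in for DelayedTxnMarker, and the sender thread acknowledges each marker immediately instead of sending marker requests over the network.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: enqueue one marker per partition, drain on a background
// thread, and complete only after every partition has acknowledged.
final class MarkerSender {
    record MarkerRequest(String partition, boolean commit, CountDownLatch acks) {}

    private final BlockingQueue<MarkerRequest> queue = new LinkedBlockingQueue<>();

    MarkerSender() {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    MarkerRequest r = queue.take(); // blocks until a marker is queued
                    // A real sender would write the marker to the partition leader here.
                    r.acks().countDown();           // simulate the leader's acknowledgment
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly on shutdown
            }
        }, "txn-marker-sender");
        sender.setDaemon(true);
        sender.start();
    }

    // Returns true once all partitions have acknowledged within the timeout.
    boolean sendMarkers(List<String> partitions, boolean commit, long timeoutMs) throws InterruptedException {
        CountDownLatch acks = new CountDownLatch(partitions.size());
        for (String p : partitions) queue.add(new MarkerRequest(p, commit, acks));
        return acks.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```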
High Availability
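Every broker runs a TC, and each TC is responsible for the __transaction_state partitions its broker leads. Because that topic is replicated, if a leader fails, a follower replica becomes the new leader; its broker loads the partition's transaction state and continues serving the affected transactional.ids, so transaction processing can continue.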
Timeout and Recovery
When a broker restarts or takes over a __transaction_state partition, the TC reloads transaction metadata from that topic. If it finds a transaction in the PrepareCommit (or PrepareAbort) state without a final record, the outcome has already been decided, so it resumes sending the pending result messages to the partitions.
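The recovery decision reduces to a scan over the reloaded metadata. TxnRecovery and its records are hypothetical and use plain strings for states; Ongoing transactions are deliberately left alone here, since the timeout scan aborts them if their producer never returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: after loading, resend markers only for already-decided transactions.
final class TxnRecovery {
    record Loaded(String transactionalId, String state, List<String> partitions) {}
    record PendingMarkers(String transactionalId, boolean commit, List<String> partitions) {}

    static List<PendingMarkers> markersToResend(Map<String, Loaded> loaded) {
        List<PendingMarkers> out = new ArrayList<>();
        for (Loaded t : loaded.values()) {
            switch (t.state()) {
                case "PrepareCommit" -> out.add(new PendingMarkers(t.transactionalId(), true, t.partitions()));
                case "PrepareAbort" -> out.add(new PendingMarkers(t.transactionalId(), false, t.partitions()));
                default -> { } // Empty, Ongoing, Complete*: nothing to resend
            }
        }
        return out;
    }
}
```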
Overall, Kafka’s transaction mechanism relies on a combination of client‑side coordination, persistent metadata in a dedicated internal topic, and robust server‑side state machines to provide exactly‑once semantics for stream processing.