
Kafka Exactly-Once Semantics and Transaction API Overview

This article explains Kafka's exactly‑once semantics and transaction support, detailing the new producer API methods, related exceptions, configuration parameters, and a sample application illustrating how to initialize, begin, process, and commit or abort transactions while ensuring idempotent and atomic message handling.

Architect

Introduction

Kafka provides exactly‑once semantics and transactional messaging. This article consolidates the related API, data flow, and configuration details.

Producer Interface

API Changes

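The producer gains five new methods for transaction handling (initTransactions, beginTransaction, sendOffsetsToTransaction, commitTransaction, and abortTransaction), and the send interface can surface a new exception, OutOfOrderSequenceException.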

public interface Producer<K,V> extends Closeable {
    // Initialize transactions; must be called before any other transaction method.
    void initTransactions() throws IllegalStateException;

    // Begin a new transaction.
    void beginTransaction() throws ProducerFencedException;

    // Send consumed offsets to the transaction.
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                 String consumerGroupId) throws ProducerFencedException;

    // Commit the ongoing transaction.
    void commitTransaction() throws ProducerFencedException;

    // Abort the ongoing transaction.
    void abortTransaction() throws ProducerFencedException;

    // Asynchronous send returning a future.
    Future<RecordMetadata> send(ProducerRecord<K,V> record);
    Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
}

OutOfOrderSequenceException

If the broker detects data loss, that is, it receives a sequence number greater than the one it expected, the producer raises OutOfOrderSequenceException. The exception is returned through the Future and passed to the callback, if one was provided. It is fatal: subsequent producer calls such as send, beginTransaction, or commitTransaction will raise an IllegalStateException.

Application Example

A minimal Java program demonstrates the transaction workflow:

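import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class KafkaTransactionsExample {
    // consumerConfig, producerConfig, CONSUMER_POLL_TIMEOUT, processRecords and
    // getUncommittedOffsets are application-defined and omitted from this listing.
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
        // Transactional producer requires a transactional.id.
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
            if (records.isEmpty()) continue;
            try {
                producer.beginTransaction();
                List<ProducerRecord<String, String>> outputRecords = processRecords(records);
                for (ProducerRecord<String, String> outputRecord : outputRecords) {
                    producer.send(outputRecord);
                }
                // Send consumed offsets as part of the transaction.
                producer.sendOffsetsToTransaction(getUncommittedOffsets(), "my-group");
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException e) {
                // Fatal: this producer instance cannot continue and must be closed.
                producer.close();
                consumer.close();
                return;
            } catch (KafkaException e) {
                // Abortable error: roll back this batch. The application should also
                // rewind the consumer to its last committed offsets before retrying.
                producer.abortTransaction();
            }
        }
    }
}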

New Configuration Options

Broker Settings

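transactional.id.timeout.ms – Time after which an idle transactional.id is considered expired (default 7 days). This is the name used in the KIP-98 design; released brokers expose the setting as transactional.id.expiration.ms.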

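max.transaction.timeout.ms – Upper bound for the transaction timeout a producer may request (default 15 minutes). This is the name used in the KIP-98 design; released brokers expose the setting as transaction.max.timeout.ms.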

transaction.state.log.replication.factor – Replication factor for the transaction log (default 3).

transaction.state.log.num.partitions – Number of partitions for the transaction log (default 50).

transaction.state.log.min.isr – Minimum in‑sync replicas for the transaction log (default 2).

transaction.state.log.segment.bytes – Segment size for the transaction log (default 104 857 600 bytes).

Producer Settings

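enable.idempotence – Enables idempotent writes (must be true for transactions).

transaction.timeout.ms – Maximum time the coordinator waits for a transaction to complete before aborting it (default 60 seconds); must be ≤ the broker's maximum transaction timeout (max.transaction.timeout.ms above).

transactional.id – Identifier that ties successive producer sessions (for example, the same logical producer before and after a restart) to the same transaction state; requires idempotence.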

Consumer Settings

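isolation.level – Controls visibility of transactional messages: read_uncommitted (the default) returns all messages, including those from open or aborted transactions; read_committed returns only messages from committed transactions, plus non-transactional messages.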
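As a rough illustration of how the producer and consumer settings above fit together, the sketch below assembles them as plain java.util.Properties. The class and method names are hypothetical, the bootstrap address and IDs are placeholders, and the serializer choices are assumptions; only the property keys come from Kafka's client configuration.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys and values are Kafka's.
class TransactionalClientConfigs {

    // Settings for a transactional producer.
    static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("enable.idempotence", "true");        // required for transactions
        props.setProperty("transactional.id", transactionalId); // stable across restarts
        props.setProperty("transaction.timeout.ms", "60000");   // the 60-second default
        return props;
    }

    // Settings for a consumer that only sees committed transactional data.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("isolation.level", "read_committed");
        // Offsets are committed through the producer's transaction instead.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092", "my-transactional-id"));
        System.out.println(consumerProps("localhost:9092", "my-group"));
    }
}
```

The resulting objects can be passed to the KafkaProducer and KafkaConsumer constructors. Auto-commit is disabled here on the assumption that the application commits offsets via sendOffsetsToTransaction.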

Semantic Guarantees

Producer Idempotence

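Each producer instance receives a unique Producer ID (PID) and attaches a monotonically increasing sequence number, tracked per partition, to what it sends. The broker accepts a write only if its sequence number is exactly one greater than the last one it appended for that PID and partition: retried duplicates are discarded and gaps are rejected as out of order, so each message appears exactly once even after retries.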
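The sequence check can be pictured with a toy model. The sketch below is not Kafka's broker code: the class name, the string key, and the one-record-per-sequence simplification are all assumptions made for illustration (real brokers validate batches and handle sequence wrap-around).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a broker-side sequence check; not Kafka's actual implementation.
class SequenceValidator {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last appended sequence number per "pid:partition" key.
    private final Map<String, Integer> lastSequence = new HashMap<>();

    Result tryAppend(long pid, int partition, int sequence) {
        String key = pid + ":" + partition;
        int last = lastSequence.getOrDefault(key, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;      // a retry of something already appended
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;   // a gap: earlier data is missing
        }
        lastSequence.put(key, sequence);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        SequenceValidator validator = new SequenceValidator();
        System.out.println(validator.tryAppend(7L, 0, 0)); // APPENDED
        System.out.println(validator.tryAppend(7L, 0, 0)); // DUPLICATE
        System.out.println(validator.tryAppend(7L, 0, 2)); // OUT_OF_ORDER
    }
}
```

In this model a duplicate is silently dropped, which is what makes producer retries safe, while a gap corresponds to the condition that surfaces as OutOfOrderSequenceException.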

Transaction Guarantees

Transactions allow atomic writes to multiple partitions; either all writes succeed or all are rolled back. When combined with offset commits, the consume‑transform‑produce pipeline becomes atomic, and a TransactionalId guarantees continuity across producer restarts.

Core Concepts

Transaction Coordinator – Manages PIDs, epochs, and transaction state.

Transaction Log – Internal replicated topic storing transaction metadata.

Control Messages – Special broker‑generated messages that indicate commit or abort status to consumers.

TransactionalId – User‑provided identifier that maps to a PID and enables cross‑session transaction recovery.

Producer Epoch – Ensures only one active producer instance per TransactionalId.
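To make the relationship between TransactionalId and producer epoch more concrete, here is a toy sketch of epoch-based fencing. It is a simplified model under stated assumptions, not the coordinator's implementation: the class and method names are hypothetical, and the register method stands in for what initTransactions triggers.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing; not the Transaction Coordinator's real code.
class EpochFencingSketch {
    // Current epoch for each TransactionalId.
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Models a producer (re)initializing: bumps the epoch and returns the new value.
    int register(String transactionalId) {
        int next = currentEpoch.getOrDefault(transactionalId, -1) + 1;
        currentEpoch.put(transactionalId, next);
        return next;
    }

    // A request carrying an epoch older than the current one comes from a fenced producer.
    boolean isFenced(String transactionalId, int epoch) {
        return epoch < currentEpoch.getOrDefault(transactionalId, 0);
    }

    public static void main(String[] args) {
        EpochFencingSketch coordinator = new EpochFencingSketch();
        int oldEpoch = coordinator.register("my-transactional-id");
        int newEpoch = coordinator.register("my-transactional-id"); // e.g. after a restart
        System.out.println(coordinator.isFenced("my-transactional-id", oldEpoch)); // true
        System.out.println(coordinator.isFenced("my-transactional-id", newEpoch)); // false
    }
}
```

A fenced instance in this model is the one that would receive ProducerFencedException, which is why only one producer per TransactionalId can make progress at a time.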

Data Flow

The accompanying diagram (not shown) illustrates the interaction between producers, brokers, transaction coordinator, and consumer groups, with numbered RPCs representing each step.

Transaction Lifecycle

FindCoordinatorRequest – Locate the Transaction Coordinator.

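InitPidRequest – Obtain a PID and epoch; recover any unfinished transactions left by a previous instance with the same TransactionalId. (Released versions name this RPC InitProducerIdRequest.)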

beginTransaction – Start a new transaction.

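Consume‑Transform‑Produce Loop – The first time a partition is written within a transaction, the producer registers it with the coordinator (AddPartitionsToTxnRequest) and then produces records to it. sendOffsetsToTransaction first registers the consumer group's offsets partition with the coordinator (AddOffsetCommitsToTxnRequest; AddOffsetsToTxnRequest in released versions) and then writes the offsets to the group coordinator (TxnOffsetCommitRequest).

Commit or Abort – EndTxnRequest makes the coordinator write PREPARE_COMMIT or PREPARE_ABORT to the transaction log. The coordinator then sends WriteTxnMarkerRequest (WriteTxnMarkersRequest in released versions) to the leaders of the participating partitions, which append the commit or abort control messages, and finally records the completed state in the transaction log.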

After the final commit/abort marker, the transaction log can discard most intermediate data, retaining only the PID and timestamps for cleanup.
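The lifecycle above can be read as a small state machine. The sketch below is a simplified model, not the coordinator's code: PREPARE_COMMIT and PREPARE_ABORT come from the text, while the other state names, the allowed-transition table, and the class itself are assumptions chosen for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified model of per-TransactionalId transaction state; names are illustrative.
class TxnStateMachineSketch {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    // Which states may follow each state in this model.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.EMPTY, EnumSet.of(State.ONGOING));
        ALLOWED.put(State.ONGOING,
                EnumSet.of(State.ONGOING, State.PREPARE_COMMIT, State.PREPARE_ABORT));
        ALLOWED.put(State.PREPARE_COMMIT, EnumSet.of(State.COMPLETE_COMMIT));
        ALLOWED.put(State.PREPARE_ABORT, EnumSet.of(State.COMPLETE_ABORT));
        ALLOWED.put(State.COMPLETE_COMMIT, EnumSet.of(State.ONGOING));
        ALLOWED.put(State.COMPLETE_ABORT, EnumSet.of(State.ONGOING));
    }

    private State state = State.EMPTY;

    State state() {
        return state;
    }

    // Moves to the next state, rejecting transitions the model does not allow.
    void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }

    public static void main(String[] args) {
        TxnStateMachineSketch txn = new TxnStateMachineSketch();
        txn.transitionTo(State.ONGOING);         // first partition added
        txn.transitionTo(State.PREPARE_COMMIT);  // EndTxnRequest with commit
        txn.transitionTo(State.COMPLETE_COMMIT); // markers written
        System.out.println(txn.state());         // COMPLETE_COMMIT
    }
}
```

The key property the model captures is that once a prepare state is written, the outcome is decided: a transaction in PREPARE_COMMIT can only move to COMPLETE_COMMIT, never to an abort.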
