Exploring Advanced Features of Spring‑Kafka: Integration, Embedded Server, Topic Management, Transactions, and Message Handling

This article provides a comprehensive guide to using Spring‑Kafka, covering simple integration, embedded Kafka for testing, creating topics programmatically, advanced KafkaTemplate usage, transactional messaging, ReplyingKafkaTemplate for request‑reply, listener configurations, manual acknowledgments, lifecycle control, message forwarding, and retry with dead‑letter queues.

Java Captain

Kafka is a high‑performance message‑queue system based on topic partitions, and Spring‑Kafka wraps the Apache Kafka client to simplify integration in Spring applications.

Simple Integration

Add the Maven dependency:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

Configure the producer bootstrap server in application.properties:

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

Then create a Spring Boot application that injects KafkaTemplate and uses @KafkaListener to receive messages. After starting the app, accessing http://localhost:8080/send/kl logs the received value.
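A minimal sketch of such an application (the class name and endpoint shape are my own illustration, not from the original):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class KafkaApplication {

    private final Logger logger = LoggerFactory.getLogger(KafkaApplication.class);

    @Autowired
    private KafkaTemplate<Object, Object> template;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    // GET /send/kl publishes the path variable to topic-kl
    @GetMapping("/send/{input}")
    public void send(@PathVariable String input) {
        template.send("topic-kl", input);
    }

    // Consumes topic-kl and logs every received value
    @KafkaListener(id = "webGroup", topics = "topic-kl")
    public void listen(String input) {
        logger.info("input value: {}", input);
    }
}
```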

Spring‑Kafka‑Test Embedded Kafka Server

For unit tests you can start an embedded Kafka broker with a single annotation, @EmbeddedKafka. The annotation supports parameters such as count (number of brokers), controlledShutdown, ports, brokerProperties, and brokerPropertiesLocation to fine-tune the embedded cluster.
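A test class using the annotation might look like this (a sketch; the broker count and port list are illustrative values, not from the original):

```java
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
// Four embedded brokers, pinned to fixed ports so producers/consumers can find them
@EmbeddedKafka(count = 4, ports = {9092, 9093, 9094, 9095})
public class KafkaApplicationTests {

    @Test
    public void contextLoads() throws InterruptedException {
        // Keep the JVM alive briefly so the embedded brokers can be exercised
        TimeUnit.SECONDS.sleep(30);
    }
}
```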

Creating New Topics

When a message is sent to a non-existent topic, Kafka auto-creates it with the default partition count and replication factor defined by the broker properties (num.partitions=1, default.replication.factor=1). You can also create topics programmatically using KafkaAdmin beans or the low-level AdminClient:

@Bean
public KafkaAdmin admin(KafkaProperties properties) {
    KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
    admin.setFatalIfBrokerNotAvailable(true);
    return admin;
}

@Bean
public NewTopic topic2() {
    return new NewTopic("topic-kl", 1, (short) 1);
}

Alternatively, use the raw Kafka client:

AdminClient client = AdminClient.create(properties.buildAdminProperties());
Collection<NewTopic> newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic("topic-kl", 1, (short) 1));
client.createTopics(newTopics);
client.close();

For older versions you can use kafka_2.10 and AdminUtils.createTopic with a ZkClient.

KafkaTemplate Message Sending

Asynchronous send with callbacks:

template.send("topic-kl", "kl").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    @Override public void onFailure(Throwable t) { /* handle the send failure */ }
    @Override public void onSuccess(SendResult<Object, Object> r) { /* handle the broker acknowledgment */ }
});

Synchronous send:

ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
SendResult<Object, Object> result = future.get(); // blocks until the broker acknowledges

Transactional Messaging

Enable transactions by setting spring.kafka.producer.transaction-id-prefix=kafka_tx. Then send messages inside a transaction:

// "input" is the incoming request parameter
template.executeInTransaction(t -> {
    t.send("topic_input", "kl");
    if ("error".equals(input)) {
        throw new RuntimeException("failed"); // rolls back: neither send is committed
    }
    t.send("topic_input", "ckl");
    return true;
});

Alternatively, annotate the method with @Transactional and use the regular template.send calls.
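A sketch of that declarative variant (assumes a KafkaTransactionManager is in the context; the endpoint shape mirrors the earlier examples and is my own illustration):

```java
// Declarative Kafka transaction: both sends commit or roll back together
@Transactional(rollbackFor = RuntimeException.class)
@GetMapping("/send/{input}")
public void sendToKafka(@PathVariable String input) {
    template.send("topic_input", input);
    if ("error".equals(input)) {
        // The exception rolls the transaction back; neither record is committed
        throw new RuntimeException("failed");
    }
    template.send("topic_input", input + "-second");
}
```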

ReplyingKafkaTemplate (Request‑Reply)

Define a ReplyingKafkaTemplate bean and a dedicated reply container. Send a record and wait for the reply:

ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
RequestReplyFuture<String, String, String> future = replyingTemplate.sendAndReceive(record);
ConsumerRecord<String, String> reply = future.get();
System.out.println("Return value: " + reply.value());

The listener uses @SendTo to specify the reply topic.
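The supporting wiring can be sketched as follows (the "replies" topic and bean names are assumptions on my part, modeled on the Spring-Kafka reference documentation):

```java
// Container that consumes the reply topic on behalf of the template
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    ConcurrentMessageListenerContainer<String, String> repliesContainer =
            containerFactory.createContainer("replies");
    repliesContainer.getContainerProperties().setGroupId("repliesGroup");
    repliesContainer.setAutoStartup(false); // the template starts it
    return repliesContainer;
}

@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
    return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}

// Server side: the return value is routed to the reply topic taken from the request headers
@KafkaListener(id = "replyGroup", topics = "topic-kl")
@SendTo
public String handle(String input) {
    return "reply for " + input;
}
```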

Kafka Listener Usage

Basic listener:

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
    logger.info("input value: {}", input);
    return "successful";
}

Advanced configuration can specify topic partitions, initial offsets, concurrency, and a custom error handler:

@KafkaListener(id = "webGroup",
    topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
                      @TopicPartition(topic = "topic2", partitions = "0",
                          partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))},
    concurrency = "6",
    errorHandler = "myErrorHandler")
public String listen(String input) { ... }

The error handler must implement KafkaListenerErrorHandler and be registered in the Spring context.

Manual Acknowledgment

Disable auto-commit (spring.kafka.consumer.enable-auto-commit=false) and set spring.kafka.listener.ack-mode=manual. Then inject Acknowledgment into the listener method and call ack.acknowledge() when processing succeeds.
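A listener sketch under those settings (the "kl" filter condition is illustrative):

```java
// Offsets are committed only when ack.acknowledge() is called
@KafkaListener(id = "webGroup", topics = "topic-kl")
public void listen(String input, Acknowledgment ack) {
    logger.info("input value: {}", input);
    if ("kl".equals(input)) {
        ack.acknowledge(); // commit this record's offset
    }
    // Unacknowledged records are redelivered after the consumer restarts
}
```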

Listener Lifecycle Control

Set autoStartup="false" on the listener and use KafkaListenerEndpointRegistry to start, pause, or resume the container at runtime via REST endpoints.
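A sketch of such a controller (endpoint paths are my own; "webGroup" is the listener id used in the earlier examples):

```java
@RestController
public class ListenerLifecycleController {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Starts the container whose @KafkaListener id is "webGroup"
    // (the listener itself is declared with autoStartup = "false")
    @GetMapping("/start")
    public void start() {
        registry.getListenerContainer("webGroup").start();
    }

    @GetMapping("/pause")
    public void pause() {
        registry.getListenerContainer("webGroup").pause();
    }

    @GetMapping("/resume")
    public void resume() {
        registry.getListenerContainer("webGroup").resume();
    }
}
```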

SendTo Message Forwarding

Use @SendTo("topic-ckl") to forward the processed payload to another topic for further processing.
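For example (a sketch; listener ids and the transformed payload are illustrative):

```java
// First listener transforms the payload and forwards the return value to topic-ckl
@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
    logger.info("topic-kl received: {}", input);
    return input + "-forwarded";
}

// Second listener picks up the forwarded message
@KafkaListener(id = "webGroup1", topics = "topic-ckl")
public void listen2(String input) {
    logger.info("topic-ckl received: {}", input);
}
```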

Retry and Dead‑Letter Queue

Configure a SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer so that after three failed delivery attempts the record is published to a dead-letter topic (originalTopic.DLT).

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> consumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

Listeners that throw an exception will be retried; after the limit, the message lands in the .DLT topic where a separate listener can handle it.
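A sketch of the two listeners involved (names are illustrative):

```java
// Always fails: after three delivery attempts the record is published to topic-kl.DLT
@KafkaListener(id = "webGroup", topics = "topic-kl")
public void failingListen(String input) {
    logger.info("input value: {}", input);
    throw new RuntimeException("dlt test");
}

// Separate listener drains the dead-letter topic
@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
    logger.info("received from DLT: {}", input);
}
```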

Conclusion

The author shares practical experiences with Spring‑Kafka, highlighting useful features such as embedded brokers for testing, transactional messaging, request‑reply semantics, manual acknowledgment, lifecycle management, and dead‑letter handling, aiming to help developers avoid common pitfalls.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: Java, Kafka, Message Queue, Transactional Messaging, Spring Kafka, Embedded Kafka
Written by Java Captain

Focused on Java technologies: SSM, the Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading; occasionally covers DevOps tools like Jenkins, Nexus, Docker, ELK; shares practical tech insights and is dedicated to full‑stack Java development.
