
Using Apache Kafka with Spring Boot: Error Handling, Deserialization, Multi‑Listener, and Transactions

This article explains how to integrate Apache Kafka with Spring Boot, covering error recovery with SeekToCurrentErrorHandler, deserialization handling, domain‑object conversion, multiple listener routing, and transaction support, while providing complete code examples for each feature.

Architects Research Society

Spring for Apache Kafka brings the familiar Spring programming model to Kafka, offering a KafkaTemplate for publishing and listener containers for POJO listeners, with auto‑configuration that lets developers focus on business logic.

Error Recovery – By default, failed records are simply logged, but you can configure a custom error handler. Overriding the auto‑configured container factory lets you set SeekToCurrentErrorHandler (or a variant with a DeadLetterPublishingRecoverer) to retry, skip, or forward failed messages to a dead‑letter topic.

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

When combined with a dead‑letter publisher:

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

Deserialization Errors – If a deserialization exception occurs before the record reaches the listener, ErrorHandlingDeserializer wraps the delegate deserializer, captures the exception, and forwards the raw data to the error handler for diagnostics.
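A minimal configuration sketch of this wrapping, using Spring Kafka's standard `spring.deserializer.value.delegate.class` property (the JSON delegate here is an assumption; substitute whatever deserializer your application actually uses):

```yaml
spring:
  kafka:
    consumer:
      # The wrapper catches exceptions thrown by the delegate below
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        # Delegate that does the real deserialization work
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
```

With this in place, a poison record no longer kills the consumer in a tight loop; the exception and the raw bytes reach the container's error handler instead.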

Domain Objects and Type Inference – By defining a RecordMessageConverter (e.g., StringJsonMessageConverter), Spring can infer the target type from the listener method signature, automatically converting JSON payloads to POJOs such as Foo2. The same converter works for producers sending objects of compatible types.

@Bean
public RecordMessageConverter converter() {
    return new StringJsonMessageConverter();
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(Foo2 foo) {
    logger.info("Received: " + foo);
    if (foo.getFoo().startsWith("fail")) {
        throw new RuntimeException("failed");
    }
}

Multiple Listeners – A single container can route records to different methods based on type information carried in record headers. This requires configuring the converter to trust specific packages and to use TYPE_ID header precedence.

@Bean
public RecordMessageConverter converter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.common");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("foo", Foo2.class);
    mappings.put("bar", Bar2.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}

@Component
@KafkaListener(id = "multiGroup", topics = {"foos", "bars"})
public class MultiMethods {
    @KafkaHandler
    public void foo(Foo2 foo) { System.out.println("Received: " + foo); }
    @KafkaHandler
    public void bar(Bar2 bar) { System.out.println("Received: " + bar); }
    @KafkaHandler(isDefault = true)
    public void unknown(Object object) { System.out.println("Received unknown: " + object); }
}
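For the TYPE_ID routing above to work, the producer must stamp matching type ids into the record headers. One way is Spring Kafka's `spring.json.type.mapping` property on the JsonSerializer; a sketch, assuming hypothetical producer‑side classes Foo1 and Bar1 in the `com.common` package trusted above:

```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        # Maps short type ids ("foo", "bar") to classes; the id travels in the
        # record's type header and is resolved on the consumer via setIdClassMapping
        spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1
```

Using short ids instead of fully qualified class names keeps producer and consumer free to use different concrete classes (Foo1 on the sending side, Foo2 on the receiving side) for the same logical payload.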

Transactions – Enabling transactions by setting transaction-id-prefix and configuring the consumer’s isolation.level to read_committed ensures that all KafkaTemplate operations inside a @KafkaListener participate in the same transaction. Offsets are committed only after the transaction succeeds.

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx.
    consumer:
      properties:
        isolation.level: read_committed

@KafkaListener(id = "fooGroup2", topics = "topic2")
public void listen(List<Foo2> foos) throws IOException {
    logger.info("Received: " + foos);
    foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));
    logger.info("Messages sent, hit enter to commit tx");
    System.in.read();
}

The article concludes that using Spring with Apache Kafka eliminates much boilerplate and adds powerful features such as error handling, retries, and record filtering, while only scratching the surface of what the integration can achieve.

Tags: Spring Boot, Error Handling, Transactions, Apache Kafka, Message Conversion
Written by

Architects Research Society

A daily treasure trove for architects, expanding your view and depth. We share enterprise, business, application, data, technology, and security architecture, discuss frameworks, planning, governance, standards, and implementation, and explore emerging styles such as microservices, event‑driven, micro‑frontend, big data, data warehousing, IoT, and AI architecture.
