Deep Dive into Using Apache Kafka with Spring Boot: Error Handling, Message Conversion, and Transaction Support
This article explains how to integrate Apache Kafka with Spring Boot, starting from a basic listener example and extending it with advanced features: error handling with SeekToCurrentErrorHandler, deserialization error handling, type-inferred message conversion, multi-listener routing, and transactional message processing, with code examples and configuration details for each feature.
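The basic starting point is not shown in this excerpt; a minimal sketch of such a listener application might look like the following (the class name, group id, and topic name are illustrative assumptions, not taken from the article):

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // With spring-kafka on the classpath, Boot auto-configures the listener
    // container from spring.kafka.* properties; this method is invoked with
    // each record's String value.
    @KafkaListener(id = "myGroup", topics = "topic1")
    public void listen(String in) {
        System.out.println("Received: " + in);
    }
}
```

Everything beyond this, such as retries and dead-letter publishing, builds on overriding the auto-configured beans, as the following sections show.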
Error Handling – By default, failed records are simply logged. Overriding the auto-configured listener container factory lets you install a SeekToCurrentErrorHandler, which re-seeks the unprocessed records so they are redelivered and can retry, discard the offending record, or forward it to a dead-letter topic:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}
When combined with a DeadLetterPublishingRecoverer, the handler can publish failed messages to a separate topic for later analysis:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}
@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
    logger.info("Received: " + in);
    if (in.startsWith("foo")) {
        throw new RuntimeException("failed");
    }
}
@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(String in) {
    logger.info("Received from DLT: " + in);
}
Deserialization Errors – The ErrorHandlingDeserializer wraps the delegate deserializer, captures any exception it throws, and passes it to the container's error handler, allowing diagnosis of problematic payloads instead of an endless redelivery loop.
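One way to wire this up is through consumer properties; the following is a sketch, where the JsonDeserializer delegate is an assumption based on the JSON examples in this article:

```yaml
spring:
  kafka:
    consumer:
      # The wrapper that catches deserialization exceptions
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        # The delegate that does the actual deserialization work
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
```

With this in place, a poison record no longer kills the consumer; the exception travels to the error handler (and, with the configuration above, on to the dead-letter topic).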
Domain Object Type Inference – By defining a RecordMessageConverter bean (e.g., new StringJsonMessageConverter()), Spring can automatically convert JSON payloads to the method-parameter type, such as Foo2. The corresponding serialization on the producer side is done with JsonSerializer.
@Bean
public RecordMessageConverter converter() {
    return new StringJsonMessageConverter();
}
@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(Foo2 foo) {
    logger.info("Received: " + foo);
    if (foo.getFoo().startsWith("fail")) {
        throw new RuntimeException("failed");
    }
}
@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(Foo2 in) {
    logger.info("Received from DLT: " + in);
}
Multiple Listeners and Type Mapping – When routing is based on type headers in the record, a DefaultJackson2JavaTypeMapper with trusted packages and explicit ID-to-class mappings is required.
@Bean
public RecordMessageConverter converter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.common");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("foo", Foo2.class);
    mappings.put("bar", Bar2.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}
Listeners can then use @KafkaHandler methods to process different payload types, with an optional default handler for unknown types.
@Component
@KafkaListener(id = "multiGroup", topics = { "foos", "bars" })
public class MultiMethods {

    @KafkaHandler
    public void foo(Foo1 foo) {
        System.out.println("Received: " + foo);
    }

    @KafkaHandler
    public void bar(Bar bar) {
        System.out.println("Received: " + bar);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Received unknown: " + object);
    }
}
Transactional Support – By setting transaction-id-prefix on the producer and isolation.level to read_committed on the consumer in application.yml, all KafkaTemplate operations performed inside a @KafkaListener participate in a single transaction, which commits when the listener method returns; the sample listener deliberately pauses before returning so the delayed commit can be observed.
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx.
    consumer:
      properties:
        isolation.level: read_committed
Example listener that sends messages in a transaction and waits for user input before committing:
@KafkaListener(id = "fooGroup2", topics = "topic2")
public void listen(List<Foo2> foos) throws IOException {
    logger.info("Received: " + foos);
    foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));
    logger.info("Messages sent, hit enter to commit tx");
    System.in.read();
}
The article concludes that Spring for Apache Kafka reduces boilerplate and adds powerful features such as error handling, retries, and record filtering, while only scratching the surface of what the integration can achieve.
Architects Research Society