How Spring Kafka Listeners Work: From Single to Batch Processing
This article explains how Spring Kafka’s listener containers operate, detailing the internal workings of KafkaMessageListenerContainer and ConcurrentMessageListenerContainer, the role of @KafkaListener, Spring Boot auto‑configuration, and how to configure both single‑message and batch‑message processing within the same consumer group.
Message Listener Containers
1. KafkaMessageListenerContainer
Provided by spring-kafka, this container polls messages from Kafka, converts them, and hands them to methods annotated with @KafkaListener; it is the component that actually acts as a consumer.
Key code structure:
The entry point is doStart(); the container implements SmartLifecycle, so Spring manages its start and stop. ListenerConsumer is the Runnable that actually polls messages on a background thread.
In doStart() a ListenerConsumer is created and submitted to a thread pool.
These steps start the message listening process.
KafkaMessageListenerContainer#doStart
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand‑alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
// ...
// create ListenerConsumer and submit to thread pool
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
// ...
}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() { // NOSONAR complexity
// ...
Throwable exitThrowable = null;
while (isRunning()) {
try {
// pull and invoke messages
pollAndInvoke();
} catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ...
}
// ...
}
wrapUp(exitThrowable);
}
2. ConcurrentMessageListenerContainer
Concurrent message listening creates multiple KafkaMessageListenerContainer instances based on the configured concurrency, each acting as an independent consumer.
ConcurrentMessageListenerContainer#doStart
protected void doStart() {
if (!isRunning()) {
checkTopics();
// ...
setRunning(true);
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
// ...
if (isPaused()) {
container.pause();
}
// start the underlying KafkaMessageListenerContainer
container.start();
this.containers.add(container);
}
}
}
@KafkaListener underlying mechanism
The bridge is the @KafkaListener annotation.
KafkaListenerAnnotationBeanPostProcessor scans for methods or classes annotated with @KafkaListener, creates the appropriate KafkaMessageListenerContainer via a KafkaListenerContainerFactory, and starts the container, thus linking business logic with the container's processing.
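As a minimal sketch of what the post-processor picks up (the topic, group id, and class names here are illustrative, not from the source), a listener declaration looks like this:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    // KafkaListenerAnnotationBeanPostProcessor detects this method and asks the
    // default (or explicitly named) KafkaListenerContainerFactory to build and
    // start a message listener container that dispatches records to it.
    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onMessage(String payload) {
        System.out.println("received: " + payload);
    }
}
```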
Spring Boot auto‑configuration for Kafka
1. KafkaAutoConfiguration
Automatically creates beans such as KafkaTemplate, ProducerListener, ConsumerFactory, and ProducerFactory when they are not already defined.
2. KafkaAnnotationDrivenConfiguration
Handles operations behind annotations like @KafkaListener. When @EnableKafka is present, it creates a default bean named kafkaListenerContainerFactory if none is defined.
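The back-off behavior can be sketched roughly as follows. This is a simplified paraphrase of Spring Boot's KafkaAnnotationDrivenConfiguration, not the verbatim source; signatures are abridged:

```java
// Simplified sketch: Boot only registers its default factory when no bean
// named "kafkaListenerContainerFactory" already exists in the context.
@Configuration(proxyBeanMethods = false)
class KafkaAnnotationDrivenConfigurationSketch {

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Applies the spring.kafka.listener.* properties to the factory
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }
}
```

Defining your own bean under that exact name therefore replaces the default; defining it under any other name leaves the default in place alongside yours.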
Production configuration examples
1. Single‑message processing
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // use your broker list
// ...
return props;
}
}
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
// ...
}
2. Batch processing
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // use your broker list
// ...
return props;
}
}
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
// ...
}
3. Same consumer group supporting both single and batch processing
A project often starts with single-message consumption; as traffic grows, adding more consumers or partitions may still leave a backlog. Switching to batch consumption by configuring a batch KafkaListenerContainerFactory can increase throughput with only a minimal change to the listener code: the method signature changes from a single record to a List of records, while the surrounding business logic stays the same.
Only batch certain topics
Keep the original single‑message configuration unchanged, then add a separate batch listener factory and point the relevant @KafkaListener to it.
Start with the existing single‑message setup.
Add a batch listener factory as shown above.
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean(name = "batchListenerContainerFactory")
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
// other beans (consumerFactory, consumerConfigs) omitted for brevity
}
Key points
If your custom container factory bean is not named kafkaListenerContainerFactory, Spring Boot's KafkaAnnotationDrivenConfiguration will still create a default bean under that name; make sure each @KafkaListener references the factory you intend.
Single‑message and batch factories can coexist; the default bean name is kafkaListenerContainerFactory. Specify the containerFactory attribute in @KafkaListener to use a different one.
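Putting these key points together, both styles can coexist in one class. The factory bean name batchListenerContainerFactory matches the configuration shown earlier; the topic names and listener ids below are illustrative:

```java
import java.util.List;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MixedModeListeners {

    // No containerFactory attribute: uses the default bean
    // named "kafkaListenerContainerFactory", one record per call.
    @KafkaListener(id = "singleListener", topics = "lowVolumeTopic")
    public void listenSingle(String record) {
        // process one record at a time
    }

    // Explicitly targets the batch factory, so the listener
    // receives a whole poll's worth of records in one call.
    @KafkaListener(id = "batchListener", topics = "highVolumeTopic",
            containerFactory = "batchListenerContainerFactory")
    public void listenBatch(List<String> records) {
        // process the batch, e.g. a bulk insert
    }
}
```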
Summary
Spring integrates Kafka through the spring‑kafka module to simplify usage within the Spring ecosystem.
The @KafkaListener annotation enables both single‑message and batch processing; switching is driven by configuration.
Message dispatch is handled by Spring's listener container rather than by your code calling the Kafka client directly; in batch mode the container hands the listener a whole list of records in one call.
Be aware of automatically created beans and override them when specific requirements arise.
Debugging and source versions: Spring Boot 2.3.3.RELEASE, spring‑kafka 2.5.4.RELEASE.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
