Mastering Spring Kafka Listeners: Containers, Concurrency, and @KafkaListener
This article explains how Spring Kafka’s KafkaMessageListenerContainer and ConcurrentMessageListenerContainer work, details their start-up process and internal consumer threads, shows how @KafkaListener bridges business logic, and provides configuration examples for single‑message and batch processing in Spring Boot.
Message Listener Containers
1. KafkaMessageListenerContainer
Provided by Spring, this single-threaded container polls messages from Kafka, converts them to the configured format, and hands them to the registered listener, for example a method annotated with @KafkaListener. In effect it wraps one Kafka consumer.
The container implements SmartLifecycle, so Spring manages its start and stop operations. Its start() call delegates to doStart(), which is the entry point examined here.
The doStart() method creates a ListenerConsumer and submits it to the container's consumer task executor. ListenerConsumer implements Runnable; it runs on a background thread that polls and processes messages in a loop until the container stops.
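The start-then-poll pattern described above can be modeled with plain JDK classes. This is a simplified sketch, not Spring's code: SketchContainer and PollLoopDemo are hypothetical names, a BlockingQueue stands in for the Kafka consumer, and a single-thread executor stands in for the consumer task executor.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PollLoopDemo {
    // Feeds three records through the sketch container and returns what the listener saw.
    static List<String> runDemo() {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        SketchContainer container = new SketchContainer(source, record -> {
            received.add(record);
            done.countDown();
        });
        container.start();
        container.start(); // second call returns immediately, like the isRunning() guard
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        container.stop();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}

// Hypothetical stand-in for a listener container; not the Spring class.
class SketchContainer {
    private final BlockingQueue<String> source;  // stands in for the Kafka consumer
    private final Consumer<String> listener;     // stands in for the user's listener
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running;

    SketchContainer(BlockingQueue<String> source, Consumer<String> listener) {
        this.source = source;
        this.listener = listener;
    }

    // Analogous to doStart(): guard against a double start, then submit the loop.
    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        executor.submit(this::pollLoop);
    }

    // Analogous to ListenerConsumer#run: poll with a timeout while running.
    private void pollLoop() {
        while (running) {
            try {
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    listener.accept(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The poll timeout matters for shutdown: because each poll returns after at most 100 ms, the loop re-checks the running flag regularly and can exit soon after stop() is called.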
KafkaMessageListenerContainer#doStart
protected void doStart() {
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand-alone container
        checkTopics();
    }
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);
    // ...
    // Create the ListenerConsumer and submit it to the consumer task executor
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
    // ...
}

KafkaMessageListenerContainer.ListenerConsumer#run
public void run() { // NOSONAR complexity
    // ...
    Throwable exitThrowable = null;
    while (isRunning()) {
        try {
            // Pull and handle messages
            pollAndInvoke();
        } catch (@SuppressWarnings(UNUSED) WakeupException e) {
            // ...
        }
        // ...
    }
    wrapUp(exitThrowable);
}

2. ConcurrentMessageListenerContainer
This container creates one underlying KafkaMessageListenerContainer per unit of configured concurrency, so a concurrency of N yields N consumers, each polling on its own thread.
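One practical consequence is that concurrency above the partition count buys nothing: within a consumer group Kafka assigns each partition to exactly one consumer, so surplus child containers sit idle. The sketch below illustrates this with a simple round-robin split. ConcurrencySketch and assign are hypothetical names, and round-robin is an assumption made for illustration; the assignors Kafka actually uses may distribute partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencySketch {
    // Spreads partition numbers 0..partitions-1 over `concurrency` consumers, round-robin.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            perConsumer.get(p % concurrency).add(p);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // prints [[0, 3], [1, 4], [2, 5]]
        System.out.println(assign(2, 3)); // prints [[0], [1], []]
    }
}
```

In the second call the third consumer receives no partitions, which is why a concurrency setting is usually chosen to be at most the number of partitions on the topic.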
ConcurrentMessageListenerContainer#doStart
protected void doStart() {
    if (!isRunning()) {
        checkTopics();
        // ...
        setRunning(true);
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            String beanName = getBeanName();
            container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
            // ...
            if (isPaused()) {
                container.pause();
            }
            // Start the underlying container
            container.start();
            this.containers.add(container);
        }
    }
}

@KafkaListener Underlying Mechanism
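The KafkaMessageListenerContainer pulls and processes messages, but one link is still missing: how does our business logic get connected to this container?
The bridge is the @KafkaListener annotation. KafkaListenerAnnotationBeanPostProcessor scans beans for methods annotated with @KafkaListener, registers each one as a listener endpoint, and has the configured KafkaListenerContainerFactory create a message listener container for it (a ConcurrentMessageListenerContainer when the factory is a ConcurrentKafkaListenerContainerFactory). The container is then started along with the application context unless autoStartup is disabled.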
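The scanning idea can be sketched with plain reflection. Everything here is hypothetical and much simpler than the real post-processor: @SketchListener stands in for @KafkaListener, ListenerScanner for the post-processor, and a Consumer<String> for the listener adapter that a container would invoke.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ListenerScanDemo {
    public static void main(String[] args) {
        OrderHandler bean = new OrderHandler();
        Map<String, Consumer<String>> endpoints = ListenerScanner.scan(bean);
        // A container would call this adapter for each record polled from "orders".
        endpoints.get("orders").accept("order-1");
        System.out.println(bean.seen); // prints [order-1]
    }
}

// Hypothetical stand-in for @KafkaListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchListener {
    String topic();
}

// Business bean: knows nothing about polling, only about handling a payload.
class OrderHandler {
    final List<String> seen = new ArrayList<>();

    @SketchListener(topic = "orders")
    public void onOrder(String payload) {
        seen.add(payload);
    }
}

// Hypothetical stand-in for the annotation post-processor.
class ListenerScanner {
    // Returns one adapter per annotated method, keyed by topic.
    static Map<String, Consumer<String>> scan(Object bean) {
        Map<String, Consumer<String>> endpoints = new LinkedHashMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            SketchListener annotation = method.getAnnotation(SketchListener.class);
            if (annotation == null) {
                continue;
            }
            method.setAccessible(true);
            endpoints.put(annotation.topic(), payload -> {
                try {
                    method.invoke(bean, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            });
        }
        return endpoints;
    }
}
```

The design point is the separation of concerns: the annotated method stays free of any consumer code, while the adapter produced by the scan is what the polling loop actually calls.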
Spring Boot Auto‑Configuration for Kafka
1. KafkaAutoConfiguration
Automatically creates beans such as KafkaTemplate, ProducerListener, ConsumerFactory, ProducerFactory when they are missing.
2. KafkaAnnotationDrivenConfiguration
Handles the infrastructure behind annotations like @KafkaListener.
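When spring-kafka (and therefore @EnableKafka) is on the classpath, Spring Boot registers a default ConcurrentKafkaListenerContainerFactory under the bean name kafkaListenerContainerFactory if no bean with that name is defined, allowing @KafkaListener to work without an explicit container factory configuration.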
Production Configuration
1. Single‑Message Processing
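import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // three child containers, one consumer thread each
        factory.getContainerProperties().setPollTimeout(3000); // milliseconds
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // embeddedKafka is not defined in this snippet; supply your own broker list here
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ... other properties
        return props;
    }
}

// In any Spring-managed bean:
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    // handle single record
}

2. Batch Processing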
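import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // enable batch mode
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // embeddedKafka is not defined in this snippet; supply your own broker list here
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ... other properties
        return props;
    }
}

// In any Spring-managed bean (also needs java.util.List):
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    // handle batch of records
}

3. Mixing Single and Batch in the Same Consumer Group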
Scenario: start with single‑message consumption, later add a batch listener to relieve backlog without changing the original configuration.
Keep the original single‑message configuration unchanged.
Add a separate KafkaListenerContainerFactory for batch processing and reference it via the containerFactory attribute of @KafkaListener.
@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean(name = "batchListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
    // consumerFactory and consumerConfigs beans as before
}

Key points:
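Spring Boot's KafkaAnnotationDrivenConfiguration only backs off when a bean named kafkaListenerContainerFactory already exists. If your custom factory has a different bean name, the auto-configured default factory is still created alongside it, so reference the custom one explicitly through the containerFactory attribute of @KafkaListener.
Single-message and batch factories can coexist; a @KafkaListener that does not set containerFactory uses the bean named kafkaListenerContainerFactory.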
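The two key points boil down to a name-based lookup, which the following sketch models with a plain map. FactoryLookupSketch and its methods are hypothetical, and the map of strings merely stands in for the application context; only the bean name kafkaListenerContainerFactory is taken from Spring.

```java
import java.util.HashMap;
import java.util.Map;

class FactoryLookupSketch {
    static final String DEFAULT_NAME = "kafkaListenerContainerFactory";

    // Models the auto-configuration: add the default only when that exact name is absent.
    static void registerDefaultIfMissing(Map<String, String> beans) {
        beans.putIfAbsent(DEFAULT_NAME, "auto-configured record factory");
    }

    // Models @KafkaListener: an empty containerFactory attribute falls back to the default name.
    static String resolve(Map<String, String> beans, String containerFactoryAttribute) {
        String name = containerFactoryAttribute.isEmpty() ? DEFAULT_NAME : containerFactoryAttribute;
        String factory = beans.get(name);
        if (factory == null) {
            throw new IllegalStateException("No factory bean named " + name);
        }
        return factory;
    }

    public static void main(String[] args) {
        Map<String, String> beans = new HashMap<>();
        beans.put("batchListenerContainerFactory", "user batch factory");
        registerDefaultIfMissing(beans); // the custom name does not suppress the default

        System.out.println(resolve(beans, ""));                              // prints auto-configured record factory
        System.out.println(resolve(beans, "batchListenerContainerFactory")); // prints user batch factory
    }
}
```

Under this model, forgetting containerFactory on the batch listener would silently select the record-oriented default, which is the mistake the first key point warns about.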
Summary
Spring provides the spring‑kafka module to integrate Kafka seamlessly into the Spring ecosystem.
The @KafkaListener annotation enables both single‑record and batch processing within the same project by adjusting container factory configurations.
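The choice between the two modes is made by Spring, not by the raw Kafka client: the consumer's poll() always returns a collection of records, and Spring either invokes the listener once per record or, in batch mode, passes the whole collection to the listener in one call.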
Developers should be aware of the beans Spring auto‑creates and can override them to meet specific requirements.
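The difference between the two modes summarized above can be shown with a small stdlib-only sketch. DispatchSketch and its methods are hypothetical names, and a List<String> stands in for the records returned by a single poll.

```java
import java.util.List;
import java.util.function.Consumer;

class DispatchSketch {
    // Record mode: one listener invocation per polled record.
    static void dispatchPerRecord(List<String> polled, Consumer<String> listener) {
        for (String record : polled) {
            listener.accept(record);
        }
    }

    // Batch mode: one listener invocation for everything the poll returned.
    static void dispatchBatch(List<String> polled, Consumer<List<String>> listener) {
        if (!polled.isEmpty()) {
            listener.accept(polled);
        }
    }

    // Counts how many times the listener is invoked for one poll result.
    static int countCalls(boolean batch, List<String> polled) {
        int[] calls = {0};
        if (batch) {
            dispatchBatch(polled, records -> calls[0]++);
        } else {
            dispatchPerRecord(polled, record -> calls[0]++);
        }
        return calls[0];
    }

    public static void main(String[] args) {
        List<String> polled = List.of("a", "b", "c");
        System.out.println(countCalls(false, polled)); // prints 3
        System.out.println(countCalls(true, polled));  // prints 1
    }
}
```

The same three records arrive either way; only the number of listener invocations changes, which is why batch mode helps when per-call overhead (for example, one database round trip per invocation) dominates.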
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
