How Spring Kafka Listeners Work: From Single to Batch Processing

This article explains how Spring Kafka’s listener containers operate, detailing the internal workings of KafkaMessageListenerContainer and ConcurrentMessageListenerContainer, the role of @KafkaListener, Spring Boot auto‑configuration, and how to configure both single‑message and batch‑message processing within the same consumer group.

Programmer DD
Programmer DD
Programmer DD
How Spring Kafka Listeners Work: From Single to Batch Processing

Message Listener Containers

1. KafkaMessageListenerContainer

Provided by Spring to listen and pull messages, converting them and handing them to methods annotated with @KafkaListener, acting as a consumer.

Key code structure:

Entry method is doStart(), which implements SmartLifecycle; Spring manages its start and stop. ListenerConsumer is the actual runnable that pulls messages in a background thread.

In doStart() a ListenerConsumer is created and submitted to a thread pool.

These steps start the message listening process.

KafkaMessageListenerContainer#doStart

protected void doStart() {
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand‑alone container
        checkTopics();
    }
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);
    // ...
    // create ListenerConsumer and submit to thread pool
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    this.listenerConsumerFuture = containerProperties
        .getConsumerTaskExecutor()
        .submitListenable(this.listenerConsumer);
    // ...
}

KafkaMessageListenerContainer.ListenerConsumer#run

public void run() { // NOSONAR complexity
    // ...
    Throwable exitThrowable = null;
    while (isRunning()) {
        try {
            // pull and invoke messages
            pollAndInvoke();
        } catch (@SuppressWarnings(UNUSED) WakeupException e) {
            // ...
        }
        // ...
    }
    wrapUp(exitThrowable);
}

2. ConcurrentMessageListenerContainer

Concurrent message listening creates multiple KafkaMessageListenerContainer instances based on the configured concurrency, each acting as an independent consumer.

ConcurrentMessageListenerContainer#doStart

protected void doStart() {
    if (!isRunning()) {
        checkTopics();
        // ...
        setRunning(true);
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                constructContainer(containerProperties, topicPartitions, i);
            String beanName = getBeanName();
            container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
            // ...
            if (isPaused()) {
                container.pause();
            }
            // start the underlying KafkaMessageListenerContainer
            container.start();
            this.containers.add(container);
        }
    }
}

@KafkaListener underlying mechanism

The bridge is the @KafkaListener annotation.
KafkaListenerAnnotationBeanPostProcessor

scans for methods or classes annotated with @KafkaListener, creates the appropriate KafkaMessageListenerContainer via a KafkaListenerContainerFactory, and starts the container, thus linking business logic with the container's processing.

Spring Boot auto‑configuration for Kafka

1. KafkaAutoConfiguration

Automatically creates beans such as KafkaTemplate, ProducerListener, ConsumerFactory, ProducerFactory when they are missing.

2. KafkaAnnotationDrivenConfiguration

Handles operations behind annotations like @KafkaListener. When @EnableKafka is present, it creates a default bean named kafkaListenerContainerFactory if none is defined.

Production configuration examples

1. Single‑message processing

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ...
        return props;
    }
}

@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    // ...
}

2. Batch processing

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<?, ?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ...
        return props;
    }
}

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    // ...
}

3. Same consumer group supporting both single and batch processing

Initially a project may use single‑message consumption; as traffic grows, adding more consumers or partitions may still lead to backlog. Switching to batch consumption by configuring a batch KafkaListenerContainerFactory can increase throughput without changing the business logic.

Only batch certain topics

Keep the original single‑message configuration unchanged, then add a separate batch listener factory and point the relevant @KafkaListener to it.

Start with the existing single‑message setup.

Add a batch listener factory as shown above.

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean(name = "batchListenerContainerFactory")
    public KafkaListenerContainerFactory<?, ?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
    // other beans (consumerFactory, consumerConfigs) omitted for brevity
}

Key points

If a custom ContainerFactory bean name is not kafkaListenerContainerFactory, Spring will create a new bean via KafkaAnnotationDrivenConfiguration; ensure your @KafkaListener references the intended factory.

Single‑message and batch factories can coexist; the default bean name is kafkaListenerContainerFactory. Specify the containerFactory attribute in @KafkaListener to use a different one.

Summary

Spring integrates Kafka through the spring‑kafka module to simplify usage within the Spring ecosystem.

The @KafkaListener annotation enables both single‑message and batch processing; switching is driven by configuration.

Processing logic is handled by Spring, not directly by the Kafka client; batch mode allows handling a list of records in one call.

Be aware of automatically created beans and override them when specific requirements arise.

Debugging and source versions: Spring Boot 2.3.3.RELEASE, spring‑kafka 2.5.4.RELEASE.

backend-developmentSpring KafkaMessage Listenerbatch-processing
Programmer DD
Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.