Mastering Spring Kafka Listeners: Containers, Concurrency, and @KafkaListener

This article explains how Spring Kafka's KafkaMessageListenerContainer and ConcurrentMessageListenerContainer work, walks through their start-up process and internal consumer threads, shows how @KafkaListener connects business logic to those containers, and provides configuration examples for single-message and batch processing in Spring Boot.

Programmer DD

Message Listener Containers

1. KafkaMessageListenerContainer

Provided by Spring, this single-threaded container wraps one Kafka consumer: it polls messages, converts them to the required format, and hands them to the listener, for example a method annotated with @KafkaListener.

The container implements SmartLifecycle, so Spring manages its start and stop operations. Starting the container ends up in its doStart() method.

The doStart() method creates a ListenerConsumer and submits it to the container's task executor. ListenerConsumer implements Runnable; its run() method loops on a background thread, polling for messages and dispatching them to the listener until the container stops.

The two excerpts below show the start-up path and the polling loop.

KafkaMessageListenerContainer#doStart

protected void doStart() {
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand‑alone container
        checkTopics();
    }
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);
    // ...
    // Create ListenerConsumer and submit to thread pool
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    this.listenerConsumerFuture = containerProperties
        .getConsumerTaskExecutor()
        .submitListenable(this.listenerConsumer);
    // ...
}

KafkaMessageListenerContainer.ListenerConsumer#run

public void run() { // NOSONAR complexity
    // ...
    Throwable exitThrowable = null;
    while (isRunning()) {
        try {
            // Pull and handle messages
            pollAndInvoke();
        } catch (@SuppressWarnings(UNUSED) WakeupException e) {
            // ...
        }
        // ...
    }
    wrapUp(exitThrowable);
}
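
The two excerpts above can be condensed into a small model that runs without Kafka. This is a minimal sketch, not Spring Kafka's implementation: PollLoopContainerSketch and demo() are hypothetical names, a BlockingQueue stands in for the Kafka consumer, and a Consumer<String> stands in for the listener method.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Simplified model of a listener container; NOT Spring Kafka's implementation. */
class PollLoopContainerSketch {
    private final BlockingQueue<String> source;   // hypothetical stand-in for the Kafka consumer
    private final Consumer<String> listener;      // hypothetical stand-in for the listener method
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running;

    PollLoopContainerSketch(BlockingQueue<String> source, Consumer<String> listener) {
        this.source = source;
        this.listener = listener;
    }

    /** Same shape as doStart(): guard, mark as running, submit the loop to an executor. */
    synchronized void start() {
        if (running) {
            return; // already started
        }
        running = true;
        executor.submit(this::run);
    }

    /** Same shape as ListenerConsumer#run: poll and invoke until the container stops. */
    private void run() {
        while (running) {
            try {
                String record = source.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    listener.accept(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Stops the loop and waits briefly for the worker to exit. Single use: no restart. */
    synchronized void stop() {
        running = false;
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Usage example: returns the records the listener saw, in arrival order. */
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        PollLoopContainerSketch container = new PollLoopContainerSketch(queue, record -> {
            seen.add(record);
            done.countDown();
        });
        container.start();
        try {
            done.await(2, TimeUnit.SECONDS); // wait until both records were handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        container.stop();
        return seen;
    }
}
```

Calling PollLoopContainerSketch.demo() starts the loop, lets the listener receive both queued records on the worker thread, and stops the loop again. The real container adds offset commits, error handling, and rebalance callbacks on top of this shape.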

2. ConcurrentMessageListenerContainer

This container creates multiple underlying KafkaMessageListenerContainer instances based on the configured concurrency, effectively spawning several consumer threads.

ConcurrentMessageListenerContainer#doStart

protected void doStart() {
    if (!isRunning()) {
        checkTopics();
        // ...
        setRunning(true);
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                constructContainer(containerProperties, topicPartitions, i);
            String beanName = getBeanName();
            container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
            // ...
            if (isPaused()) {
                container.pause();
            }
            // Start the underlying container
            container.start();
            this.containers.add(container);
        }
    }
}
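
The loop above creates one child container per unit of concurrency and names it beanName-i. Because a partition is consumed by at most one consumer within a group, children beyond the partition count receive nothing. The following is a minimal sketch of that relationship, assuming a plain round-robin spread; Kafka's actual partition assignor may distribute partitions differently, and ConcurrencySketch and assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrates concurrency versus partition count; NOT Kafka's or Spring's assignment logic. */
class ConcurrencySketch {

    /**
     * Returns child container name -> partitions it would own under a simple
     * round-robin spread (an assumption made for illustration only).
     */
    static Map<String, List<Integer>> assign(String beanName, int concurrency, int partitions) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        // Same naming rule as the doStart() excerpt: "<beanName or consumer>-<index>"
        String prefix = (beanName != null ? beanName : "consumer") + "-";
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (int i = 0; i < concurrency; i++) {
            owned.put(prefix + i, new ArrayList<>());
        }
        // Each partition goes to exactly one child
        for (int p = 0; p < partitions; p++) {
            owned.get(prefix + (p % concurrency)).add(p);
        }
        return owned;
    }
}
```

With a concurrency of 3 and a topic of 2 partitions, assign("myListener", 3, 2) leaves myListener-2 with an empty list, which is why raising concurrency above the partition count does not add throughput.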

@KafkaListener Underlying Mechanism

The KafkaMessageListenerContainer pulls and processes messages, but the missing link is how our business logic is connected to this container.

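The bridge is the @KafkaListener annotation. KafkaListenerAnnotationBeanPostProcessor scans beans for methods annotated with @KafkaListener and registers each one as a listener endpoint. For every endpoint, the configured KafkaListenerContainerFactory creates a message listener container (a ConcurrentMessageListenerContainer when the factory is a ConcurrentKafkaListenerContainerFactory), and that container is started with the application context unless autoStartup is disabled.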
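
The scanning step can be illustrated with plain reflection. This is a minimal sketch of the idea only, not the code of KafkaListenerAnnotationBeanPostProcessor: @SketchListener, ListenerScannerSketch, and OrderHandler are hypothetical names, and the returned map stands in for the containers that Spring would create.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for @KafkaListener. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchListener {
    String topic();
}

/** Finds annotated methods on a bean and wraps each one as a callable endpoint. */
class ListenerScannerSketch {
    static Map<String, Consumer<String>> scan(Object bean) {
        Map<String, Consumer<String>> endpoints = new HashMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            SketchListener annotation = method.getAnnotation(SketchListener.class);
            if (annotation == null) {
                continue; // not a listener method
            }
            method.setAccessible(true);
            // The endpoint is what a container would call for each record of the topic
            endpoints.put(annotation.topic(), payload -> {
                try {
                    method.invoke(bean, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Listener invocation failed", e);
                }
            });
        }
        return endpoints;
    }
}

/** Hypothetical business bean with one listener method. */
class OrderHandler {
    final List<String> received = new ArrayList<>();

    @SketchListener(topic = "myTopic")
    public void listen(String data) {
        received.add(data);
    }
}
```

Calling ListenerScannerSketch.scan(new OrderHandler()).get("myTopic").accept("hello") ends up in OrderHandler#listen, which is the same indirection a container uses when it hands a polled record to the annotated method.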

Spring Boot Auto‑Configuration for Kafka

1. KafkaAutoConfiguration

Registers beans such as KafkaTemplate, ProducerListener, ConsumerFactory, and ProducerFactory when the application has not defined them.

2. KafkaAnnotationDrivenConfiguration

Handles the infrastructure behind annotations like @KafkaListener.

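This Spring Boot configuration registers a default ConcurrentKafkaListenerContainerFactory under the bean name kafkaListenerContainerFactory when no bean with that name exists, and it applies @EnableKafka itself if the application has not already done so. As a result, @KafkaListener works without explicit consumer configuration.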

Production Configuration

1. Single‑Message Processing

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // embeddedKafka is not defined in this snippet; supply your own bootstrap servers here
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ... other properties
        return props;
    }
}

@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    // handle single record
}

2. Batch Processing

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // enable batch mode
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // embeddedKafka is not defined in this snippet; supply your own bootstrap servers here
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ... other properties
        return props;
    }
}

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    // handle batch of records
}
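
The two listener signatures differ only in how the result of one poll is handed over. The following is a minimal sketch of that difference, not Spring Kafka's dispatch code: DispatchSketch and its methods are hypothetical names, and skipping the listener for an empty poll is an assumption made for the illustration.

```java
import java.util.List;
import java.util.function.Consumer;

/** Contrasts record-at-a-time and batch delivery of one poll result. */
class DispatchSketch {

    /** Record mode: the listener is invoked once per polled record. Returns the call count. */
    static int dispatchSingle(List<String> polled, Consumer<String> listener) {
        int calls = 0;
        for (String record : polled) {
            listener.accept(record);
            calls++;
        }
        return calls;
    }

    /** Batch mode: the listener is invoked once with everything the poll returned. */
    static int dispatchBatch(List<String> polled, Consumer<List<String>> listener) {
        if (polled.isEmpty()) {
            return 0; // assumption for this sketch: nothing polled, nothing delivered
        }
        listener.accept(List.copyOf(polled));
        return 1;
    }
}
```

For a poll that returns three records, dispatchSingle makes three listener calls while dispatchBatch makes one call with a list of three. Batch mode therefore suits listeners that can amortize work across records, such as bulk database writes.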

3. Mixing Single and Batch in the Same Consumer Group

Scenario: start with single‑message consumption, later add a batch listener to relieve backlog without changing the original configuration.

Keep the original single‑message configuration unchanged.

Add a separate KafkaListenerContainerFactory for batch processing and reference it via the containerFactory attribute of @KafkaListener.

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean(name = "batchListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
    // consumerFactory and consumerConfigs beans as before
}

Key points:

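The default factory is looked up by bean name. A custom container factory registered under any name other than kafkaListenerContainerFactory does not replace it: Spring Boot's KafkaAnnotationDrivenConfiguration still creates the default one, so a listener must name the custom factory in the containerFactory attribute of @KafkaListener to use it.

Single-message and batch factories can coexist; a @KafkaListener without a containerFactory attribute uses the bean named kafkaListenerContainerFactory.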
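
The name-based rule in these key points can be modeled with a plain map. This is a minimal sketch of the behavior, not Spring's bean factory: BeanRegistrySketch and its methods are hypothetical names, and strings stand in for the factory beans.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Models registering a default bean only when its name is free, plus lookup by name. */
class BeanRegistrySketch {
    static final String DEFAULT_FACTORY = "kafkaListenerContainerFactory";

    private final Map<String, Object> beans = new LinkedHashMap<>();

    /** A user-defined @Bean. */
    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    /** Auto-configuration style: backs off only if a bean with this exact name exists. */
    boolean registerIfMissing(String name, Supplier<Object> supplier) {
        if (beans.containsKey(name)) {
            return false;
        }
        beans.put(name, supplier.get());
        return true;
    }

    /** A listener uses its containerFactory attribute if set, else the default bean name. */
    Object resolveFactory(String containerFactoryAttribute) {
        boolean unset = containerFactoryAttribute == null || containerFactoryAttribute.isEmpty();
        String name = unset ? DEFAULT_FACTORY : containerFactoryAttribute;
        Object factory = beans.get(name);
        if (factory == null) {
            throw new IllegalStateException("No factory bean named " + name);
        }
        return factory;
    }
}
```

Registering only batchListenerContainerFactory leaves the default name free, so the auto-configured single-message factory is still created. A listener without a containerFactory attribute resolves to that default, while a listener that names batchListenerContainerFactory gets the batch factory.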

Summary

Spring provides the spring‑kafka module to integrate Kafka seamlessly into the Spring ecosystem.

The @KafkaListener annotation enables both single‑record and batch processing within the same project by adjusting container factory configurations.

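In both modes the Kafka client's poll() returns a batch of records, and Spring decides how to hand them to the listener: single-record mode invokes the listener once per record, while batch mode passes all records from one poll to the listener in a single call.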

Developers should be aware of the beans Spring auto‑creates and can override them to meet specific requirements.
