
Boost Kafka Throughput in Spring Boot: Batch Consumption Guide

This article demonstrates how to integrate Kafka with Spring Boot, add necessary dependencies and configuration, implement both single‑message and batch consumers, and tune batch settings to dramatically improve processing speed for millions of records in a microservice environment.

macrozheng

Introduction

In a previous article we introduced Kafka's architecture and how partitioning can accelerate data consumption in a cluster. Theory alone is insufficient; practical implementation is required.

Using a real production scenario, we will show how to use Spring Boot to consume Kafka data with high throughput.

Practical Implementation

The data source generates over 10 million order records nightly. We need to ingest this data efficiently.


2.1 Add Kafka Dependency

The project uses Spring Boot 2.1.5.RELEASE and spring-kafka 2.2.6.RELEASE.

<code><dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency></code>

2.2 Add Kafka Configuration Properties

Insert the following properties into application.properties:

<code># Kafka server address (comma‑separated for multiple brokers)
spring.kafka.bootstrap-servers=197.168.25.196:9092
# Retry count
spring.kafka.producer.retries=3
# Producer batch size (bytes)
spring.kafka.producer.batch-size=1000
# Buffer memory (32 MB)
spring.kafka.producer.buffer-memory=33554432
# Consumer group
spring.kafka.consumer.group-id=crm-microservice-newperformance
# Offset reset
spring.kafka.consumer.auto-offset-reset=earliest
# Max poll records
spring.kafka.consumer.max-poll-records=4000
# Auto commit
spring.kafka.consumer.enable-auto-commit=true
# Auto commit interval (ms)
spring.kafka.consumer.auto-commit-interval=1000</code>

2.3 Create a Consumer

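<code>import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BigDataTopicListener {
    private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

    // Called once per record with the default (non-batch) listener container
    @KafkaListener(topics = {"big_data_topic"})
    public void consumer(ConsumerRecord<?, ?> consumerRecord) {
        log.info("Received bigData message: {}", consumerRecord);
        // db.save(consumerRecord);
    }
}</code>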

2.4 Simulate Data Push

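<code>import com.alibaba.fastjson.JSONObject; // assumed: JSONObject is Alibaba fastjson
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.LinkedHashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testSend() {
        for (int i = 0; i < 5000; i++) {
            Map<String, Object> map = new LinkedHashMap<>();
            map.put("datekey", 20210610);
            map.put("userid", i);
            map.put("salaryAmount", i);
            kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
        }
        // Push out buffered records before the test context shuts down
        kafkaTemplate.flush();
    }
}</code>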

Initial single‑message consumption works, but processing 10 million records takes about three hours, which is unacceptable.

2.5 Switch to Batch Consumption

Create a KafkaConfiguration class to define producer and consumer factories and a batch listener container:

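<code>import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration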
public class KafkaConfiguration {
    @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers;
    @Value("${spring.kafka.producer.retries}") private Integer retries;
    @Value("${spring.kafka.producer.batch-size}") private Integer batchSize;
    @Value("${spring.kafka.producer.buffer-memory}") private Integer bufferMemory;
    @Value("${spring.kafka.consumer.group-id}") private String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset;
    @Value("${spring.kafka.consumer.max-poll-records}") private Integer maxPollRecords;
    @Value("${spring.kafka.consumer.batch.concurrency}") private Integer batchConcurrency;
    @Value("${spring.kafka.consumer.enable-auto-commit}") private Boolean autoCommit;
    @Value("${spring.kafka.consumer.auto-commit-interval}") private Integer autoCommitInterval;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        // Key type is String to match the StringDeserializer in consumerConfigs()
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(batchConcurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        return factory;
    }
}
</code>

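Add a custom property, spring.kafka.consumer.batch.concurrency, to control the number of consumer threads. Auto-commit must also be switched off, because the batch container above acknowledges offsets manually. These values replace the earlier consumer settings: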

<code># Batch consumer concurrency (≤ topic partitions)
spring.kafka.consumer.batch.concurrency=3
# Max records per poll
spring.kafka.consumer.max-poll-records=4000
# Disable auto‑commit for manual control
spring.kafka.consumer.enable-auto-commit=false</code>
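The "≤ topic partitions" note reflects that, within one consumer group, each partition is read by at most one consumer at a time, so threads beyond the partition count have nothing to do. The sketch below illustrates this with a simplified round-robin model; it is not Kafka's actual assignment algorithm, and `PartitionSpread` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: spread partition ids over consumer threads round-robin.
// Kafka's real assignors differ in detail; the idle-thread effect is the same.
final class PartitionSpread {

    // Returns, for each consumer index, the list of partition ids it would own.
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers < 1) {
            throw new IllegalArgumentException("consumers must be >= 1");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    // Counts consumers that received no partition at all.
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 3)); // [[0], [1], [2]]
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]
    }
}
```

With three partitions, a concurrency of 5 leaves two threads idle in this model, which is why raising concurrency usually goes hand in hand with adding partitions.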

Update the listener to use batch mode:

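<code>import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class BigDataTopicListener {
    private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

    // Receives every record returned by one poll as a single list
    @KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
    public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
        long start = System.currentTimeMillis();
        // db.batchSave(consumerRecords);
        // Commit offsets only after the batch has been handled
        ack.acknowledge();
        log.info("Received bigData batch: {}, consumption time: {}ms", consumerRecords.size(), System.currentTimeMillis() - start);
    }
}
</code>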
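The listener leaves `db.batchSave` as a placeholder. If the database handles smaller insert batches better than the up to 4000 records a single poll may return, the polled list can be split before saving. The following is a minimal sketch of that splitting step only; `BatchChunker`, `chunk`, and the chunk size of 500 are hypothetical choices, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchChunker {

    // Splits items into consecutive sublists of at most chunkSize elements.
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> polled = new ArrayList<>();
        for (int i = 0; i < 4000; i++) {
            polled.add(i);
        }
        // Illustrative chunk size; tune it against the actual database
        List<List<Integer>> chunks = chunk(polled, 500);
        System.out.println(chunks.size()); // 8
    }
}
```

In the listener, each chunk would be passed to the save call in turn, and `ack.acknowledge()` would be invoked only after the last chunk succeeds, so the offset commit never runs ahead of the database write.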

With three consumer instances and three topic partitions, processing 5 million records finishes within 30 minutes, compared with about three hours for 10 million records under single‑message consumption.
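Because the two runs handled different record counts, it helps to convert both to records per second before comparing them. The sketch below does only that arithmetic; `Throughput.perSecond` is a hypothetical helper, and the inputs are the rounded figures quoted in this article rather than fresh measurements.

```java
final class Throughput {

    // Average records processed per second over the given duration.
    static double perSecond(long records, long minutes) {
        return records / (minutes * 60.0);
    }

    public static void main(String[] args) {
        double single = perSecond(10_000_000L, 180); // about three hours
        double batch = perSecond(5_000_000L, 30);    // within 30 minutes
        System.out.printf("single: %.0f/s, batch: %.0f/s, ratio: %.1fx%n",
                single, batch, batch / single);
    }
}
```

On these figures the single‑message run averages roughly 926 records per second and the batch run at least about 2,778, a speedup of about three times. Since the batch run is described as finishing within 30 minutes, the real ratio may be somewhat higher.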

In production, adjust max-poll-records and the number of partitions, or scale the microservice cluster, to meet higher throughput demands, while avoiding excessively large batch sizes that could trigger frequent GC pauses.
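One rough way to reason about the GC risk is to estimate how much record payload is held in memory at once: records per poll, times average record size, times the number of consumer threads. The sketch below is a back‑of‑the‑envelope calculation only. `BatchMemoryEstimate` is a hypothetical helper, the 1 KB record size is an assumed figure, and real heap usage will be higher because of object overhead and any deserialized copies.

```java
final class BatchMemoryEstimate {

    // Approximate payload bytes held at once across all consumer threads.
    static long payloadBytes(int maxPollRecords, int avgRecordBytes, int concurrency) {
        return (long) maxPollRecords * avgRecordBytes * concurrency;
    }

    public static void main(String[] args) {
        // 4000 records per poll and 3 threads come from the configuration above;
        // 1024 bytes per record is an assumption for illustration.
        long bytes = payloadBytes(4000, 1024, 3);
        System.out.println(bytes / (1024 * 1024) + " MiB"); // 11 MiB
    }
}
```

Rerunning the estimate with a measured record size and candidate max-poll-records values gives a first check before load testing with GC logging enabled.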

Conclusion

This tutorial showed how to use SpringBoot and Kafka to achieve high‑throughput data consumption in a microservice, laying the groundwork for further topics such as failure handling.
