How to Integrate Kafka with Spring Boot for High‑Performance Messaging

This article walks through Kafka’s core architecture, explains why it achieves high throughput, and provides a step‑by‑step Spring Boot integration: environment setup, Maven dependencies, configuration, producer and consumer code, advanced features such as transactions and dead‑letter queues, and performance monitoring and tuning tips.

Java Tech Workshop

1. Kafka Overview

Kafka is a distributed, high‑throughput messaging system built around a partitioned, replicated commit log. It moves large data streams between services in scenarios ranging from e‑commerce flash sales to real‑time monitoring.

2. Core Concepts

Producer : Sends messages; supports batch and async sending.

Consumer : Receives messages; supports consumer groups and concurrent consumption.

Broker : Stores messages and handles requests.

Topic : Logical channel for categorising messages.

Partition : Enables parallel processing and load‑balancing.

Offset : A record's sequential position within a partition; each consumer group commits offsets to record how far it has read.
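
To make the relationship between keys, partitions, and offsets concrete, here is a minimal sketch in plain Java. It is a simplification: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses String.hashCode purely for illustration. The class PartitionSketch and its methods are hypothetical names, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not Kafka's real partitioner or log implementation.
class PartitionSketch {
    // One in-memory list per partition stands in for the partition's log.
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionSketch(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // The same key always maps to the same partition, which preserves per-key ordering.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Appends a record and returns its offset: the record's position within its partition.
    long append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key, partitions.size()));
        log.add(value);
        return log.size() - 1;
    }

    public static void main(String[] args) {
        PartitionSketch topic = new PartitionSketch(3);
        long first = topic.append("order-1", "created");
        long second = topic.append("order-1", "paid");
        System.out.println("offsets for order-1: " + first + ", " + second);
    }
}
```

Because both records share the key order-1, they land in the same partition and receive consecutive offsets, which is why keyed messages keep their relative order.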

3. Why Kafka Is Fast

Sequential disk writes – records are appended to the end of a log file, which avoids random I/O and lets the operating system's page cache absorb most writes.

Zero‑copy – data moves from the page cache to the network socket (via sendfile) without being copied into user space.

Batch compression – groups messages and compresses them to reduce network traffic.

Partition parallelism – multiple partitions are processed simultaneously, increasing overall throughput.
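
The effect of batch compression can be demonstrated with the JDK alone. The sketch below compares compressing 100 small JSON messages one by one with compressing them as a single batch. It uses gzip because the JDK ships it (Kafka also supports snappy, lz4, and zstd), and the sample messages and the class name BatchCompressionSketch are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustrative only: shows why compressing a whole batch beats compressing each record.
class BatchCompressionSketch {
    // Returns the gzip-compressed size of the given bytes.
    static int gzipSize(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    // Builds similar-looking order messages, as a real order topic would carry.
    static String[] sampleMessages(int count) {
        String[] messages = new String[count];
        for (int i = 0; i < count; i++) {
            messages[i] = "{\"orderId\":\"order-" + i + "\",\"userId\":\"user-42\",\"amount\":99.50}";
        }
        return messages;
    }

    // Total size when every message is compressed separately.
    static int compressedIndividually(String[] messages) {
        int total = 0;
        for (String message : messages) {
            total += gzipSize(message.getBytes(StandardCharsets.UTF_8));
        }
        return total;
    }

    // Size when all messages are joined and compressed once.
    static int compressedAsBatch(String[] messages) {
        return gzipSize(String.join("\n", messages).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String[] messages = sampleMessages(100);
        System.out.println("individually: " + compressedIndividually(messages) + " bytes");
        System.out.println("as one batch: " + compressedAsBatch(messages) + " bytes");
    }
}
```

The batch wins because the compressor can exploit the field names and values repeated across messages, and the fixed per-stream overhead is paid once instead of once per message.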

4. Hands‑On Integration with SpringBoot

4.1 Environment Preparation

# 1. Start ZooKeeper (needed only for ZooKeeper-based clusters; KRaft-mode clusters skip this step)
bin/zookeeper-server-start.sh config/zookeeper.properties

# 2. Start Kafka broker
bin/kafka-server-start.sh config/server.properties

# 3. Create a test topic with 3 partitions and 1 replica
bin/kafka-topics.sh --create \
  --topic order-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

4.2 Maven Dependencies

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

4.3 Application Configuration (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all          # wait for all in-sync replicas
      retries: 3
      batch-size: 16384 # 16KB
      buffer-memory: 33554432 # 32MB
      properties:
        linger.ms: 5     # wait up to 5ms for a batch to fill (Spring Boot has no dedicated key for this)
    consumer:
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest # applies only when the group has no committed offset
      enable-auto-commit: false   # offsets are committed by the listener container
      max-poll-records: 100      # return at most 100 records per poll
      properties:
        spring.json.trusted.packages: "com.example.order" # assumption: the package that contains OrderMessage
    listener:
      ack-mode: manual  # required so the listener can receive an Acknowledgment parameter
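
Spring Boot translates its YAML keys into Kafka's native client settings. As a rough guide, the sketch below builds the producer side by hand with java.util.Properties so the native names are visible. The class name NativeProducerConfigSketch is hypothetical; the property keys are the standard Kafka producer configuration names.

```java
import java.util.Properties;

// Illustrative mapping from the YAML producer settings to native Kafka config keys.
class NativeProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        props.setProperty("acks", "all");               // wait for all in-sync replicas
        props.setProperty("retries", "3");
        props.setProperty("batch.size", "16384");       // bytes per partition batch
        props.setProperty("linger.ms", "5");            // max wait for a batch to fill
        props.setProperty("buffer.memory", "33554432"); // total producer buffer in bytes
        return props;
    }

    public static void main(String[] args) {
        // Print every native key with its value.
        producerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Knowing the native names helps when a setting has no dedicated Spring Boot key: it can still be supplied under the producer's or consumer's generic properties map.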

4.4 Message Entity

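import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder              // needed for OrderMessage.builder() in the REST controller
@NoArgsConstructor    // needed by Jackson when deserializing
@AllArgsConstructor
public class OrderMessage {
    private String orderId;
    private String userId;
    private BigDecimal amount;
    private LocalDateTime createTime;
}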

4.5 Producer Implementation

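import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Slf4j // provides the "log" field used below
@Component
public class OrderProducer {
    private static final String TOPIC = "order-topic";
    private final KafkaTemplate<String, OrderMessage> kafkaTemplate;

    @Autowired
    public OrderProducer(KafkaTemplate<String, OrderMessage> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /** Send order message asynchronously, keyed by order ID so one order stays in one partition */
    public void sendOrderMessage(OrderMessage message) {
        // spring-kafka 2.x returns ListenableFuture; in 3.x send() returns a CompletableFuture,
        // so the callback below would be attached with whenComplete(...) instead.
        ListenableFuture<SendResult<String, OrderMessage>> future =
                kafkaTemplate.send(TOPIC, message.getOrderId(), message);
        future.addCallback(
                result -> {
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Message sent | Topic: {}, Partition: {}, Offset: {}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                },
                ex -> {
                    log.error("Message send failed | OrderId: {}, Error: {}",
                            message.getOrderId(), ex.getMessage());
                });
    }
}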
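
For readers on spring-kafka 3.x, where KafkaTemplate.send returns a CompletableFuture, the success and failure callbacks collapse into a single completion handler. The sketch below shows only that callback shape using the JDK: fakeSend is a hypothetical stand-in for the real send call, and the metadata string it returns is invented.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: demonstrates handling both outcomes of an asynchronous send.
class AsyncSendSketch {
    // Hypothetical stand-in for a send call; completes with fake metadata or fails.
    static CompletableFuture<String> fakeSend(String key, boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"))
                : CompletableFuture.completedFuture("order-topic-0@42");
    }

    // Attaches one handler that covers success (ex == null) and failure (ex != null).
    static CompletableFuture<String> sendAndDescribe(String key, boolean fail) {
        return fakeSend(key, fail).handle((metadata, ex) ->
                ex == null
                        ? "sent " + key + " to " + metadata
                        : "failed " + key + ": " + ex.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("order-1", false).join());
        System.out.println(sendAndDescribe("order-2", true).join());
    }
}
```

Either way, the calling thread is not blocked: the outcome is reported later on whichever thread completes the future.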

4.6 Consumer Implementation

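import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j // provides the "log" field used below
@Component
public class OrderConsumer {
    // concurrency = "3" starts three consumer threads, one per partition of order-topic
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group", concurrency = "3")
    public void consumeOrderMessage(@Payload OrderMessage message,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                    // spring-kafka 3.x names this header RECEIVED_PARTITION
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                    @Header(KafkaHeaders.OFFSET) long offset,
                                    Acknowledgment acknowledgment) {
        try {
            processOrder(message);
            acknowledgment.acknowledge(); // manual offset commit (requires a manual ack mode)
            log.info("Consume success | OrderId: {}, Topic: {}, Partition: {}, Offset: {}",
                    message.getOrderId(), topic, partition, offset);
        } catch (RuntimeException e) {
            log.error("Consume failed | OrderId: {}, Error: {}", message.getOrderId(), e.getMessage());
            // Rethrow so the container's error handler can retry or dead-letter the record.
            // Swallowing the exception would let a later acknowledge() commit past this record.
            throw e;
        }
    }

    private void processOrder(OrderMessage message) {
        // business logic such as inventory deduction, invoice generation, etc.
        log.info("Processing order: {}", message.getOrderId());
    }
}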

4.7 Test REST API

@RestController
@RequestMapping("/api/orders")
public class OrderController {
    private final OrderProducer orderProducer;

    @Autowired
    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

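    // OrderDTO is the request body (userId, amount); its definition is not shown here.
    @PostMapping
    public ResponseEntity<String> createOrder(@RequestBody OrderDTO orderDTO) {
        // builder() requires Lombok's @Builder on OrderMessage
        OrderMessage message = OrderMessage.builder()
                .orderId(UUID.randomUUID().toString())
                .userId(orderDTO.getUserId())
                .amount(orderDTO.getAmount())
                .createTime(LocalDateTime.now())
                .build();
        // The send is asynchronous: this method returns before the broker acknowledges the message.
        orderProducer.sendOrderMessage(message);
        return ResponseEntity.ok("Order accepted, message queued for sending");
    }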
}

5. Advanced Features

5.1 Transactional Messaging

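@Configuration
public class KafkaTransactionConfig {
    // Transactions require a transactional producer: set
    // spring.kafka.producer.transaction-id-prefix (for example "order-tx-") in application.yml.
    @Bean
    public KafkaTransactionManager<String, OrderMessage> kafkaTransactionManager(
            ProducerFactory<String, OrderMessage> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}

// Caution: Kafka and the database do not share one atomic transaction. If this method runs under
// the KafkaTransactionManager, only the Kafka send is transactional; if it runs under the database
// transaction manager, the Kafka transaction is synchronized with it on a best-effort basis.
@Transactional
public void sendOrderMessageTransactional(OrderMessage message) {
    // first persist to DB
    orderRepository.save(message);
    // then send to Kafka; the record is committed or aborted with the surrounding Kafka transaction
    kafkaTemplate.send(TOPIC, message.getOrderId(), message);
}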

5.2 Dead‑Letter Queue Configuration

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, OrderMessage> kafkaTemplate) {
    // By default, failed records are published to "<original-topic>.DLT" (here: order-topic.DLT)
    return new DeadLetterPublishingRecoverer(kafkaTemplate);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // Retry 3 times with no delay, then hand the record to the DLQ recoverer.
    // DefaultErrorHandler needs spring-kafka 2.8+; it replaces the older SeekToCurrentErrorHandler,
    // whose (recoverer, int) constructor is no longer available in later 2.x releases.
    // FixedBackOff comes from org.springframework.util.backoff.
    factory.setCommonErrorHandler(
            new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0L, 3L)));
    return factory;
}
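
The retry-then-dead-letter flow can be simulated in plain Java to make the counting explicit. This is a sketch of the general pattern, not spring-kafka's implementation: the class RetryThenDeadLetterSketch, its fields, and the in-memory dead-letter list are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative simulation of "retry N times, then route the record to a dead-letter queue".
class RetryThenDeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>(); // stands in for the dead-letter topic
    int attempts = 0;                                   // total delivery attempts made

    // Delivers the record once, retries up to maxRetries times, then dead-letters it.
    void deliver(String record, Consumer<String> handler, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            attempts++;
            try {
                handler.accept(record);
                return; // processed successfully, nothing more to do
            } catch (RuntimeException e) {
                // processing failed; loop around for the next attempt
            }
        }
        deadLetters.add(record); // retries exhausted
    }

    public static void main(String[] args) {
        RetryThenDeadLetterSketch sketch = new RetryThenDeadLetterSketch();
        sketch.deliver("bad-order", r -> { throw new IllegalStateException("cannot process"); }, 3);
        System.out.println("attempts=" + sketch.attempts + ", deadLetters=" + sketch.deadLetters);
    }
}
```

Note that three retries mean four delivery attempts in total, so the listener's business logic should tolerate being invoked more than once for the same record.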

5.3 Message Filtering

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderMessage> filterContainerFactory(
        ConsumerFactory<String, OrderMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The strategy returns true to DISCARD a record: orders with amount < 100 never reach the listener
    factory.setRecordFilterStrategy(record ->
            record.value().getAmount().compareTo(BigDecimal.valueOf(100)) < 0);
    return factory;
}

@KafkaListener(topics = "order-topic", groupId = "filter-group", containerFactory = "filterContainerFactory")
public void consumeFilteredOrder(OrderMessage message) {
    log.info("Filtered consume: {}", message);
}
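
The discard-on-true convention is easy to get backwards, so here is the same predicate exercised in isolation with plain Java. The class FilterSketch and the sample amounts are made up for illustration; only the comparison logic mirrors the filter above.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: a predicate that returns true for records to be discarded.
class FilterSketch {
    static final Predicate<BigDecimal> DISCARD_BELOW_100 =
            amount -> amount.compareTo(BigDecimal.valueOf(100)) < 0;

    // Returns the amounts that would still be delivered to the listener.
    static List<BigDecimal> delivered(List<BigDecimal> amounts) {
        List<BigDecimal> kept = new ArrayList<>();
        for (BigDecimal amount : amounts) {
            if (!DISCARD_BELOW_100.test(amount)) {
                kept.add(amount);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<BigDecimal> amounts = List.of(
                new BigDecimal("50"), new BigDecimal("100"), new BigDecimal("150.00"));
        System.out.println("delivered: " + delivered(amounts));
    }
}
```

An order of exactly 100 is delivered, because compareTo returns 0 for it and the predicate only discards strictly smaller amounts.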

6. Performance Monitoring & Tuning

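Message latency – time from production to consumption. Consumer metrics do not report this directly; compare each record's timestamp with its processing time, and watch consumer lag (the records-lag-max consumer metric) as a proxy.

Consume rate – records consumed per second (the records-consumed-rate consumer metric).

Partition offset – per‑partition consumption progress, available through the Kafka AdminClient.

Broker health – broker JMX metrics such as UnderReplicatedPartitions, collected by a monitoring tool.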
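
Consumer lag is the most common way to read consumption progress: for each partition, it is the log end offset minus the group's committed offset. The sketch below shows the arithmetic on plain maps. In a real application the two maps would typically come from the AdminClient (listOffsets and listConsumerGroupOffsets); here the class ConsumerLagSketch and the sample numbers are hypothetical, and a partition with no committed offset is assumed to start from 0.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: computes per-partition and total lag from two offset maps.
class ConsumerLagSketch {
    // lag = log end offset - committed offset, never below zero
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition without a committed offset counts from 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    // Sums the lag of all partitions.
    static long totalLag(Map<Integer, Long> lag) {
        long total = 0;
        for (long value : lag.values()) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 120L, 1, 80L, 2, 50L);
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 80L);
        Map<Integer, Long> lag = lagPerPartition(end, committed);
        System.out.println("lag per partition: " + lag + ", total: " + totalLag(lag));
    }
}
```

A lag that keeps growing means consumers are falling behind producers, which is usually the first signal to add consumers or partitions.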

6.1 Tuning Recommendations

# Throughput-oriented starting point; measure under your own load before adopting these values
spring:
  kafka:
    producer:
      compression-type: snappy   # enable Snappy compression
      batch-size: 32768          # larger batch (32KB)
      properties:
        linger.ms: 100           # wait up to 100ms for a batch to fill; raises latency
    consumer:
      fetch-min-size: 10KB       # broker waits until at least 10KB is available
      fetch-max-wait: 500ms      # or until 500ms have passed, whichever comes first
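
The interaction between batch size and linger time is simple arithmetic: a batch is sent when it fills up or when the linger time expires, whichever happens first. The sketch below works through that for one partition. It is a rough estimate that ignores per-record overhead and compression, and the class BatchSizingSketch, the 512-byte record size, and the rates are assumptions chosen for illustration.

```java
// Illustrative only: estimates how long a producer batch waits before being sent.
class BatchSizingSketch {
    // How many records of the given average size fit in one batch.
    static int recordsPerBatch(int batchSizeBytes, int avgRecordBytes) {
        return batchSizeBytes / avgRecordBytes;
    }

    // Milliseconds needed to fill one batch at the given per-partition rate.
    static double millisToFill(int batchSizeBytes, int avgRecordBytes, double recordsPerSecond) {
        return recordsPerBatch(batchSizeBytes, avgRecordBytes) * 1000.0 / recordsPerSecond;
    }

    // The batch is sent when it is full or when the linger time expires, whichever is sooner.
    static double effectiveWaitMillis(int batchSizeBytes, int avgRecordBytes,
                                      double recordsPerSecond, long lingerMs) {
        return Math.min(millisToFill(batchSizeBytes, avgRecordBytes, recordsPerSecond), lingerMs);
    }

    public static void main(String[] args) {
        System.out.println("records per 32KB batch: " + recordsPerBatch(32768, 512));
        System.out.println("wait at 1000 rec/s: " + effectiveWaitMillis(32768, 512, 1000.0, 100) + " ms");
        System.out.println("wait at 100 rec/s: " + effectiveWaitMillis(32768, 512, 100.0, 100) + " ms");
    }
}
```

At low traffic the linger time dominates, so a large linger value mostly adds latency without producing fuller batches.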

7. Conclusion

Through this guide we covered:

Kafka’s core architecture and principles.

Complete SpringBoot integration workflow.

Best practices for producers and consumers.

Advanced capabilities such as transactional messaging and dead‑letter queues.

Performance monitoring and tuning advice.

Discussion: What pitfalls have you encountered when using Kafka in your projects? Share your experience in the comments.
