Integrate Kafka with Spring Boot 1.4 Using Spring Integration – Step‑by‑Step Guide

This guide walks you through setting up Kafka and Zookeeper, adding Spring Integration dependencies, configuring application.yml, creating producer and consumer configurations with @Configuration and @EnableKafka, implementing a @KafkaListener, and testing the integration via a Spring MVC endpoint, while highlighting common pitfalls.

BiCaiJia Technology Team

Prerequisite

- A running Kafka and Zookeeper environment
- JDK installed
- An understanding of the roles Kafka and Zookeeper play
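For reference, with a local Kafka 0.10.x distribution these commands (run from the extracted Kafka directory, paths and defaults assumed) would bring up the environment and create the topic used later in this guide:

```shell
# Start Zookeeper (listens on 2181 by default)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start a Kafka broker (listens on 9092 by default)
bin/kafka-server-start.sh config/server.properties

# Create the topic used below (Kafka 0.10.x addresses Zookeeper directly)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic bootcwenaoTopic
```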

Demo

This demo continues with the existing feignserver module. Spring Boot 1.5+ ships auto-configuration for spring-kafka/spring-integration, but this article targets Spring Boot 1.4+ and therefore wires everything up with explicit configuration classes.

Import Kafka dependencies

build.gradle

compile ('org.springframework.integration:spring-integration-core:4.3.6.RELEASE')
compile ('org.springframework.kafka:spring-kafka:1.1.0.RELEASE')
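If the project builds with Maven instead of Gradle, the equivalent dependency declarations would be:

```xml
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>4.3.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
```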

Configure parameters in application.yml
bootcwenao:
  kafka:
    binder:
      brokers: localhost:9092
    zk:
      nodes: localhost:2181
    group: cwenao-group

Create KafkaProducersConfig with @Configuration and @EnableKafka
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducersConfig {

    @Value("${bootcwenao.kafka.binder.brokers}")
    private String brokers;

    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Do not retry failed sends. (There is no ProducerConfig.RECEIVE_CONFIG;
        // the intended property here is RETRIES_CONFIG.)
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
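In spring-kafka 1.x, KafkaTemplate.send() returns a ListenableFuture, so delivery can be confirmed asynchronously rather than fire-and-forget. As a sketch of how that might look (KafkaSender is a hypothetical helper, not part of this project; running it requires the spring-kafka jars and a reachable broker):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

// Hypothetical helper illustrating asynchronous send confirmation
public class KafkaSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithCallback(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // RecordMetadata carries the partition and offset assigned by the broker
                System.out.println("sent to partition "
                        + result.getRecordMetadata().partition());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("send failed: " + ex.getMessage());
            }
        });
    }
}
```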

Create a listener using @KafkaListener
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class KafkaListeners {

    @KafkaListener(topics = "bootcwenaoTopic")
    public void testListener(ConsumerRecord<?, ?> record) {
        // record.value() can be null (e.g. tombstone records), so guard with Optional
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println("this is the bootcwenaoTopic message: " + message.get());
        }
    }
}

Create KafkaConsumerConfig with @Configuration and @EnableKafka
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${bootcwenao.kafka.binder.brokers}")
    private String brokers;

    @Value("${bootcwenao.kafka.group}")
    private String group;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }

    @Bean
    public KafkaListeners kafkaListeners() {
        return new KafkaListeners();
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Auto-commit is disabled, so the listener container commits offsets;
        // the auto-commit interval below only applies if auto-commit is re-enabled
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(properties);
    }
}
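With enable.auto.commit set to false, the listener container commits offsets on the application's behalf. If per-record manual acknowledgment is preferred instead, the container factory could (as an untested sketch against the spring-kafka 1.x API) be switched to manual ack mode, in which case the listener method also takes an Acknowledgment parameter:

```java
// Sketch: switch the container to manual acknowledgment (spring-kafka 1.x API);
// the @KafkaListener method must then call acknowledgment.acknowledge() itself
factory.getContainerProperties().setAckMode(
        AbstractMessageListenerContainer.AckMode.MANUAL);
```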

Test

Create a producer endpoint testKafka
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class FeignController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/testKafka")
    @ResponseBody
    public String testKafka(String message) {
        // send(topic, key, value); "bootcwenao" is the message key
        kafkaTemplate.send("bootcwenaoTopic", "bootcwenao", message);
        return "sent";
    }
}

Start discovery, configserver, apigateway, and feignserver, then call:

http://localhost:10002/servers/testKafka?message=aaaaaaaaaaabbbbb

The KafkaListeners console output shows the received message.

Possible Errors

- Kafka version mismatch: this approach only supports Kafka 0.10.x.
- The broker may not expose its host and port to clients; advertised.host.name and advertised.port might need to be set in server.properties.
- An incorrect broker-list configuration.
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: Backend Development, Kafka, Spring Boot, Messaging, Spring Integration