Integrate Kafka with Spring Boot 1.4 Using Spring Integration – Step‑by‑Step Guide
This guide walks you through setting up Kafka and Zookeeper, adding Spring Integration dependencies, configuring application.yml, creating producer and consumer configurations with @Configuration and @EnableKafka, implementing a @KafkaListener, and testing the integration via a Spring MVC endpoint, while highlighting common pitfalls.
Prerequisite
Kafka and Zookeeper environment completed JDK installed Understand the roles of Kafka and Zookeeper
Demo
Continue using the existing feignserver Spring Boot 1.5+ supports spring‑integration, but this article uses 1.4+ and the config‑style approach
Import Kafka dependencies
build.gradle
compile ('org.springframework.integration:spring-integration-core:4.3.6.RELEASE')
compile ('org.springframework.kafka:spring-kafka:1.1.0.RELEASE')Configure parameters in
application.yml bootcwenao:
kafka:
binder:
brokers: localhost:9092
zk:
nodes: localhost:2181
group: cwenao-groupCreate KafkaProducersConfig with @Configuration and
@EnableKafka @Configuration
@EnableKafka
public class KafkaProducersConfig {
@Value("${bootcwenao.kafka.binder.brokers}")
private String brokers;
@Bean("kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(ProducerConfig.RECEIVE_CONFIG, 0);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
}Create a listener using
@KafkaListener public class KafkaListeners {
@KafkaListener(topics = "bootcwenaoTopic")
public void testListener(ConsumerRecord<?, ?> record) {
Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) {
Object msg = messages.get();
System.out.println(" this is the testTopic send message: " + msg);
}
}
}Create KafkaConsumerConfig with @Configuration and
@EnableKafka @Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${bootcwenao.kafka.binder.brokers}")
private String brokers;
@Value("${bootcwenao.kafka.group}")
private String group;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(4000);
return factory;
}
@Bean
public KafkaListeners kafkaListeners() {
return new KafkaListeners();
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(properties);
}
}Test
Create a producer endpoint
testKafka @Controller
public class FeignController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/testKafka")
@ResponseBody
public void testkafka(String message) {
kafkaTemplate.send("bootcwenaoTopic", "bootcwnao", message);
}
}Start discovery, configserver, apigateway, and feignserver, then call
http://localhost:10002/servers/testKafka?message=aaaaaaaaaaabbbbb. The KafkaListeners output shows the received message.
Possible Errors
Kafka version mismatch – this method only supports 0.10.x.x Kafka configuration may not expose host and port advertised.host.name and advertised.port might need to be set Incorrect broker‑list configuration
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
