Kafka Beginner's Practical Guide – All You Need to Get Started
This article introduces Kafka's definition, core architecture, installation steps, and command‑line tools, then shows how to integrate Kafka with Spring Boot through Java examples covering a custom partitioner and producer/consumer configuration.
1. What is Kafka
Kafka is traditionally defined as a distributed publish/subscribe message queue used for real‑time big‑data processing. The modern definition describes Kafka as an open‑source distributed event‑streaming platform used by thousands of companies for high‑performance data pipelines, stream analytics, data integration, and mission‑critical applications.
Point‑to‑point model : each message is consumed by exactly one receiver; for example, a message sent by system A is consumed only by system B and is then removed from the queue.
Publish/subscribe model : messages are categorized into topics; multiple producers can publish to the same topic and multiple subscribers can receive the same messages.
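The difference between the two models can be sketched with plain Java collections. This is a toy in‑memory illustration rather than Kafka's API; the class and method names (MessagingModels, pointToPoint, publishSubscribe) are hypothetical and exist only for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory illustration of the two messaging models (not Kafka code).
final class MessagingModels {

    // Point-to-point: consumers share one queue, so each message is taken by exactly one of them.
    static List<List<String>> pointToPoint(List<String> messages, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> received = emptyInboxes(consumerCount);
        int next = 0;
        while (!queue.isEmpty()) {
            // poll() removes the message, so no other consumer can see it.
            received.get(next).add(queue.poll());
            next = (next + 1) % consumerCount;
        }
        return received;
    }

    // Publish/subscribe: every subscriber receives its own copy of every message.
    static List<List<String>> publishSubscribe(List<String> messages, int subscriberCount) {
        List<List<String>> received = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        System.out.println("point-to-point:    " + pointToPoint(messages, 2));
        System.out.println("publish/subscribe: " + publishSubscribe(messages, 2));
    }
}
```

With two receivers, the point‑to‑point call splits the four messages between them, while the publish/subscribe call hands all four to each subscriber.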
2. Kafka Architecture and Core Concepts
Key components:
Topic : logical container for related messages.
Producer : client that continuously sends messages to one or more topics.
Consumer : client that subscribes to topics and pulls messages.
Consumer Group : a set of consumers where each partition is consumed by only one member, providing parallelism while keeping ordering per partition.
Broker : a Kafka server; a cluster consists of multiple brokers.
Partition : an ordered, append‑only log that holds a subset of a topic's messages; a topic can be split across many partitions for scalability.
Replica : each partition has a leader and one or more followers; followers replicate the leader’s data to ensure fault tolerance.
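How a consumer group spreads partitions over its members can be sketched as follows. This is a deliberately simplified round‑robin assignment written with plain Java collections; Kafka's built‑in assignors are more involved, and the names used here (GroupAssignmentSketch, assign) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition assignment inside one consumer group (not Kafka's real assignor).
final class GroupAssignmentSketch {

    // Hand out partitions 0..partitionCount-1 to the consumers in round-robin order.
    // Every partition ends up with exactly one owner within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share three partitions.
        System.out.println(assign(List.of("c1", "c2"), 3));
        // Four consumers, three partitions: one consumer gets nothing to read.
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
    }
}
```

The second call illustrates a practical consequence of the one‑owner‑per‑partition rule: adding more consumers than there are partitions leaves the extra consumers idle.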
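Each partition is stored on disk as an append‑only log, which enables sequential I/O and high throughput. The log is divided into segment files: when the active segment reaches its size or time limit, a new segment is created, and old segments are deleted according to the retention policy to reclaim disk space.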
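Segment rolling and cleanup can be modelled in a few lines of Java. The sketch below is an in‑memory toy with assumptions of its own: it rolls a segment after a fixed number of records and keeps a fixed number of segments, whereas a real broker works with bytes and time (settings such as log.segment.bytes and log.retention.hours). The class SegmentedLogSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory toy model of one partition's segmented, append-only log.
final class SegmentedLogSketch {
    private final int maxRecordsPerSegment;
    private final int maxSegments;
    private final List<List<String>> segments = new ArrayList<>();
    private long nextOffset = 0;

    SegmentedLogSketch(int maxRecordsPerSegment, int maxSegments) {
        if (maxRecordsPerSegment < 1 || maxSegments < 1) {
            throw new IllegalArgumentException("limits must be at least 1");
        }
        this.maxRecordsPerSegment = maxRecordsPerSegment;
        this.maxSegments = maxSegments;
        segments.add(new ArrayList<>());
    }

    // Append to the active (last) segment and return the offset given to the record.
    long append(String record) {
        List<String> active = segments.get(segments.size() - 1);
        if (active.size() >= maxRecordsPerSegment) {
            // The active segment is full: roll a new one.
            active = new ArrayList<>();
            segments.add(active);
            if (segments.size() > maxSegments) {
                // Retention: drop the oldest segment as a whole.
                segments.remove(0);
            }
        }
        active.add(record);
        return nextOffset++;
    }

    int segmentCount() {
        return segments.size();
    }

    // Offset of the oldest record that has not been deleted yet.
    long firstRetainedOffset() {
        long retained = 0;
        for (List<String> segment : segments) {
            retained += segment.size();
        }
        return nextOffset - retained;
    }

    public static void main(String[] args) {
        SegmentedLogSketch log = new SegmentedLogSketch(2, 2);
        for (String record : new String[] {"a", "b", "c", "d", "e"}) {
            System.out.println(record + " -> offset " + log.append(record));
        }
        System.out.println("segments: " + log.segmentCount()
                + ", first retained offset: " + log.firstRetainedOffset());
    }
}
```

Offsets keep increasing even after old segments are removed, which is why a consumer that falls too far behind can find that its next offset no longer exists.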
3. Installing Kafka
Kafka runs on the JVM, so a Java runtime must be installed first. Download the binary release from the official Apache Kafka site, extract it, and start ZooKeeper (which manages the cluster metadata in this release) before launching the Kafka broker.
tar -xzf kafka_2.13-2.8.0.tgz
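cd kafka_2.13-2.8.0
# Start ZooKeeper first; the script runs in the foreground, so use a separate terminal or pass -daemon
bin/zookeeper-server-start.sh config/zookeeper.properties
# Then start the Kafka broker
bin/kafka-server-start.sh config/server.properties
4. Kafka Command‑Line Operations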
The bin directory contains scripts for broker management, producer/consumer testing, and topic administration.
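# Note: --zookeeper works with the 2.8.0 release used here; it was removed from kafka-topics.sh in Kafka 3.0, where --bootstrap-server 10.10.0.18:9092 is used instead
# List topics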
bin/kafka-topics.sh --zookeeper 10.10.0.18:2181 --list
# Create a topic named "zfj-topic" with 3 partitions and 1 replica
bin/kafka-topics.sh --zookeeper 10.10.0.18:2181 --create \
--replication-factor 1 --partitions 3 --topic zfj-topic
# Send messages with a producer
bin/kafka-console-producer.sh --broker-list 10.10.0.18:9092 --topic zfj-topic
# Consume messages
bin/kafka-console-consumer.sh --bootstrap-server 10.10.0.18:9092 --topic zfj-topic
# Describe the topic
bin/kafka-topics.sh --zookeeper 10.10.0.18:2181 --describe --topic zfj-topic
5. Integrating Kafka into a Spring Boot Project
Add the Spring Kafka dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Custom producer example (properties configuration and message sending):
package com.shepherd.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address used for the initial connection to the cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.0.18:9092");
        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");
        props.put("retries", 1);
        // Batch up to 16 KB per partition, waiting at most 1 ms for a batch to fill
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // Total memory (32 MB) the producer may use to buffer unsent records
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Route records with the custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.shepherd.kafka.producer.MyPartitioner");

        // try-with-resources closes the producer, which waits for pending sends to complete
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Keys are numeric strings because MyPartitioner parses them as integers
                producer.send(new ProducerRecord<>("topic1", Integer.toString(i), "val" + i));
            }
        }
    }
}
Custom partitioner implementation (simple modulo‑based strategy):
package com.shepherd.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // This strategy only works for keys that are integers in string form
        if (key == null) {
            throw new IllegalArgumentException("MyPartitioner requires a non-null numeric key");
        }
        int intKey = Integer.parseInt(key.toString());
        int partitionCount = cluster.partitionCountForTopic(topic);
        // floorMod keeps the result in [0, partitionCount) even for negative keys
        return Math.floorMod(intKey, partitionCount);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
Consumer example (polls for messages with a one‑second timeout and prints them):
package com.shepherd.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.0.18:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumers that share this group id split the partitions among themselves
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first", "topic1"));

        while (true) {
            // Wait up to one second for records, then return whatever has arrived
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}
Spring Boot configuration (application.properties) for producer and consumer settings, including custom partitioner class, retry policy, batch size, and consumer group options:
server.port=1008
spring.kafka.bootstrap-servers=10.10.0.18:9092
# Producer settings
spring.kafka.producer.retries=0
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.partitioner.class=com.shepherd.kafka.producer.MyPartitioner
# Consumer settings
spring.kafka.consumer.group-id=defaultConsumerGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000ms
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
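spring.kafka.listener.missing-topics-fatal=false
Spring‑based producer controller demonstrates asynchronous sending with callbacks (the addCallback style shown here belongs to Spring for Apache Kafka 2.x, where send returns a ListenableFuture; in 3.x it returns a CompletableFuture):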
package com.shepherd.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka/produce")
public class ProducerController {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Fire-and-forget: the result of the send is not inspected
    @GetMapping("/{message}")
    public void sendMessageNoCallback(@PathVariable("message") String message) {
        kafkaTemplate.send("topic1", message);
    }

    // Asynchronous send with success and failure callbacks given as lambdas
    @GetMapping("/callback1/{message}")
    public void sendMessageWithLambda(@PathVariable("message") String message) {
        kafkaTemplate.send("topic1", message).addCallback(
            success -> {
                String topic = success.getRecordMetadata().topic();
                int partition = success.getRecordMetadata().partition();
                long offset = success.getRecordMetadata().offset();
                System.out.println("Sent successfully: " + topic + "-" + partition + "-" + offset);
            },
            failure -> System.out.println("Send failed: " + failure.getMessage())
        );
    }

    // Same behaviour, with both callbacks bundled in one ListenableFutureCallback object
    @GetMapping("/callback2/{message}")
    public void sendMessageWithCallbackObject(@PathVariable("message") String message) {
        kafkaTemplate.send("topic1", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("Sent successfully: " + result.getRecordMetadata().topic() + "-" +
                    result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Send failed: " + ex.getMessage());
            }
        });
    }
}
Spring‑managed consumer using @KafkaListener to consume from "topic1":
package com.shepherd.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerConfig {
    @KafkaListener(topics = {"topic1"})
    public void onMessage(ConsumerRecord<?, ?> record) {
        System.out.println("Simple consume: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}
}
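The modulo strategy behind MyPartitioner can be tried out without a running broker. The following is a minimal sketch of the arithmetic only, using nothing but the Java standard library; the class PartitionMath and the method partitionFor are hypothetical names. It uses Math.floorMod rather than the % operator because % returns a negative result for a negative key (for example, -1 % 3 is -1), which is not a valid partition number.

```java
// Stand-alone sketch of modulo-based partition selection (no Kafka classes involved).
final class PartitionMath {

    // Map a numeric string key to a partition index in [0, partitionCount).
    static int partitionFor(String key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        int intKey = Integer.parseInt(key);
        // floorMod never returns a negative value when the divisor is positive.
        return Math.floorMod(intKey, partitionCount);
    }

    public static void main(String[] args) {
        // Same keys as the CustomProducer loop, spread over three partitions.
        for (int i = 0; i < 10; i++) {
            System.out.println("key " + i + " -> partition " + partitionFor(Integer.toString(i), 3));
        }
    }
}
```

With three partitions, key "7" lands on partition 1 and key "-1" on partition 2. Records with the same key always map to the same partition, which is what preserves per‑key ordering.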
Shepherd Advanced Notes
Dedicated to sharing advanced Java technical insights, daily work snippets, and the power of persistent effort.
