Mastering Kafka: Build Producers, Consumers, and Custom Partitioners
This extensive tutorial walks through Kafka producer and consumer fundamentals, demonstrates how to configure key‑based and custom partitioning, explains offset management, consumer group coordination, and essential configuration parameters, and includes complete Java code examples for real‑world e‑commerce scenarios.
Kafka Producer Example
In an e‑commerce scenario, orders are logged as JSON messages and sent to a Kafka topic (e.g., tellYourDream) using a producer. The producer is created with typical properties such as bootstrap servers, serializers, buffer memory, compression, batch size, linger, retries, and acks.
public class OrderProducer {
public static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 33554432);
props.put("compression.type", "lz4");
props.put("batch.size", 32768);
props.put("linger.ms", 100);
props.put("retries", 10);
props.put("retry.backoff.ms", 300);
props.put("request.required.acks", "1");
return new KafkaProducer<>(props);
}
}A helper method creates a JSON order record, and the main method sends it with a key (e.g., userId) so that all messages for the same user go to the same partition.
public static JSONObject createRecord() {
JSONObject order = new JSONObject();
order.put("userId", 12344);
order.put("amount", 100.0);
order.put("statement", "pay");
return order;
} public static void main(String[] args) throws Exception {
KafkaProducer<String, String> producer = createProducer();
JSONObject order = createRecord();
ProducerRecord<String, String> record = new ProducerRecord<>(
"tellYourDream", order.getString("userId"), order.toString());
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent successfully");
}
});
Thread.sleep(10000);
producer.close();
}If a message fails after retries, a fallback path such as persisting to MySQL or Redis is recommended.
Key‑Based Partitioning
When no key is supplied, Kafka distributes messages round‑robin across partitions. Supplying a key causes Kafka to hash the key and assign the message to a partition based on the hash modulo the number of partitions, ensuring that messages with the same key always land in the same partition.
Using a consistent key (e.g., userId) prevents ordering problems such as processing a refund before the original purchase.
Custom Partitioner
A custom partitioner can direct specific keys to designated partitions. The example PhonenumPartitioner sends calls from two special phone numbers to the last partition and otherwise hashes the first three digits of the phone number.
public class PhonenumPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) { }
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (key.toString().equals("10000") || key.toString().equals("11111")) {
return numPartitions - 1;
}
String phoneNum = key.toString();
return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1);
}
@Override
public void close() { }
}A producer that uses this partitioner selects a random phone number and sends it, printing the partition chosen.
public class PartitionerProducer {
private static final String[] PHONE_NUMS = {
"10000","10000","11111","13700000003","13700000004",
"10000","15500000006","11111","15500000008",
"17600000009","10000","17600000011"
};
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
String phoneNum = PHONE_NUMS[rand.nextInt(PHONE_NUMS.length)];
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
RecordMetadata metadata = producer.send(record).get();
System.out.println("phonenum [" + record.value() + "] sent to partition " + metadata.partition());
Thread.sleep(500);
}
producer.close();
}
}Kafka Consumer Fundamentals
Consumers track their position in a partition using an offset. Offsets were stored in ZooKeeper before Kafka 0.8 and are stored in an internal topic __consumer_offsets thereafter. Each consumer group has a coordinator broker that manages heartbeats, detects failures, and triggers rebalances.
Partition assignment strategies include range , round‑robin , and the newer sticky algorithm, which tries to keep existing assignments stable during a rebalance.
Consumer Configuration Parameters
heartbeat.interval.ms– frequency of heartbeat messages. session.timeout.ms – time without heartbeat before the broker marks the consumer dead. max.poll.interval.ms – maximum time between two poll calls. fetch.max.bytes – maximum bytes fetched per request. max.poll.records – maximum number of records returned in a single poll. connection.max.idle.ms – idle time before closing the socket (set to -1 to keep alive). auto.offset.reset – behavior when no offset is stored (earliest, latest, none). enable.auto.commit and auto.commit.interval.ms – automatic offset committing.
Consumer Code Example
public class ConsumerDemo {
private static ExecutorService threadPool = Executors.newFixedThreadPool(20);
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> consumer = createConsumer();
consumer.subscribe(Arrays.asList("order-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
JSONObject order = JSONObject.parseObject(record.value());
threadPool.submit(new CreditManageTask(order));
}
}
} catch (Exception e) {
e.printStackTrace();
consumer.close();
}
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("heartbeat.interval.ms", 1000);
props.put("session.timeout.ms", 10000);
props.put("max.poll.interval.ms", 30000);
props.put("fetch.max.bytes", 10485760);
props.put("max.poll.records", 500);
props.put("connection.max.idle.ms", -1);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
return new KafkaConsumer<>(props);
}
static class CreditManageTask implements Runnable {
private JSONObject order;
CreditManageTask(JSONObject order) { this.order = order; }
@Override
public void run() {
System.out.println("Processing order: " + order.toJSONString());
}
}
}Log Indexing and ISR
Each partition stores a .log file together with a sparse .index (offset) and .timeindex (timestamp) file. The index interval (default 4 KB) determines how often an entry is written, enabling binary search (O(log N)) to locate a record quickly.
In‑sync replicas (ISR) are the followers that have fully caught up with the leader. Only ISR members can be elected leader after a failure, guaranteeing that committed data is not lost.
—
Source: https://blog.csdn.net/qq_33440092/article/details/103864064
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
ITFLY8 Architecture Home
ITFLY8 Architecture Home - focused on architecture knowledge sharing and exchange, covering project management and product design. Includes large-scale distributed website architecture (high performance, high availability, caching, message queues...), design patterns, architecture patterns, big data, project management (SCRUM, PMP, Prince2), product design, and more.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
