
Mastering Kafka: Build Producers, Consumers, and Custom Partitioners

This extensive tutorial walks through Kafka producer and consumer fundamentals, demonstrates how to configure key‑based and custom partitioning, explains offset management, consumer group coordination, and essential configuration parameters, and includes complete Java code examples for real‑world e‑commerce scenarios.

ITFLY8 Architecture Home

Kafka Producer Example

In an e‑commerce scenario, orders are logged as JSON messages and sent to a Kafka topic (e.g., tellYourDream) using a producer. The producer is created with typical properties such as bootstrap servers, serializers, buffer memory, compression, batch size, linger, retries, and acks.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class OrderProducer {
    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Total memory (bytes) for records waiting to be sent: 32 MB.
        props.put("buffer.memory", 33554432);
        props.put("compression.type", "lz4");
        // Batch up to 32 KB per partition, waiting at most 100 ms for a batch to fill.
        props.put("batch.size", 32768);
        props.put("linger.ms", 100);
        props.put("retries", 10);
        props.put("retry.backoff.ms", 300);
        // The Java producer reads "acks"; "1" waits for the partition leader only.
        props.put("acks", "1");
        return new KafkaProducer<>(props);
    }
}

A helper method creates a JSON order record, and the main method sends it with a key (e.g., userId) so that all messages for the same user go to the same partition.

public static JSONObject createRecord() {
    JSONObject order = new JSONObject();
    order.put("userId", 12344);
    order.put("amount", 100.0);
    order.put("statement", "pay");
    return order;
}
public static void main(String[] args) throws Exception {
    KafkaProducer<String, String> producer = createProducer();
    JSONObject order = createRecord();
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "tellYourDream", order.getString("userId"), order.toString());
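    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            System.out.println("Message sent successfully");
        } else {
            // Retries are exhausted or the error is not retriable; hand off to a fallback here.
            System.err.println("Send failed: " + exception.getMessage());
        }
    });
    // close() blocks until buffered records have been sent, so no sleep is needed.
    producer.close();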
}

If a message fails after retries, a fallback path such as persisting to MySQL or Redis is recommended.
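One way to wire such a fallback is inside the send callback. The sketch below mirrors the shape of the producer callback (an exception that is null on success) without depending on the Kafka client. `FallbackStore`, `InMemoryStore`, and `handleResult` are hypothetical names, and the in-memory list only stands in for MySQL or Redis.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: route records that could not be delivered to a fallback store.
final class SendFallbackSketch {

    /** Stand-in for a durable store such as MySQL or Redis (assumption). */
    interface FallbackStore {
        void save(String key, String value);
    }

    /** In-memory store used only to keep the sketch self-contained. */
    static final class InMemoryStore implements FallbackStore {
        final List<String> saved = new ArrayList<>();

        @Override
        public void save(String key, String value) {
            saved.add(key + "=" + value);
        }
    }

    /**
     * Mirrors the callback contract: exception is null on success and non-null
     * once the client has given up. Returns true if the record was delivered,
     * false if it was written to the fallback store instead.
     */
    static boolean handleResult(Exception exception, String key, String value, FallbackStore store) {
        if (exception == null) {
            return true;
        }
        store.save(key, value);
        return false;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        handleResult(null, "12344", "{\"statement\":\"pay\"}", store);
        handleResult(new RuntimeException("broker unavailable"), "12344", "{\"statement\":\"refund\"}", store);
        System.out.println("Records in fallback store: " + store.saved.size());
    }
}
```

In a real producer the body of `handleResult` would sit inside the callback passed to `send`, and a separate job would replay the stored records once the cluster is reachable again.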

Key‑Based Partitioning

When no key is supplied, the default partitioner spreads records across partitions: round‑robin in older clients, and one partition per batch (sticky partitioning) since Kafka 2.4. When a key is supplied, Kafka hashes the serialized key (murmur2) and takes the result modulo the number of partitions, so records with the same key always land in the same partition as long as the partition count does not change.

Using a consistent key (e.g., userId) prevents ordering problems such as processing a refund before the original purchase.
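The hash-then-modulo rule can be illustrated with a few lines of plain Java. This is a minimal sketch, not Kafka's implementation: `String.hashCode()` is used as a stand-in for the murmur2 hash of the serialized key, and `partitionFor` is a hypothetical helper name.

```java
// Sketch of hash-modulo partition selection.
// Assumption: String.hashCode() stands in for Kafka's murmur2 hash of the key bytes.
final class KeyPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result in [0, numPartitions) even when the hash is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // assumed partition count for the topic
        String[] userIds = {"12344", "12345", "12344"};
        for (String userId : userIds) {
            System.out.println("userId " + userId + " -> partition " + partitionFor(userId, numPartitions));
        }
    }
}
```

The first and third records share a key and therefore a partition, which is what preserves their relative order. The mapping depends on the partition count, so adding partitions to a topic changes where existing keys land.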

Custom Partitioner

A custom partitioner can direct specific keys to designated partitions. The example PhonenumPartitioner sends calls from two special phone numbers to the last partition and otherwise hashes the first three digits of the phone number.

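import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class PhonenumPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Without a key or a second partition there is nothing to route; this also avoids "% 0" below.
        if (key == null || numPartitions < 2) {
            return 0;
        }
        String phoneNum = key.toString();
        // The two special numbers always go to the last partition.
        if (phoneNum.equals("10000") || phoneNum.equals("11111")) {
            return numPartitions - 1;
        }
        // Hash the first three digits over the remaining partitions;
        // floorMod keeps the result non-negative even if hashCode() is negative.
        String prefix = phoneNum.length() >= 3 ? phoneNum.substring(0, 3) : phoneNum;
        return Math.floorMod(prefix.hashCode(), numPartitions - 1);
    }

    @Override
    public void close() { }
}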

A producer that uses this partitioner selects a random phone number and sends it, printing the partition chosen.

public class PartitionerProducer {
    private static final String[] PHONE_NUMS = {
        "10000","10000","11111","13700000003","13700000004",
        "10000","15500000006","11111","15500000008",
        "17600000009","10000","17600000011"
    };
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        Random rand = new Random();
        for (int i = 0; i < 10; i++) {
            String phoneNum = PHONE_NUMS[rand.nextInt(PHONE_NUMS.length)];
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("phonenum [" + record.value() + "] sent to partition " + metadata.partition());
            Thread.sleep(500);
        }
        producer.close();
    }
}
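The routing rule can also be checked without a running cluster. The sketch below restates it as a pure function so that the sample phone numbers can be previewed locally. `PhoneRoutingSketch`, `route`, and the partition count of 4 are assumptions made for illustration, and the sketch uses `Math.floorMod` so that a negative hash can never yield a negative partition.

```java
// Hypothetical, broker-free restatement of the phone-number routing rule.
final class PhoneRoutingSketch {

    static int route(String phoneNum, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // a single partition leaves nothing to reserve
        }
        if (phoneNum.equals("10000") || phoneNum.equals("11111")) {
            return numPartitions - 1; // special numbers use the last partition
        }
        // Hash the first three digits over the remaining partitions.
        String prefix = phoneNum.length() >= 3 ? phoneNum.substring(0, 3) : phoneNum;
        return Math.floorMod(prefix.hashCode(), numPartitions - 1);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed partition count
        String[] samples = {"10000", "11111", "13700000003", "15500000006", "17600000009"};
        for (String phoneNum : samples) {
            System.out.println("phonenum [" + phoneNum + "] -> partition " + route(phoneNum, numPartitions));
        }
    }
}
```

Running a preview like this before deploying a partitioner is a cheap way to spot skew, since a three-character prefix offers few distinct hash inputs.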

Kafka Consumer Fundamentals

Consumers track their position in a partition using an offset. Older consumers stored offsets in ZooKeeper; since the consumer API introduced in Kafka 0.9, offsets are committed to the internal topic __consumer_offsets. Each consumer group has a coordinator broker that manages heartbeats, detects failures, and triggers rebalances.

Partition assignment strategies include range, round‑robin, and the newer sticky algorithm, which tries to keep existing assignments stable during a rebalance.
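The difference between the first two strategies is easiest to see on a small case. The sketch below is a simplified, single-topic illustration and not Kafka's assignor code: consumer names, the class name, and the partition count are assumptions, and the sticky strategy is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-topic illustration of range vs. round-robin assignment.
final class AssignmentSketch {

    /** Range: contiguous blocks; the first (partitions % consumers) consumers get one extra. */
    static Map<String, List<Integer>> range(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                assigned.add(next++);
            }
            result.put(consumers.get(i), assigned);
        }
        return result;
    }

    /** Round-robin: partitions are dealt out one at a time in consumer order. */
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        System.out.println("range:       " + range(consumers, 7));
        System.out.println("round-robin: " + roundRobin(consumers, 7));
    }
}
```

With one topic both strategies hand out the same number of partitions per consumer. Because range is applied per topic, the first consumers pick up the extra partition of every subscribed topic, which is where the two strategies start to diverge.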

Consumer Configuration Parameters

heartbeat.interval.ms – frequency of heartbeat messages.
session.timeout.ms – time without heartbeat before the broker marks the consumer dead.
max.poll.interval.ms – maximum time between two poll calls.
fetch.max.bytes – maximum bytes fetched per request.
max.poll.records – maximum number of records returned in a single poll.
connection.max.idle.ms – idle time before closing the socket (set to -1 to keep alive).
auto.offset.reset – behavior when no offset is stored (earliest, latest, none).
enable.auto.commit and auto.commit.interval.ms – automatic offset committing.
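Several of these timing parameters constrain one another. As a rough sanity check, the sketch below applies two rules of thumb: the heartbeat interval is usually kept at no more than one third of the session timeout, and when records are processed inside the poll loop, the average time per record has to stay below max.poll.interval.ms divided by max.poll.records. The class and method names are hypothetical; the values match the consumer configuration used in this article.

```java
// Rough sanity checks for consumer timing settings (hypothetical helper names).
final class ConsumerTimingSketch {

    /** Rule of thumb: heartbeat.interval.ms should be at most one third of session.timeout.ms. */
    static boolean heartbeatWithinGuideline(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    /** Average processing time available per record if a full batch is handled between polls. */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        long heartbeatIntervalMs = 1000;
        long sessionTimeoutMs = 10000;
        long maxPollIntervalMs = 30000;
        int maxPollRecords = 500;

        System.out.println("heartbeat within guideline: "
                + heartbeatWithinGuideline(heartbeatIntervalMs, sessionTimeoutMs));
        System.out.println("per-record budget (ms): "
                + perRecordBudgetMs(maxPollIntervalMs, maxPollRecords));
    }
}
```

If the per-record budget is too small for the real workload, lowering max.poll.records or raising max.poll.interval.ms are the usual adjustments.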

Consumer Code Example

public class ConsumerDemo {
    private static ExecutorService threadPool = Executors.newFixedThreadPool(20);
    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList("order-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject order = JSONObject.parseObject(record.value());
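                    // With auto-commit on, the offset may be committed before this task finishes,
                    // so a crash at that point can lose the record (at-most-once delivery).
                    threadPool.submit(new CreditManageTask(order));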
                }
            }
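        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close on every exit path so the consumer leaves the group promptly.
            consumer.close();
            threadPool.shutdown();
        }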
    }
    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("heartbeat.interval.ms", 1000);
        props.put("session.timeout.ms", 10000);
        props.put("max.poll.interval.ms", 30000);
        props.put("fetch.max.bytes", 10485760);
        props.put("max.poll.records", 500);
        props.put("connection.max.idle.ms", -1);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer<>(props);
    }
    static class CreditManageTask implements Runnable {
        private JSONObject order;
        CreditManageTask(JSONObject order) { this.order = order; }
        @Override
        public void run() {
            System.out.println("Processing order: " + order.toJSONString());
        }
    }
}

Log Indexing and ISR

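Each partition is stored as a series of segments, and each segment has a .log file together with a sparse .index (offset) and .timeindex (timestamp) file. An index entry is written roughly every 4 KB of log data (the default index interval), so a lookup binary-searches the index (O(log N)) for the nearest preceding entry and then scans forward in the .log file to the requested record.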
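The lookup can be sketched as a floor search over the indexed offsets. This is a simplified illustration with made-up index entries: the real index stores offsets relative to the segment's base offset, while the sketch uses absolute offsets in two parallel arrays, and `floorEntry` is a hypothetical name.

```java
// Simplified sparse-index lookup: find the last index entry at or before the target offset.
final class SparseIndexSketch {

    /** Returns the position of the greatest indexed offset <= targetOffset, or -1 if none. */
    static int floorEntry(long[] indexedOffsets, long targetOffset) {
        int lo = 0;
        int hi = indexedOffsets.length - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexedOffsets[mid] <= targetOffset) {
                found = mid;      // candidate; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Hypothetical index: one entry for roughly every 4 KB of log data.
        long[] indexedOffsets = {0, 35, 71, 110};
        long[] filePositions = {0, 4096, 8192, 12288};

        long target = 80;
        int entry = floorEntry(indexedOffsets, target);
        System.out.println("Start scanning the .log file at byte " + filePositions[entry]
                + " (indexed offset " + indexedOffsets[entry] + ") to reach offset " + target);
    }
}
```

Keeping the index sparse trades a short forward scan for a much smaller index file, which is what makes it practical to keep the index memory-mapped.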

In‑sync replicas (ISR) are the leader plus the followers that have kept up with it within a configured lag limit. By default only ISR members can be elected leader after a failure, which prevents the loss of committed data.
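The two ideas, ISR membership by lag and leader election restricted to the ISR, can be modeled in a few lines. This is a toy model under stated assumptions rather than broker code: membership is decided purely by how recently a replica caught up, the 30-second threshold is an assumed value for the broker's lag limit, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of ISR membership and ISR-restricted leader election.
final class IsrSketch {

    /** Replicas whose last catch-up time is within maxLagMs of now count as in sync. */
    static List<Integer> inSyncReplicas(int[] replicaIds, long[] lastCaughtUpMs, long nowMs, long maxLagMs) {
        List<Integer> isr = new ArrayList<>();
        for (int i = 0; i < replicaIds.length; i++) {
            if (nowMs - lastCaughtUpMs[i] <= maxLagMs) {
                isr.add(replicaIds[i]);
            }
        }
        return isr;
    }

    /** Picks the first assigned replica that is both alive and in the ISR, or -1 if none qualifies. */
    static int electLeader(List<Integer> assignedReplicas, List<Integer> isr, List<Integer> liveBrokers) {
        for (int id : assignedReplicas) {
            if (isr.contains(id) && liveBrokers.contains(id)) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long now = 100_000;
        long maxLagMs = 30_000; // assumed lag limit
        int[] replicas = {1, 2, 3};
        long[] lastCaughtUp = {now, now - 2_000, now - 45_000};

        List<Integer> isr = inSyncReplicas(replicas, lastCaughtUp, now, maxLagMs);
        System.out.println("ISR: " + isr);

        // Broker 1 (the current leader in this scenario) fails.
        List<Integer> live = Arrays.asList(2, 3);
        System.out.println("New leader: " + electLeader(Arrays.asList(1, 2, 3), isr, live));
    }
}
```

Replica 3 is alive but has fallen out of the ISR, so it is skipped: promoting it could drop records that were already acknowledged.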

Source: https://blog.csdn.net/qq_33440092/article/details/103864064

Written by

ITFLY8 Architecture Home
