Big Data 15 min read

Implementing Multi‑threaded Kafka Consumer and Producer with Partition Management

This article explains how to build a multi‑threaded Kafka consumer and producer in Java, covering partition concepts, consumer group offsets, thread‑pool configuration, and code examples that demonstrate proper use of Kafka streams, partition keys, and batch message sending for improved throughput.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Implementing Multi‑threaded Kafka Consumer and Producer with Partition Management

Kafka stores messages in topics that are divided into multiple partitions; each partition maintains its own offset and can be consumed by only one thread at a time, while a single thread may consume from several partitions. Consumer groups (identified by group.id) keep separate offsets for each partition.

The classic high‑level consumer API shown below creates a single consumer thread, which is unsuitable for production when parallel consumption is required.

Properties props = new Properties();
props.put("zookeeper.connect", "xxxx:2181");
props.put("zookeeper.connectiontimeout.ms", "1000000");
props.put("group.id", "test_group");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext()) {
    String msg = new String(it.next().message(), "utf-8").trim();
    System.out.println("receive:" + msg);
}

To enable true parallel consumption, the number of threads must be greater than one and should match the number of partitions you intend to read. The refactored consumer class creates a fixed‑size thread pool and assigns a KafkaConsumerThread to each stream.

public class KafakConsumer implements Runnable {
    private ConsumerConfig consumerConfig;
    private static String topic = "blog";
    private Properties props;
    final int a_numThreads = 6;

    public KafakConsumer() {
        props = new Properties();
        props.put("zookeeper.connect", "xxx:2181,yyy:2181,zzz:2181");
        props.put("group.id", "blog");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        consumerConfig = new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
        for (final KafkaStream stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
    }

    public static void main(String[] args) {
        Thread t = new Thread(new KafakConsumer());
        t.start();
    }
}

The worker thread simply iterates over its assigned stream and prints the partition, offset and message.

public class KafkaConsumerThread implements Runnable {
    private KafkaStream<byte[], byte[]> stream;
    public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "]," +
                               " offset[" + mam.offset() + "], " + new String(mam.message()));
        }
    }
}

The producer example below demonstrates batch sending of messages to specific partitions using KeyedMessage. It creates a fixed thread pool of six threads, each sending six messages to six distinct partitions.

public class KafkaProducer implements Runnable {
    private Producer<String, String> producer = null;
    private ProducerConfig config = null;
    public KafkaProducer() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "*****:2181,****:2181,****:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");
        props.put("compression.codec", "1");
        props.put("broker.list", "****:6667,***:6667,****:6667");
        config = new ProducerConfig(props);
    }
    @Override
    public void run() {
        producer = new Producer<String, String>(config);
        for (int i = 1; i <= 6; i++) { // send to 6 partitions
            List<KeyedMessage<String, String>> messageList = new ArrayList<>();
            for (int j = 0; j < 6; j++) { // 6 messages per partition
                messageList.add(new KeyedMessage<String, String>("blog", "partition[" + i + "]", "message[The " + i + " message]"));
            }
            producer.send(messageList);
        }
    }
    public static void main(String[] args) {
        Thread t = new Thread(new KafkaProducer());
        t.start();
    }
}

The send(List<KeyedMessage<K,V>> messages) method internally converts the Java list to a Scala sequence before invoking the underlying producer, enabling efficient batch transmission.

public void send(List<KeyedMessage<K, V>> messages) {
    underlying().send(JavaConversions..MODULE$.asScalaBuffer(messages).toSeq());
}

Running the consumer and producer together shows that each partition is consumed by exactly one thread, while a thread may handle multiple partitions. If the thread‑pool size is smaller than the number of partitions, some partitions remain unconsumed, leading to rebalancing issues. Therefore, the recommended practice is to set the number of partitions equal to the number of consumer threads (i.e., topicCountMap value) to achieve optimal parallelism.

In summary, achieving multi‑threaded consumption in Kafka relies on proper partition planning, matching consumer thread count to partition count, and using batch‑send APIs on the producer side to fully utilize the parallel capabilities of the platform.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataKafkamultithreadingConsumerProducerPartition
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.