Understanding Kafka Partition Assignment: Strategies and Code Walkthrough

This article explains how Kafka determines which partition a producer sends a record to, how partition counts are configured, and how consumer groups assign partitions using the range (default) and round-robin strategies, with detailed Java code examples.

Programmer DD

1. Introduction

Producers send messages to topics, and consumers subscribe to topics as members of a consumer group. Each topic is split into partitions, which is where the messages are actually stored. This article explores how a producer decides which partition to write to and how a topic's partitions are divided among the consumer instances in a group.

2. Configuring Topic Partition Count

The broker-wide default partition count is set by num.partitions in server.properties and defaults to 1. Individual topics can override it when they are created:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

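The --partitions option sets the topic's partition count at creation time. Describing a topic named abc created with the same options (two partitions, replication factor 1) lists one line per partition: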

[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc       PartitionCount:2        ReplicationFactor:1      Configs:
        Topic: abc  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: abc  Partition: 1    Leader: 0       Replicas: 0     Isr: 0

3. Producer Partitioning

The default partitioning strategy works as follows: if the record specifies a partition, that partition is used; if no partition is specified but the record has a key, the partition is derived from a hash of the key (murmur2 of the serialized key, modulo the partition count); if the record has neither, partitions are chosen in round-robin fashion.
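The three rules above can be modeled in a few lines of plain Java. This is a simplified sketch, not Kafka's code: PartitionChooser is a hypothetical class, Arrays.hashCode stands in for Kafka's murmur2 hash, a single counter starting at 0 replaces the per-topic counters the real partitioner keeps, and the "available partitions" check is omitted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the default partitioning rules.
final class PartitionChooser {
    // Stand-in for the per-topic counter used when a record has no key.
    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        // Rule 1: an explicit partition on the record always wins.
        if (explicitPartition != null) {
            return explicitPartition;
        }
        // Rule 2: a key is present, so the same key always maps to the same partition.
        // Arrays.hashCode is a stand-in; Kafka uses murmur2 here.
        if (keyBytes != null) {
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
        }
        // Rule 3: no partition and no key, so cycle through the partitions.
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    // Clears the sign bit; unlike Math.abs, never negative (even for Integer.MIN_VALUE).
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(chooser.choose(1, key, 3)); // prints 1 (explicit partition)
        System.out.println(chooser.choose(null, key, 3) == chooser.choose(null, key, 3)); // prints true
        for (int i = 0; i < 4; i++) {
            System.out.print(chooser.choose(null, null, 3) + " "); // prints 0 1 2 0
        }
        System.out.println();
    }
}
```

The keyed branch is what gives per-key ordering: all records with the same key land in one partition, while keyless records are spread to balance load.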

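The key and no-key cases are implemented in org.apache.kafka.clients.producer.internals.DefaultPartitioner, shown below as it appears in older (pre-2.4) clients such as the one used here. An explicit partition never reaches this method: KafkaProducer uses the record's partition directly when one is set and only consults the partitioner otherwise. From Kafka 2.4 onward, keyless records are no longer spread strictly round-robin; the default partitioner keeps sending them to one "sticky" partition until that partition's batch is completed, then switches (KIP-480).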

/**
 * Compute the partition for the given record.
 * @param topic The topic name
 * @param key The key to partition on (or null)
 * @param keyBytes serialized key (or null)
 * @param value The value to partition on (or null)
 * @param valueBytes serialized value (or null)
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // No key: take the next value of this topic's counter and spread records
        // round-robin over the partitions that currently have a leader.
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partition is available: fall back to all partitions.
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // Key present: murmur2 hash of the serialized key, modulo the total partition count.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
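To see which partition a given key would land on without a running cluster, the keyed branch of the partition() method above can be reproduced standalone. KeyPartitioner is a hypothetical class; its murmur2 method is a port of Kafka's Utils.murmur2 written from memory of the Kafka source, so compare it with the version in your client before relying on exact partition numbers. Keys are encoded as UTF-8, which is what the StringSerializer in the producer example uses by default.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone port of the keyed branch of DefaultPartitioner.partition().
final class KeyPartitioner {

    // Port of the 32-bit murmur2 hash used by Kafka (assumed to match Utils.murmur2).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes; the fall-through between cases is intentional.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche step.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below is never negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Same arithmetic as the keyed branch: hash of the serialized key, modulo the partition count.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // Mirror the producer example: keys "0".."99" sent to a 2-partition topic.
        int[] counts = new int[2];
        for (int i = 0; i < 100; i++) {
            counts[partitionForKey(Integer.toString(i), 2)]++;
        }
        System.out.println("partition 0: " + counts[0] + ", partition 1: " + counts[1]);
    }
}
```

Because the modulus is the topic's total partition count, adding partitions to an existing topic changes where existing keys are routed, so per-key ordering only holds while the partition count stays fixed.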

4. Consumer Partition Assignment

Consumers in a group subscribe to a topic, and each partition is consumed by only one consumer instance in the group. If the group has more consumers than there are partitions to assign, the surplus consumers receive no partitions and sit idle, whichever assignment strategy is used.

4.1. Assignment Strategies

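The abstract class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor has three built-in implementations in the client used here: RangeAssignor, RoundRobinAssignor, and StickyAssignor. This article covers the first two.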

4.1.1. range

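The range assignor (org.apache.kafka.clients.consumer.RangeAssignor) is the default in the client version used here. It works one topic at a time: it sorts the topic's partitions numerically and its subscribed consumers lexicographically, gives each consumer the same number of consecutive partitions, and hands any remainder, one extra partition each, to the first consumers in that order.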

/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */

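Because the remainder is handed out separately for every topic, the same consumer (C0) receives the extra partition of each topic, so in this example it ends up with twice as many partitions as C1: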

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]
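The per-topic arithmetic can be checked with a small simulation. RangeSketch is a hypothetical class, and it simplifies the real RangeAssignor by assuming every consumer subscribes to every topic (Kafka divides each topic only among the consumers subscribed to it) and by processing topics in name order.

```java
import java.util.*;

// Hypothetical, simplified model of RangeAssignor's per-topic division.
final class RangeSketch {

    static Map<String, List<String>> assign(List<String> consumers, SortedMap<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // consumers in lexicographic order
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : sorted) {
            assignment.put(c, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            String topic = e.getKey();
            int numPartitions = e.getValue();
            int perConsumer = numPartitions / sorted.size();
            int withExtra = numPartitions % sorted.size(); // the first `withExtra` consumers get one more
            for (int i = 0; i < sorted.size(); i++) {
                // Each consumer gets one contiguous range of partitions.
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic + "p" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        SortedMap<String, Integer> topics = new TreeMap<>();
        topics.put("t0", 3);
        topics.put("t1", 3);
        System.out.println(assign(Arrays.asList("C1", "C0"), topics));
        // prints {C0=[t0p0, t0p1, t1p0, t1p1], C1=[t0p2, t1p2]}
    }
}
```

With three consumers and a single two-partition topic such as abc, the same code gives C2 an empty list, which is the idle-consumer case described in section 4.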

4.1.2. roundrobin

The round-robin assignor (org.apache.kafka.clients.consumer.RoundRobinAssignor) lays out all partitions of every subscribed topic and all consumers in sorted order, then hands the partitions out one at a time, cycling through the consumers. If every consumer has the same subscriptions, the partitions end up evenly distributed.

/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, the partitions will be uniformly distributed.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, each with 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 */

When consumers have different subscriptions, the assignor still iterates round‑robin but skips consumers that are not subscribed to a given topic, leading to potentially unbalanced assignments.
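Both the uniform case and the skipping behavior can be reproduced with a short simulation. RoundRobinSketch is a hypothetical class: it sorts topics by name and consumers by name (standing in for the member ids Kafka sorts on), then walks a circular pointer over the consumers, skipping any consumer not subscribed to the current partition's topic. The second scenario in main (C0 on t0; C1 on t0 and t1; C2 on t0, t1 and t2) is an illustrative assumption, not taken from the article.

```java
import java.util.*;

// Hypothetical, simplified model of RoundRobinAssignor.
final class RoundRobinSketch {

    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> consumers = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }

        // Every topic at least one consumer subscribes to, in name order.
        SortedSet<String> topics = new TreeSet<>();
        for (Set<String> s : subscriptions.values()) {
            topics.addAll(s);
        }

        int next = 0; // index of the consumer whose turn it is
        for (String topic : topics) {
            int numPartitions = partitionsPerTopic.getOrDefault(topic, 0);
            for (int p = 0; p < numPartitions; p++) {
                // Skip consumers that are not subscribed to this topic.
                while (!subscriptions.get(consumers.get(next)).contains(topic)) {
                    next = (next + 1) % consumers.size();
                }
                assignment.get(consumers.get(next)).add(topic + "p" + p);
                next = (next + 1) % consumers.size();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("t0", 3);
        partitions.put("t1", 3);
        Map<String, Set<String>> same = new HashMap<>();
        same.put("C0", new HashSet<>(Arrays.asList("t0", "t1")));
        same.put("C1", new HashSet<>(Arrays.asList("t0", "t1")));
        System.out.println(assign(same, partitions));
        // prints {C0=[t0p0, t0p2, t1p1], C1=[t0p1, t1p0, t1p2]}

        Map<String, Integer> partitions2 = new HashMap<>();
        partitions2.put("t0", 1);
        partitions2.put("t1", 2);
        partitions2.put("t2", 3);
        Map<String, Set<String>> differing = new HashMap<>();
        differing.put("C0", new HashSet<>(Arrays.asList("t0")));
        differing.put("C1", new HashSet<>(Arrays.asList("t0", "t1")));
        differing.put("C2", new HashSet<>(Arrays.asList("t0", "t1", "t2")));
        System.out.println(assign(differing, partitions2));
        // prints {C0=[t0p0], C1=[t1p0], C2=[t1p1, t2p0, t2p1, t2p2]}
    }
}
```

In the second scenario C2 is the only subscriber to t2, so it receives all of t2's partitions on top of its share of t1, which is the imbalance the paragraph above describes.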

5. Test Code

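Example Maven pom.xml for a Spring Boot Kafka demo. The examples below use the plain Kafka client API; spring-kafka brings in the kafka-clients library transitively: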

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cjs.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-demo</name>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

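Producer example. It sends 100 records with string keys "0" to "99" to topic abc; because every record has a key, each partition is chosen by hashing the key, and the callback prints the partition and offset each record landed on: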

package com.cjs.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class HelloProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("callback: " + recordMetadata.topic() + " "
                                + recordMetadata.partition() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

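Consumer example. It joins group test and subscribes to foo, bar, and abc; uncommenting the partition.assignment.strategy line switches from the default range assignor to round-robin: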

package com.cjs.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import java.util.*;

public class HelloConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Uncomment to use round-robin assignment instead of the default RangeAssignor.
        // props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) {
            // poll(long) suits the client version in this demo; Kafka 2.0+ clients prefer poll(Duration).
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
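Instead of toggling a commented-out line, the assignor can be passed in as a parameter. This is a minimal sketch: ConsumerProps and its build method are hypothetical helpers that use only java.util.Properties, with the same keys and class names as HelloConsumer. The one addition is that partition.assignment.strategy accepts a comma-separated list of assignor classes in order of preference.

```java
import java.util.Properties;

// Hypothetical helper producing settings like HelloConsumer's, with selectable assignors.
final class ConsumerProps {

    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";
    static final String ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor";

    static Properties build(String bootstrapServers, String groupId, String... assignorClasses) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Comma-separated assignor classes, most preferred first.
        props.put("partition.assignment.strategy", String.join(",", assignorClasses));
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Prefer round-robin but keep range listed, e.g. during a rolling switch-over.
        Properties props = build("192.168.1.133:9092", "test", ROUND_ROBIN, RANGE);
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The resulting Properties object can be passed to new KafkaConsumer<>(props) exactly as in HelloConsumer. The group coordinator only selects an assignor that every member supports, so listing both classes lets a running group move from range to round-robin one instance at a time; members with no assignor in common cannot join the same group.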


Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Programmer DD, a tinkering programmer and author of "Spring Cloud Microservices in Action".
