Understanding Kafka Partition Assignment: Strategies and Code Walkthrough
This article explains how Kafka determines which partition a producer sends a record to, how partition counts are configured, and how consumer groups assign partitions using the range (default) and round-robin strategies, complemented by detailed Java code examples.
1. Introduction
Producers send messages to topics, consumers subscribe to topics as part of a consumer group, and topics contain partitions where messages are stored. The article explores how a producer decides which partition to write to and how partitions are allocated among consumer instances.
2. Configuring Topic Partition Count
The broker-wide default partition count is set by num.partitions in server.properties and defaults to 1. Individual topics can override it when they are created.
    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

When creating a topic, the --partitions option specifies the number of partitions.

    [root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
    Topic:abc  PartitionCount:2  ReplicationFactor:1  Configs:
        Topic: abc  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: abc  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

3. Producer Partitioning
The default partitioning strategy is: if a partition is specified in the record, use it; if no partition is specified but a key is present, choose a partition based on a hash of the key; if neither a partition nor a key is present, choose a partition in a round-robin fashion.
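The three rules can be sketched in plain Java without the Kafka client on the classpath. This is a minimal illustration, not Kafka's own code: the class name PartitionChoiceSketch and the method choose are hypothetical, Arrays.hashCode is an assumed stand-in for Kafka's murmur2 hash, and the round-robin counter is assumed to start at 0 to keep the output predictable.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative stand-in for the three-rule partition choice; not a Kafka class. */
final class PartitionChoiceSketch {
    // Counter that drives the round-robin branch (ASSUMPTION: starts at 0).
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Clears the sign bit, so the result is never negative (Math.abs fails for Integer.MIN_VALUE). */
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /**
     * @param explicitPartition partition set on the record, or null
     * @param keyBytes          serialized key, or null
     * @param numPartitions     number of partitions of the topic
     */
    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // rule 1: an explicit partition always wins
        }
        if (keyBytes != null) {
            // rule 2: hash the key; ASSUMPTION: Arrays.hashCode replaces murmur2 here
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
        }
        // rule 3: no partition and no key, so rotate through the partitions
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch();
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(sketch.choose(1, key, 3));     // prints 1
        System.out.println(sketch.choose(null, key, 3)
                == sketch.choose(null, key, 3));          // prints true
        System.out.println(sketch.choose(null, null, 3)); // prints 0
        System.out.println(sketch.choose(null, null, 3)); // prints 1
    }
}
```

Because the key branch takes the hash modulo the partition count, records with the same key land in the same partition only as long as the partition count stays unchanged.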
The implementation class is org.apache.kafka.clients.producer.internals.DefaultPartitioner.

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null)
     * @param keyBytes serialized key (or null)
     * @param value The value to partition on (or null)
     * @param valueBytes serialized value (or null)
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

4. Consumer Partition Assignment
Consumers in a group subscribe to a topic, and each partition is consumed by exactly one consumer instance in the group. If a topic has fewer partitions than the group has consumers, the surplus consumers receive no partitions and stay idle.
4.1. Assignment Strategies
The abstract class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor has three built-in implementations: RangeAssignor, RoundRobinAssignor, and StickyAssignor. This article covers the first two.
4.1.1. range
The range assignor (org.apache.kafka.clients.consumer.RangeAssignor) is the default. For each topic, it sorts the partitions numerically and the consumers lexicographically, then divides the partition count by the consumer count. If the division leaves a remainder, the first few consumers each receive one extra partition.

    /**
     * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
     * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
     * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
     * divide, then the first few consumers will have one extra partition.
     *
     * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
     * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
     * The assignment will be:
     * C0: [t0p0, t0p1, t1p0, t1p1]
     * C1: [t0p2, t1p2]
     */

Resulting assignment example:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
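The per-topic arithmetic can be reproduced with a short plain-Java sketch. It is an illustration under stated assumptions rather than the RangeAssignor source: the class RangeAssignSketch and its assign method are hypothetical names, every consumer is assumed to subscribe to every topic, and partitions are represented as strings such as t0p0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustrative range assignment; not the Kafka RangeAssignor class. */
final class RangeAssignSketch {
    /**
     * @param partitionsPerTopic topic name mapped to its partition count
     * @param consumers          member ids; ASSUMPTION: all subscribe to all topics
     */
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        // Consumers are laid out in lexicographic order.
        List<String> sorted = new ArrayList<>(new TreeSet<>(consumers));
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return assignment; // nothing to assign to
        }
        // Each topic is handled on its own.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int perConsumer = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` consumers each take one additional partition.
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "p" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new HashMap<>();
        topics.put("t0", 3);
        topics.put("t1", 3);
        // prints {C0=[t0p0, t0p1, t1p0, t1p1], C1=[t0p2, t1p2]}
        System.out.println(assign(topics, Arrays.asList("C0", "C1")));
    }
}
```

Since the remainder is handed out again for every topic, the same leading consumers collect the extra partition each time, which is why C0 ends up with four partitions and C1 with two. With one 2-partition topic and three consumers, the same sketch leaves the third consumer with an empty list.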
4.1.2. roundrobin
The round-robin assignor (org.apache.kafka.clients.consumer.RoundRobinAssignor) lays out all available partitions and consumers, then assigns them in a round-robin manner. When subscriptions differ, some consumers may receive fewer partitions.

    /**
     * The round robin assignor lays out all the available partitions and all the available consumers. It
     * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
     * instances are identical, the partitions will be uniformly distributed.
     *
     * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, each with 3 partitions,
     * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
     * The assignment will be:
     * C0: [t0p0, t0p2, t1p1]
     * C1: [t0p1, t1p0, t1p2]
     */

When consumers have different subscriptions, the assignor still iterates round-robin but skips consumers that are not subscribed to a given topic, leading to potentially unbalanced assignments.
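Both the balanced case and the skipping behavior can be seen in a small plain-Java sketch. The class RoundRobinAssignSketch and its assign method are hypothetical names chosen for illustration, partitions are modeled as strings such as t0p0, and the code is a simplified rendering of the idea rather than the RoundRobinAssignor source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustrative round-robin assignment; not the Kafka RoundRobinAssignor class. */
final class RoundRobinAssignSketch {
    /**
     * @param partitionsPerTopic topic name mapped to its partition count
     * @param subscriptions      consumer id mapped to the topics it subscribes to
     */
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, Set<String>> subscriptions) {
        List<String> consumers = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        int cursor = 0; // position in the circular list of consumers
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String name = topic.getKey();
            // Ignore topics nobody subscribes to; the skip loop below would never end otherwise.
            boolean wanted = false;
            for (Set<String> topics : subscriptions.values()) {
                wanted |= topics.contains(name);
            }
            if (!wanted) {
                continue;
            }
            for (int p = 0; p < topic.getValue(); p++) {
                // Skip consumers that are not subscribed to this topic.
                while (!subscriptions.get(consumers.get(cursor)).contains(name)) {
                    cursor = (cursor + 1) % consumers.size();
                }
                assignment.get(consumers.get(cursor)).add(name + "p" + p);
                cursor = (cursor + 1) % consumers.size();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new HashMap<>();
        topics.put("t0", 3);
        topics.put("t1", 3);
        Map<String, Set<String>> subs = new HashMap<>();
        subs.put("C0", new HashSet<>(Arrays.asList("t0", "t1")));
        subs.put("C1", new HashSet<>(Arrays.asList("t0", "t1")));
        // prints {C0=[t0p0, t0p2, t1p1], C1=[t0p1, t1p0, t1p2]}
        System.out.println(assign(topics, subs));
    }
}
```

With differing subscriptions the result skews. If t0, t1, and t2 have 1, 2, and 3 partitions, C0 subscribes only to t0, C1 to t0 and t1, and C2 to all three, the sketch gives C0 one partition, C1 one partition, and C2 the remaining four.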
5. Test Code
Example Maven pom.xml for a Spring Boot Kafka demo:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.cjs.example</groupId>
        <artifactId>kafka-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>kafka-demo</name>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.5.RELEASE</version>
        </parent>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

Producer example:

    package com.cjs.kafka.producer;

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;

    public class HelloProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.133:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                        }
                    }
                });
            }
            producer.close();
        }
    }

Consumer example:

    package com.cjs.kafka.consumer;

    import org.apache.kafka.clients.consumer.*;
    import java.util.*;

    public class HelloConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.133:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            // props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }

6. References
Kafka consumer configuration documentation and several Chinese blog posts covering partitioning and assignment strategies.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
