Understanding Flink StreamPartitioner and Its Implementations
Flink’s StreamPartitioner abstracts data routing in DataStream, offering eight built‑in partitioners—including Global, Shuffle, Rebalance, KeyGroup, Broadcast, Rescale, Forward, and Custom—each with distinct channel selection logic, illustrated with source code snippets and explanations of their runtime behavior.
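A StreamPartitioner decides which downstream channel each element of a DataStream is sent to. The idea is comparable to partitioning in Spark's RDD API, but it is less explicit: the partitioner is usually chosen indirectly, through DataStream methods such as keyBy() or rebalance(). Each implementation expresses its routing rule in selectChannel(), which returns the index of the target channel.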
Eight built‑in partitioners are provided:
GlobalPartitioner
// dataStream.global()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}
Always sends data to the first downstream instance.
ShufflePartitioner
private Random random = new Random();

// dataStream.shuffle()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
}
Randomly distributes records uniformly across downstream instances.
RebalancePartitioner
private int nextChannelToSendTo;

@Override
public void setup(int numberOfChannels) {
    super.setup(numberOfChannels);
    nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}

// dataStream.rebalance()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}
Starts from a random downstream instance then round‑robin distributes records, ensuring balanced load.
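The round-robin behaviour can be checked outside Flink with a small standalone sketch. The class name RoundRobinSketch and the helper countPerChannel are made up for this illustration; only the two lines of selection logic are taken from the snippet above.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Standalone illustration of the rebalance() selection logic; not Flink code. */
final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    RoundRobinSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // Random starting point, mirroring setup() in the snippet above.
        this.nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    /** Advances to the next channel and wraps around at the end. */
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    /** Hypothetical helper: routes `records` records and counts hits per channel. */
    static int[] countPerChannel(int numberOfChannels, int records) {
        RoundRobinSketch selector = new RoundRobinSketch(numberOfChannels);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[selector.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 12 records over 4 channels: every channel receives 3, whatever the start.
        System.out.println(Arrays.toString(countPerChannel(4, 12)));
    }
}
```

The random start only changes which channel receives the first record. When several upstream instances run the same logic, it plausibly keeps them from all hitting channel 0 first.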
KeyGroupStreamPartitioner
private final KeySelector<T, K> keySelector;
private int maxParallelism;

// dataStream.keyBy()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}

// The methods below belong to KeyGroupRangeAssignment.
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
Used by the keyBy() operator. The key is hashed twice (Java hashCode(), then MurmurHash) and reduced modulo maxParallelism to obtain a key group. The key group is then mapped to a parallel instance in proportion to the actual parallelism, so each instance owns a contiguous range of key groups.
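To make the last step concrete, here is a minimal standalone sketch of the key-group arithmetic. operatorIndexForKeyGroup repeats the formula from computeOperatorIndexForKeyGroup; keyGroupFor uses a simple placeholder mix instead of MathUtils.murmurHash, so its key groups will not match the ones Flink computes. The class name and the values 128 and 4 are assumptions chosen for the example.

```java
/** Standalone illustration of the key-group arithmetic; not Flink code. */
final class KeyGroupSketch {

    /** Same formula as computeOperatorIndexForKeyGroup in the snippet above. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /**
     * Stand-in for computeKeyGroupForKeyHash. The bit mix here is a placeholder,
     * NOT Flink's MurmurHash; it only guarantees a result in [0, maxParallelism).
     */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed example value
        int parallelism = 4;      // assumed example value
        // Key groups 0..31 map to instance 0, 32..63 to instance 1, and so on.
        for (int keyGroup : new int[] {0, 31, 32, 127}) {
            System.out.println("key group " + keyGroup + " -> instance "
                    + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
        }
    }
}
```

Because the key group depends only on the key and maxParallelism, changing the parallelism moves whole key groups between instances without changing which group a key belongs to.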
BroadcastPartitioner
// dataStream.broadcast()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}

@Override
public boolean isBroadcast() {
    return true;
}
Specialized for broadcast streams; all downstream instances receive every record, so channel selection is not needed.
RescalePartitioner
private int nextChannelToSendTo = -1;

// dataStream.rescale()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    if (++nextChannelToSendTo >= numberOfChannels) {
        nextChannelToSendTo = 0;
    }
    return nextChannelToSendTo;
}
The selection itself is a plain round-robin over numberOfChannels. What sets it apart from rebalance() is the pointwise connection pattern: each upstream instance is connected to only a subset of the downstream instances, determined by the ratio of upstream to downstream parallelism, which improves locality.
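The pointwise pattern is easier to see with numbers. The sketch below is a simplified model, not Flink's actual wiring code: it assumes the larger parallelism is an exact multiple of the smaller one and that instances are grouped consecutively. The class and method names are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of which downstream instances one upstream instance reaches under rescale(). */
final class RescaleSketch {

    /**
     * Assumption: the larger parallelism is a multiple of the smaller one,
     * and instances are grouped in consecutive blocks.
     */
    static List<Integer> downstreamTargets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan-out: each upstream instance owns a block of downstream instances.
            int fanOut = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < fanOut; i++) {
                targets.add(upstreamIndex * fanOut + i);
            }
        } else {
            // Fan-in: several upstream instances share one downstream instance.
            int fanIn = upstreamParallelism / downstreamParallelism;
            targets.add(upstreamIndex / fanIn);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 4.
        System.out.println(downstreamTargets(0, 2, 4));
        System.out.println(downstreamTargets(1, 2, 4));
        // Upstream parallelism 4, downstream parallelism 2.
        System.out.println(downstreamTargets(3, 4, 2));
    }
}
```

In this model, numberOfChannels in selectChannel() corresponds to the size of the returned list, so the round-robin only cycles over the locally connected instances.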
ForwardPartitioner
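// dataStream.forward()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}
Like GlobalPartitioner, selectChannel() always returns 0, but the effect is different. forward() requires upstream and downstream parallelism to be equal, and each upstream instance is connected to exactly one downstream instance, so channel 0 is that instance's own one-to-one (typically local) counterpart rather than the first instance of the whole downstream operator.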
CustomPartitionerWrapper
Partitioner<K> partitioner;
KeySelector<T, K> keySelector;

// dataStream.partitionCustom()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
    }
    return partitioner.partition(key, numberOfChannels);
}
Allows users to define their own partitioning logic by implementing the Partitioner interface, for example partitioning by key length.
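A partitioner that routes by key length could look roughly like the sketch below. To keep it compilable without Flink on the classpath, it declares a local stand-in interface, KeyPartitioner, with the same method shape as Flink's Partitioner<K> (partition(key, numPartitions)). Both KeyPartitioner and KeyLengthPartitioner are names invented for this example.

```java
/** Local stand-in with the same method shape as Flink's Partitioner<K>; declared so the sketch compiles alone. */
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

/** Hypothetical example: routes each record by the length of its String key. */
final class KeyLengthPartitioner implements KeyPartitioner<String> {

    @Override
    public int partition(String key, int numPartitions) {
        // The length is never negative, so the result is always in [0, numPartitions).
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        KeyLengthPartitioner partitioner = new KeyLengthPartitioner();
        // "flink" has 5 characters; with 4 downstream channels it goes to channel 1.
        System.out.println(partitioner.partition("flink", 4));
    }
}
```

In a real job the class would implement org.apache.flink.api.common.functions.Partitioner<String> instead and be passed to dataStream.partitionCustom(partitioner, keySelector) together with a KeySelector that extracts the String key. Whatever the logic, the returned index has to stay within [0, numPartitions).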
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
