Big Data 8 min read

Understanding Flink StreamPartitioner and Its Implementations

Flink’s StreamPartitioner abstracts data routing in DataStream, offering eight built‑in partitioners—including Global, Shuffle, Rebalance, KeyGroup, Broadcast, Rescale, Forward, and Custom—each with distinct channel selection logic, illustrated with source code snippets and explanations of their runtime behavior.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Understanding Flink StreamPartitioner and Its Implementations

Flink’s StreamPartitioner abstracts the routing of elements in a DataStream, similar to Spark’s RDD partitioning but less explicit. It controls downstream flow via various implementations.

Eight built‑in partitioners are provided:

GlobalPartitioner

// dataStream.global()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}

Always sends data to the first downstream instance.

ShufflePartitioner

private Random random = new Random();
// dataStream.shuffle()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
}

Randomly distributes records uniformly across downstream instances.

RebalancePartitioner

private int nextChannelToSendTo;
@Override
public void setup(int numberOfChannels) {
    super.setup(numberOfChannels);
    nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}
// dataStream.rebalance()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}

Starts from a random downstream instance then round‑robin distributes records, ensuring balanced load.

KeyGroupStreamPartitioner

private final KeySelector<T, K> keySelector;
private int maxParallelism;
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}

Used by the keyBy() operator; it hashes the key twice (Java hashCode() and MurmurHash) and maps the result to a parallel instance.

BroadcastPartitioner

// dataStream.broadcast()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}
@Override
public boolean isBroadcast() {
    return true;
}

Specialized for broadcast streams; all downstream instances receive every record, so channel selection is not needed.

RescalePartitioner

private int nextChannelToSendTo = -1;
// dataStream.rescale()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    if (++nextChannelToSendTo >= numberOfChannels) {
        nextChannelToSendTo = 0;
    }
    return nextChannelToSendTo;
}

Uses a pointwise connection pattern; data is sent to a subset of downstream instances based on parallelism ratios, improving locality.

ForwardPartitioner

// dataStream.forward()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}

Like GlobalPartitioner, it forwards data to the first local downstream instance; used when upstream and downstream parallelism are equal.

CustomPartitionerWrapper

Partitioner<K> partitioner;
KeySelector<T, K> keySelector;
// dataStream.partitionCustom()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
    }
    return partitioner.partition(key, numberOfChannels);
}

Allows users to define their own partitioning logic by implementing the Partitioner interface; example shown partitions by key length.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaBig DataFlinkPartitioningDataStreamStreamPartitioner
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.