Understanding Flink's Memory Management and Data Flow Architecture
This article explains how Flink manages memory through its MemorySegment abstraction and the HeapMemorySegment and HybridMemorySegment implementations, describes the roles of ByteBuffer, NetworkBufferPool, and LocalBufferPool, and traces the end-to-end data flow from RecordWriter to Netty transport, with key code snippets.
To sidestep two JVM limitations, low object storage density and the cost of garbage collection, Flink manages memory itself. Its basic unit is the MemorySegment, a fixed-size block (32 KB) backed either by a heap byte[] or by an off-heap direct ByteBuffer.
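To make the idea of one abstraction over two kinds of backing memory concrete, here is a minimal sketch. SimpleSegment, onHeap, offHeap, and roundTrip are hypothetical names chosen for illustration; the sketch relies only on java.nio.ByteBuffer and is not how Flink's own classes access memory.

```java
import java.nio.ByteBuffer;

public class SegmentSketch {

    /** Hypothetical stand-in for a memory segment; not Flink's class. */
    static final class SimpleSegment {
        private final ByteBuffer memory;

        private SimpleSegment(ByteBuffer memory) {
            this.memory = memory;
        }

        /** Segment backed by a byte[] on the JVM heap. */
        static SimpleSegment onHeap(int size) {
            return new SimpleSegment(ByteBuffer.wrap(new byte[size]));
        }

        /** Segment backed by native memory outside the JVM heap. */
        static SimpleSegment offHeap(int size) {
            return new SimpleSegment(ByteBuffer.allocateDirect(size));
        }

        int size() {
            return memory.capacity();
        }

        boolean isOffHeap() {
            return memory.isDirect();
        }

        // Absolute accessors: callers address the segment by byte offset.
        void putLong(int index, long value) {
            memory.putLong(index, value);
        }

        long getLong(int index) {
            return memory.getLong(index);
        }
    }

    /** Writes a value and reads it back through the same interface. */
    static long roundTrip(SimpleSegment segment, int index, long value) {
        segment.putLong(index, value);
        return segment.getLong(index);
    }

    public static void main(String[] args) {
        int size = 32 * 1024; // the 32 KB block size mentioned in the text
        SimpleSegment heap = SimpleSegment.onHeap(size);
        SimpleSegment direct = SimpleSegment.offHeap(size);

        System.out.println(roundTrip(heap, 8, 42L));   // prints 42
        System.out.println(roundTrip(direct, 8, 42L)); // prints 42
        System.out.println(heap.isOffHeap() + " " + direct.isOffHeap()); // prints false true
    }
}
```

Callers see the same get/put interface regardless of where the bytes live, which is the property the MemorySegment abstraction provides.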
Two concrete implementations exist: HeapMemorySegment (heap only) and HybridMemorySegment (both heap and off-heap). Because the hybrid variant covers both cases, a single implementation can be used throughout the JVM, which lets the JIT de-virtualize the MemorySegment methods and improves performance.
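
public abstract class MemorySegment {
    // Read or write a single byte at the given position.
    public abstract byte get(int index);
    public abstract void put(int index, byte b);

    // Size of the segment in bytes (the size field is among the elided members).
    public int size() { return size; }

    // Expose a slice of the segment as a java.nio.ByteBuffer.
    public abstract ByteBuffer wrap(int offset, int length);
    ...
}

The constructors of HybridMemorySegment show how it obtains the address of a direct buffer for off-heap operations, while the heap variant leaves offHeapBuffer null: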

HybridMemorySegment(ByteBuffer buffer, Object owner) {
    // Off-heap case: check the buffer and pass its address to the base class.
    super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
    this.offHeapBuffer = buffer;
}

HybridMemorySegment(byte[] buffer, Object owner) {
    // Heap case: the byte[] is the backing memory, so no off-heap buffer is kept.
    super(buffer, owner);
    this.offHeapBuffer = null;
}

On top of MemorySegment, Flink uses its own Buffer interface (distinct from java.nio.Buffer) to transfer data between operators and TaskManagers. Buffers are implemented by NetworkBuffer, which wraps a MemorySegment and adds reference counting for safe reclamation.
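The reference counting mentioned above can be illustrated with a small sketch. CountedBuffer and its recycler callback are hypothetical names, and the byte[] stands in for a memory segment; this shows the general technique, not NetworkBuffer's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RefCountSketch {

    /** Hypothetical reference-counted buffer; not Flink's NetworkBuffer. */
    static final class CountedBuffer {
        private final byte[] memory;
        private final Consumer<byte[]> recycler;
        // A new buffer starts with one reference, held by its creator.
        private final AtomicInteger refCount = new AtomicInteger(1);

        CountedBuffer(byte[] memory, Consumer<byte[]> recycler) {
            this.memory = memory;
            this.recycler = recycler;
        }

        /** Adds a reference; fails if the buffer has already been recycled. */
        CountedBuffer retain() {
            for (;;) {
                int current = refCount.get();
                if (current == 0) {
                    throw new IllegalStateException("buffer already recycled");
                }
                if (refCount.compareAndSet(current, current + 1)) {
                    return this;
                }
            }
        }

        /** Drops a reference; returns true if this call recycled the memory. */
        boolean release() {
            for (;;) {
                int current = refCount.get();
                if (current == 0) {
                    throw new IllegalStateException("buffer released too often");
                }
                if (refCount.compareAndSet(current, current - 1)) {
                    if (current == 1) {
                        recycler.accept(memory); // last reference gone: hand memory back
                        return true;
                    }
                    return false;
                }
            }
        }
    }

    public static void main(String[] args) {
        Deque<byte[]> pool = new ArrayDeque<>();
        CountedBuffer buffer = new CountedBuffer(new byte[32], pool::addLast);

        buffer.retain();                      // a second consumer now shares the buffer
        System.out.println(buffer.release()); // prints false
        System.out.println(pool.size());      // prints 0
        System.out.println(buffer.release()); // prints true
        System.out.println(pool.size());      // prints 1
    }
}
```

The memory returns to the pool only when the last holder releases it, so a buffer that is still being read by the network stack cannot be reused by a writer.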
Each TaskManager has a single NetworkBufferPool, which allocates a configurable number of memory segments (usually direct buffers) and acts as a factory for LocalBufferPool instances. It distributes its segments among these local pools, each of which serves a sub-task.
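The relationship between the global pool and the local pools can be sketched as follows. GlobalPool and LocalPool are hypothetical classes, heap byte[] arrays stand in for memory segments, and the fixed per-pool quota is a simplifying assumption; Flink's real pools are considerably more involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolSketch {

    /** Hypothetical global pool: owns every segment up front. */
    static final class GlobalPool {
        private final Deque<byte[]> free = new ArrayDeque<>();

        GlobalPool(int segments, int segmentSize) {
            for (int i = 0; i < segments; i++) {
                free.add(new byte[segmentSize]);
            }
        }

        /** Returns a free segment, or null when the pool is exhausted. */
        synchronized byte[] request() {
            return free.poll();
        }

        synchronized void recycle(byte[] segment) {
            free.add(segment);
        }

        synchronized int available() {
            return free.size();
        }
    }

    /** Hypothetical per-sub-task pool with a cap on what it may borrow. */
    static final class LocalPool {
        private final GlobalPool global;
        private final int maxSegments;
        private int borrowed;

        LocalPool(GlobalPool global, int maxSegments) {
            this.global = global;
            this.maxSegments = maxSegments;
        }

        /** Returns a segment, or null if the quota or the global pool is exhausted. */
        synchronized byte[] request() {
            if (borrowed >= maxSegments) {
                return null; // at quota: the caller has to wait, which is how back pressure arises
            }
            byte[] segment = global.request();
            if (segment != null) {
                borrowed++;
            }
            return segment;
        }

        synchronized void recycle(byte[] segment) {
            borrowed--;
            global.recycle(segment);
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(4, 1024);
        LocalPool a = new LocalPool(global, 3);
        LocalPool b = new LocalPool(global, 3);

        byte[] first = a.request();
        a.request();
        a.request();
        System.out.println(a.request() == null); // prints true (quota of 3 reached)
        System.out.println(b.request() != null); // prints true (last global segment)
        System.out.println(b.request() == null); // prints true (global pool empty)

        a.recycle(first);
        System.out.println(b.request() != null); // prints true (recycled segment reused)
    }
}
```

The NetworkBufferPool constructor quoted in the article follows; it pre-allocates all of its segments as direct buffers: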

public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
    this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);

    // Pre-allocate every segment as a direct (off-heap) buffer.
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
        ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
    }

    // Total allocated size in MB, used only for the log message.
    long allocatedMb = ((long) segmentSize * availableMemorySegments.size()) >> 20;
    LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
            allocatedMb, availableMemorySegments.size(), segmentSize);
}

The LocalBufferPool maintains fields such as numberOfRequiredMemorySegments, availableMemorySegments, and maxNumberOfMemorySegments. Its size is adjusted via setNumBuffers, which never goes below the required minimum or above the configured maximum:

public void setNumBuffers(int numBuffers) throws IOException {
    synchronized (availableMemorySegments) {
        // The pool may never shrink below its guaranteed minimum.
        checkArgument(numBuffers >= numberOfRequiredMemorySegments,
                "Buffer pool needs at least %s buffers, but tried to set to %s",
                numberOfRequiredMemorySegments, numBuffers);

        // Cap the new size at the configured maximum.
        currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);

        returnExcessMemorySegments();

        // If more segments are in use than the new size allows, ask the owner to release some.
        if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
            owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
        }
    }
}

Data produced by an operator is handed to a RecordWriter, which selects the target channels via a ChannelSelector, serializes the record with a RecordSerializer, and writes the serialized bytes into buffers of a ResultPartition. Full buffers are finished as the loop below shows; beyond that, the writer either flushes after every record (the flushAlways branch) or relies on a background thread that flushes periodically (mini-batching).

public void emit(T record) throws IOException, InterruptedException {
    // A record may go to one or several channels, depending on the selector.
    for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
        sendToTarget(record, targetChannel);
    }
}

private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
    RecordSerializer<T> serializer = serializers[targetChannel];
    SerializationResult result = serializer.addRecord(record);

    // While the current buffer is full, finish it and continue in a new one.
    while (result.isFullBuffer()) {
        if (tryFinishCurrentBufferBuilder(targetChannel, serializer) && result.isFullRecord()) {
            // The record fit completely; nothing is left to write.
            break;
        }
        BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
        result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
    }

    if (flushAlways) {
        targetPartition.flush(targetChannel);
    }
}

Flushing triggers Netty to send the buffers over the network. Netty's pipeline fires a user event, and the ChannelHandlerContext writes and flushes a BufferResponse to the downstream RemoteInputChannel. The downstream side decodes the message, reconstructs the original StreamRecord via StreamElementSerializer.deserialize, and feeds it to the operator.

public StreamElement deserialize(DataInputView source) throws IOException {
    // The first byte is a tag that identifies the kind of stream element.
    int tag = source.readByte();
    if (tag == TAG_REC_WITH_TIMESTAMP) {
        long timestamp = source.readLong();
        return new StreamRecord<>(typeSerializer.deserialize(source), timestamp);
    } else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
        return new StreamRecord<>(typeSerializer.deserialize(source));
    } else if (tag == TAG_WATERMARK) {
        return new Watermark(source.readLong());
    } else {
        throw new IOException("Corrupt stream, found tag: " + tag);
    }
}

Through this layered architecture (MemorySegment → Buffer → NetworkBufferPool → LocalBufferPool → RecordWriter → Netty → RemoteInputChannel → StreamElementSerializer), Flink achieves efficient, back-pressured, and fault-tolerant data transfer across JVMs and machines.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
