Understanding Flink’s Unified Programming API for Batch and Streaming Jobs
This article examines Apache Flink’s programming model, comparing its batch DataSet API with the streaming DataStream API, detailing class hierarchies, key code examples such as groupBy and job submission, and explaining how both paradigms are unified into a common JobGraph representation.
When developing batch or streaming jobs with Apache Flink, it pays to look beyond the processing logic of an individual job to the framework underneath it. Seeing how the two API surfaces are unified gives a deeper understanding of Flink’s architecture.
Flink’s data‑flow programming model is organized in a layered design, as illustrated by the diagram below.
Programming API Design
Both batch and streaming jobs start with a Source and end with a Sink, with various operators in between. In the batch API the core abstraction is DataSet, while the streaming API uses DataStream. The following diagram shows the class hierarchy for the batch side.
All related classes reside in the org.apache.flink.api.java.operators package and can be grouped into four categories: DataSource, DataSink, SingleInputOperator, and TwoInputOperator. Although DataSink does not inherit from DataSet, it still represents the output node of a batch DAG.
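The relationship between the four categories can be modelled in a few lines of plain Java. The sketch below is a simplified stand-in, not Flink's implementation: the class names are borrowed from Flink for readability, while `BatchDagSketch`, `inputs()`, `describe()`, and the string-named `map`/`join`/`output` methods are hypothetical helpers invented for illustration. It shows only the structural point made above: sources and operators are DataSets, while the sink is a separate type that points back at the DataSet it consumes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: names mirror Flink's classes, but this is NOT Flink code.
class BatchDagSketch {

    /** Stand-in for DataSet: any node that produces data. */
    abstract static class DataSet {
        final String name;

        DataSet(String name) {
            this.name = name;
        }

        /** Upstream nodes this node reads from (empty for a source). */
        abstract List<DataSet> inputs();

        // Each transformation returns a NEW node that points back at its input(s).
        SingleInputOperator map(String operatorName) {
            return new SingleInputOperator(operatorName, this);
        }

        TwoInputOperator join(DataSet other, String operatorName) {
            return new TwoInputOperator(operatorName, this, other);
        }

        // A sink is created from a DataSet but is not itself a DataSet.
        DataSink output(String sinkName) {
            return new DataSink(sinkName, this);
        }
    }

    static final class DataSource extends DataSet {
        DataSource(String name) {
            super(name);
        }

        @Override
        List<DataSet> inputs() {
            return Collections.emptyList();
        }
    }

    static final class SingleInputOperator extends DataSet {
        private final DataSet input;

        SingleInputOperator(String name, DataSet input) {
            super(name);
            this.input = input;
        }

        @Override
        List<DataSet> inputs() {
            return Collections.singletonList(input);
        }
    }

    static final class TwoInputOperator extends DataSet {
        private final DataSet first;
        private final DataSet second;

        TwoInputOperator(String name, DataSet first, DataSet second) {
            super(name);
            this.first = first;
            this.second = second;
        }

        @Override
        List<DataSet> inputs() {
            return Arrays.asList(first, second);
        }
    }

    /** Output node: deliberately does not extend DataSet. */
    static final class DataSink {
        final String name;
        final DataSet input;

        DataSink(String name, DataSet input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Walks upstream from the sink (depth-first) and lists node names, sink first. */
    static List<String> describe(DataSink sink) {
        List<String> names = new ArrayList<>();
        names.add(sink.name);
        collect(sink.input, names);
        return names;
    }

    private static void collect(DataSet node, List<String> names) {
        names.add(node.name);
        for (DataSet upstream : node.inputs()) {
            collect(upstream, names);
        }
    }

    public static void main(String[] args) {
        DataSet users = new DataSource("users");
        DataSet orders = new DataSource("orders");
        DataSink sink = users.map("clean").join(orders, "join").output("sink");
        System.out.println(describe(sink)); // prints [sink, join, clean, users, orders]
    }
}
```

Because the sink holds a reference to its input, the whole DAG can be recovered by walking upstream from the sinks, which is why a sink can terminate the graph without being a DataSet itself.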
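During batch job development, most transformations create a new DataSet. Grouping is a notable exception: the groupBy operation returns an UnsortedGrouping, an intermediate structure that yields a DataSet again only once a function such as a reduce or an aggregation is applied to it. The relevant code is shown below: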
    public UnsortedGrouping<T> groupBy(int... fields) {
        return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
    }

The UnsortedGrouping class extends the abstract Grouping class, whose definition is:
    @Public
    public abstract class Grouping<T> {

        protected final DataSet<T> inputDataSet;
        protected final Keys<T> keys;
        protected Partitioner<?> customPartitioner;

        public Grouping(DataSet<T> set, Keys<T> keys) {
            if (set == null || keys == null) {
                throw new NullPointerException();
            }
            if (keys.isEmpty()) {
                throw new InvalidProgramException("The grouping keys must not be empty.");
            }
            this.inputDataSet = set;
            this.keys = keys;
        }

        @Internal
        public DataSet<T> getInputDataSet() {
            return this.inputDataSet;
        }

        @Internal
        public Keys<T> getKeys() {
            return this.keys;
        }

        @Internal
        public Partitioner<?> getCustomPartitioner() {
            return this.customPartitioner;
        }
    }

On the streaming side, Flink uses DataStream and StreamOperator. The following diagram shows the DataStream hierarchy.
Each transformation in a streaming DAG produces a new DataStream that internally wraps a StreamOperator. Examples include DataStreamSource (from readTextFile()), KeyedStream (from keyBy()), SplitStream, and IterativeStream. The StreamOperator hierarchy is illustrated below.
Job submission uses ExecutionEnvironment for batch and StreamExecutionEnvironment for streaming. After constructing the DAG, both call execute(). The common and differing steps are captured in the following flow diagram.
Batch job submission core code:
    final Plan plan = createProgramPlan(jobName);
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);
    CompletableFuture<JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(plan, configuration);

Streaming job submission core code:
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);
    CompletableFuture<JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(streamGraph, configuration);

Both submission paths eventually produce a JobGraph. The conversion from a Pipeline (either a Plan for batch or a StreamGraph for streaming) to a JobGraph is performed by a specific FlinkPipelineTranslator (either PlanTranslator or StreamGraphTranslator), as shown in the following method:
    public static JobGraph getJobGraph(
            Pipeline pipeline,
            Configuration optimizerConfiguration,
            int defaultParallelism) {
        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
        return pipelineTranslator.translateToJobGraph(pipeline,
            optimizerConfiguration,
            defaultParallelism);
    }

This unified translation explains why, despite different APIs, both batch and streaming jobs share the same underlying execution graph.
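The dispatch step can be illustrated with a small, Flink-free model. Everything below is a hypothetical sketch: the types reuse Flink's names (Pipeline, Plan, StreamGraph, JobGraph) only to keep the mapping obvious, and `TranslatorSketch`, the `PipelineTranslator` interface, its `canTranslate` check, and the `producedBy` field are assumptions made for illustration. The real method also takes an optimizer configuration and a default parallelism, which are omitted here. The point is only that one entry method picks a translator based on the pipeline's concrete type, and both translators return the same result type.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: names mirror Flink's, but none of this is Flink's code.
class TranslatorSketch {

    /** Marker interface standing in for Flink's Pipeline. */
    interface Pipeline {}

    /** Stand-in for the batch pipeline. */
    static final class Plan implements Pipeline {
        final String jobName;

        Plan(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Stand-in for the streaming pipeline. */
    static final class StreamGraph implements Pipeline {
        final String jobName;

        StreamGraph(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Common result type; producedBy records which translator built it. */
    static final class JobGraph {
        final String jobName;
        final String producedBy;

        JobGraph(String jobName, String producedBy) {
            this.jobName = jobName;
            this.producedBy = producedBy;
        }
    }

    /** Hypothetical translator contract for this sketch. */
    interface PipelineTranslator {
        boolean canTranslate(Pipeline pipeline);

        JobGraph translateToJobGraph(Pipeline pipeline);
    }

    static final class PlanTranslator implements PipelineTranslator {
        @Override
        public boolean canTranslate(Pipeline pipeline) {
            return pipeline instanceof Plan;
        }

        @Override
        public JobGraph translateToJobGraph(Pipeline pipeline) {
            return new JobGraph(((Plan) pipeline).jobName, "PlanTranslator");
        }
    }

    static final class StreamGraphTranslator implements PipelineTranslator {
        @Override
        public boolean canTranslate(Pipeline pipeline) {
            return pipeline instanceof StreamGraph;
        }

        @Override
        public JobGraph translateToJobGraph(Pipeline pipeline) {
            return new JobGraph(((StreamGraph) pipeline).jobName, "StreamGraphTranslator");
        }
    }

    private static final List<PipelineTranslator> TRANSLATORS =
        Arrays.asList(new PlanTranslator(), new StreamGraphTranslator());

    /** Returns the first translator that accepts the given pipeline type. */
    static PipelineTranslator getPipelineTranslator(Pipeline pipeline) {
        for (PipelineTranslator translator : TRANSLATORS) {
            if (translator.canTranslate(pipeline)) {
                return translator;
            }
        }
        throw new IllegalArgumentException("No translator for this pipeline type");
    }

    /** Single entry point: batch and streaming both end up as a JobGraph. */
    static JobGraph getJobGraph(Pipeline pipeline) {
        return getPipelineTranslator(pipeline).translateToJobGraph(pipeline);
    }

    public static void main(String[] args) {
        System.out.println(getJobGraph(new Plan("wordcount")).producedBy);   // prints PlanTranslator
        System.out.println(getJobGraph(new StreamGraph("clicks")).producedBy); // prints StreamGraphTranslator
    }
}
```

In this sketch the caller never branches on batch versus streaming; it hands over a Pipeline and gets a JobGraph back, which is the property the article attributes to Flink's own translation step.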
