Spark Streaming vs Flink – Architecture, Scheduling & Fault Tolerance
This article compares Spark Streaming and Flink across runtime models, component roles, programming APIs, task scheduling, time semantics, dynamic Kafka partition detection, fault‑tolerance mechanisms, exactly‑once guarantees, and back‑pressure handling, providing code examples and practical insights for real‑time data processing.
Framework Roles
In Spark Streaming (standalone mode) the runtime roles are Master (cluster resource manager), Worker (node‑level resource manager), Driver (DAG generation and task scheduling) and Executor (task execution). In Flink the roles are JobManager (coordinates distributed execution, checkpoints, and recovery), TaskManager (runs operators), and Slot (a fixed portion of a TaskManager’s resources).
Runtime Model
Spark Streaming processes data as micro‑batches; each batch triggers a Spark Core job. The batch interval is configured when the StreamingContext is created.
Flink processes events directly; each incoming record is treated as an event, enabling stateful stream processing with event‑time semantics.
Programming Model Comparison
Spark Streaming integrates with Kafka via two models: the receiver-based DStream and the direct DStream. The direct model is the recommended approach; the example below uses the Kafka 0.10 direct API.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers, // the 0.10 API uses bootstrap.servers, not metadata.broker.list
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "direct-kafka-word-count")
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Flink’s Kafka connector is event‑driven. The source is created with FlinkKafkaConsumer010, watermarks are assigned, and the sink uses FlinkKafkaProducer010.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<KafkaEvent> input = env.addSource(
new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-topic"),
new KafkaEventSchema(),
parameterTool.getProperties()))
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor())
.keyBy("word")
.map(new RollingAdditionMapper());
input.addSink(new FlinkKafkaProducer010<>(
parameterTool.getRequired("output-topic"),
new KafkaEventSchema(),
parameterTool.getProperties()));
env.execute("Kafka 0.10 Example");

Task Scheduling
Spark builds a DAG, splits it into stages, creates a task set, and schedules tasks for each micro‑batch.
Flink first generates a StreamGraph, converts it to a JobGraph, and the JobManager turns the JobGraph into an ExecutionGraph before dispatching tasks.
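The stage-splitting step can be sketched in a few lines of Java. This is a hypothetical model, not Spark's scheduler API: the Op type and operator names are illustrative. Wide dependencies (shuffles) cut the DAG into stages; narrow operators are pipelined into the current stage.

```java
// Hypothetical sketch (not Spark's real scheduler API): cutting an operator
// chain into stages at shuffle (wide-dependency) boundaries. Each wide
// operator starts a new stage; narrow operators stay pipelined together.
import java.util.ArrayList;
import java.util.List;

public class StageSplitter {

    record Op(String name, boolean wide) {}

    static List<List<String>> splitIntoStages(List<Op> ops) {
        List<List<String>> stages = new ArrayList<>();
        stages.add(new ArrayList<>());
        for (Op op : ops) {
            if (op.wide()) {
                stages.add(new ArrayList<>()); // shuffle boundary: new stage
            }
            stages.get(stages.size() - 1).add(op.name());
        }
        return stages;
    }

    public static void main(String[] args) {
        List<Op> pipeline = List.of(
                new Op("flatMap", false),
                new Op("map", false),
                new Op("reduceByKey", true),   // wide: triggers a shuffle
                new Op("print", false));
        System.out.println(splitIntoStages(pipeline));
        // [[flatMap, map], [reduceByKey, print]]
    }
}
```

Applied to the word-count example above, flatMap and map pipeline into one stage, while reduceByKey forces a shuffle and begins the next.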
Time Mechanism Comparison
Three time concepts are used in stream processing:
Processing time: the system clock of the machine executing the operator.
Event time: the timestamp embedded in the event when it was produced; requires watermarks to handle out‑of‑order arrival.
Ingestion time: the time at which the event enters Flink.
Spark Streaming supports only processing time. Structured Streaming adds event time and watermark support. Flink supports all three and provides built‑in watermark handling.
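The watermark idea can be illustrated without any framework. The sketch below is an assumption-level model of bounded out-of-orderness (the constant and method names are invented for illustration, not Flink's API): the watermark trails the largest event timestamp seen by a fixed delay, and an event-time window may fire once the watermark passes the window's end.

```java
// Minimal model of bounded-out-of-orderness watermarks -- the idea behind
// Flink's assignTimestampsAndWatermarks, not its actual API.
public class WatermarkSketch {

    static final long MAX_OUT_OF_ORDERNESS_MS = 2000L;

    // The watermark asserts: no event older than this timestamp is expected.
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    static boolean windowCanFire(long windowEndMs, long currentWatermark) {
        return currentWatermark >= windowEndMs;
    }

    public static void main(String[] args) {
        // Event timestamps (epoch millis) arriving out of order.
        long[] eventTimestamps = {1000L, 3000L, 2000L, 6000L};
        long maxSeen = Long.MIN_VALUE;
        for (long ts : eventTimestamps) maxSeen = Math.max(maxSeen, ts);

        long wm = watermark(maxSeen);                 // 6000 - 2000 = 4000
        System.out.println(windowCanFire(4000L, wm)); // true: [0, 4000) is complete
        System.out.println(windowCanFire(5000L, wm)); // false: still waiting
    }
}
```

Note the trade-off this encodes: a larger allowed out-of-orderness tolerates later events but delays window results by the same amount.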
Dynamic Kafka Partition Detection
The Kafka 0.8 direct connector’s DirectKafkaInputDStream does not track new partitions; its compute method reads only from the existing currentOffsets, so partitions added to a topic after the job starts are never consumed.
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
// build KafkaRDD with currentOffsets only – no new‑partition logic
}

The Kafka 0.10 connector introduces latestOffsets(), which queries the consumer’s current assignment, detects new partitions, and merges them into currentOffsets. Using this connector therefore enables dynamic partition discovery.
protected def latestOffsets(): Map[TopicPartition, Long] = {
val c = consumer
paranoidPoll(c)
val parts = c.assignment().asScala
// partitions the consumer was newly assigned since the last batch
val newPartitions = parts.diff(currentOffsets.keySet)
currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp))
// pause new partitions so no records are consumed here
c.pause(newPartitions.asJava)
// find the latest available offsets for all tracked partitions
c.seekToEnd(currentOffsets.keySet.asJava)
parts.map(tp => tp -> c.position(tp)).toMap
}

Flink’s FlinkKafkaConsumerBase starts a discovery thread that periodically calls partitionDiscoverer.discoverPartitions() and hands newly found partitions to the fetcher. Discovery runs only when discoveryIntervalMillis is set to a value greater than zero.
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
discoveryLoopThread = new Thread(() -> {
while (running) {
List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
if (!discovered.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discovered);
}
try {
Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException e) {
break; // simplified: the real loop shuts down cleanly on interruption
}
}
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
discoveryLoopThread.start();
}

Fault Tolerance & Exactly‑once Semantics
Spark Streaming achieves at‑least‑once guarantees via checkpointing. Exactly‑once requires committing results and offsets atomically, e.g., using a single‑partition transaction or a transactional sink.
dstream.foreachRDD(rdd => {
rdd.repartition(1).foreachPartition(partition => {
// begin transaction
partition.foreach(record => {
// write result
})
// commit transaction
})
})

Flink uses a two‑phase commit protocol. During a checkpoint, a barrier is injected, operators snapshot their state, and external sinks (e.g., Kafka) perform a pre‑commit. After every operator acknowledges the checkpoint, the commit phase finalises the external transaction.
Figures 13‑15 in the original article illustrate the pre‑commit and commit steps.
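The pre-commit/commit split can be modelled in plain Java. The class below is a hedged sketch of the idea behind Flink's TwoPhaseCommitSinkFunction; the method names and in-memory lists are illustrative, not Flink's actual interface.

```java
// Sketch of two-phase commit for a transactional sink (illustrative only).
// Phase 1 (preCommit) runs when the checkpoint barrier reaches the sink;
// phase 2 (commit) runs once the JobManager reports the checkpoint complete.
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSink {

    private final List<String> pending = new ArrayList<>();      // open transaction
    private final List<String> preCommitted = new ArrayList<>(); // staged, not visible
    final List<String> committed = new ArrayList<>();            // visible downstream

    void write(String record) { pending.add(record); }

    // Phase 1: stage pending writes; they survive failure but stay invisible.
    void preCommit() { preCommitted.addAll(pending); pending.clear(); }

    // Phase 2: make staged writes visible, exactly once.
    void commit() { committed.addAll(preCommitted); preCommitted.clear(); }

    // Failure before commit: roll the staged transaction back.
    void abortTxn() { preCommitted.clear(); }

    public static void main(String[] args) {
        TwoPhaseCommitSink sink = new TwoPhaseCommitSink();
        sink.write("a");
        sink.write("b");
        sink.preCommit(); // checkpoint barrier arrives at the sink
        sink.commit();    // checkpoint acknowledged by all operators
        System.out.println(sink.committed); // [a, b]
    }
}
```

The key property is that records become visible only in commit; a failure between preCommit and commit is resolved on recovery by either re-committing or aborting the staged transaction, never by re-writing data.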
Backpressure
Spark Streaming employs a PID‑based RateController. The controller receives processing delay, scheduling delay, and record count, then computes a new ingestion rate.
def compute(time: Long, numElements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double] = {
  if (time > latestTime && numElements > 0 && processingDelay > 0) {
    val delaySinceUpdate = (time - latestTime).toDouble / 1000
    val processingRate = numElements.toDouble / processingDelay * 1000
    val error = latestRate - processingRate
    // scheduling delay weighted by the processing rate approximates the backlog
    val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
    val dError = (error - latestError) / delaySinceUpdate
    val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate)
    latestTime = time
    latestRate = newRate
    latestError = error
    Some(newRate)
  } else None
}

Flink monitors back pressure by periodically sampling task stack traces (by default 100 samples per task at 50 ms intervals) and measuring the fraction of samples blocked waiting for output buffers. The Web UI classifies the ratio as OK (0–0.10), LOW (0.10–0.5), or HIGH (0.5–1.0).
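The ratio-to-status mapping follows directly from those thresholds; the classify helper below is illustrative, not Flink's code.

```java
// Sketch of how the sampled "blocked" ratio maps to the Web UI's
// back-pressure status, using the thresholds quoted above.
public class BackpressureLevel {

    static String classify(double blockedRatio) {
        if (blockedRatio <= 0.10) return "OK";
        if (blockedRatio <= 0.5)  return "LOW";
        return "HIGH";
    }

    public static void main(String[] args) {
        // e.g. 30 of 100 stack-trace samples were blocked on output buffers
        System.out.println(classify(30.0 / 100)); // LOW
        System.out.println(classify(0.05));       // OK
        System.out.println(classify(0.75));       // HIGH
    }
}
```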
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Meitu Technology
Curating Meitu's technical expertise, valuable case studies, and innovation insights. We deliver quality technical content to foster knowledge sharing between Meitu's tech team and outstanding developers worldwide.