How Flink Powers Unified Stream‑Batch Processing at Scale: Production Lessons
This article explains why Flink was chosen as a unified stream‑batch engine, details the migration from the Lambda architecture, outlines the Flink Batch production workflow, and shares key optimizations such as Hive dialect support, CTAS, adaptive scheduling, and speculative execution, along with the future roadmap for large‑scale data processing.
1. Motivation: From Lambda Architecture to a Unified Engine
The traditional Lambda architecture separates real‑time and batch pipelines, leading to duplicated storage, duplicated codebases, and extra operational overhead. Consolidating both pipelines on a single engine (Apache Flink) lets developers learn one API, reuse code, and maintain consistent data quality across streaming and batch workloads.
2. Flink as the Stream‑Batch Engine
Flink was selected after evaluating major big‑data engines because it provides a native stream‑batch design, an active community, and mature batch capabilities (available since Flink 1.12). This choice allows seamless migration of existing Hive/Spark batch jobs to Flink without rewriting business logic.
3. Production Workflow for Flink Batch Jobs at Kuaishou
More than 3,000 Flink Batch jobs run in production, primarily using Batch SQL with the Hive dialect. The end‑to‑end onboarding pipeline consists of:
Identify candidate Batch SQL jobs (typically low‑priority, simple data‑processing tasks).
Parse and validate the SQL with Flink’s HiveParser; ensure the job uses HiveDialect and HiveCatalog.
Rewrite the SQL so that target tables point to a test database (using CREATE TABLE LIKE or modifying existing DDL).
Submit the rewritten job via SQL‑Client (auto‑completion disabled for file‑based scripts) and run it as a shadow job.
Compare shadow‑job results and YARN resource consumption with the original online job. Validation includes column‑wise sum/hash checks, data volume, and partition consistency.
When validation passes, promote the job to the production Flink engine and continue monitoring data quality.
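The comparison step can be illustrated with a minimal sketch. It assumes both result sets are available as lists of dictionaries and that the summed columns hold integers; the function names are hypothetical and are not part of Flink or of the tooling described here. It covers the data-volume and column-wise sum/hash checks only, not partition consistency.

```python
import hashlib

def table_fingerprint(rows, numeric_columns):
    """Summarize a result set as row count, per-column sums, and an order-independent hash."""
    row_count = 0
    sums = {col: 0 for col in numeric_columns}
    digest = 0
    for row in rows:
        row_count += 1
        for col in numeric_columns:
            sums[col] += row[col]
        # Hash a canonical rendering of the row, then combine by modular addition
        # so that the order in which rows arrive does not affect the result.
        canonical = repr(sorted(row.items())).encode("utf-8")
        row_hash = int.from_bytes(hashlib.sha256(canonical).digest()[:8], "big")
        digest = (digest + row_hash) % (1 << 64)
    return {"row_count": row_count, "sums": sums, "hash": digest}

def compare_tables(online_rows, shadow_rows, numeric_columns):
    """Return the names of the checks on which the two result sets disagree."""
    online = table_fingerprint(online_rows, numeric_columns)
    shadow = table_fingerprint(shadow_rows, numeric_columns)
    return [check for check in online if online[check] != shadow[check]]
```

An empty list means the shadow job matched the online job on every check. Floating-point columns would need a tolerance rather than exact equality, because summation order can change the last digits.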
Key integration points:
HiveServer – unified entry point for Batch SQL.
BeaconServer – performs SQL rewrite and routes queries to the Flink engine.
SQL‑Client – job submission tool; file‑based scripts require disabling auto‑completion.
4. Core Engine Optimizations
The following engineering enhancements were added to make Flink Batch production‑ready.
4.1 Hive Dialect Compatibility
Supported CREATE TABLE AS SELECT (CTAS), USING JAR, ADD JAR, macro commands, and other Hive‑specific syntaxes.
Ensured that the parser uses HiveDialect and that the active catalog is a HiveCatalog; otherwise, Flink falls back to its own parser.
4.2 Dynamic UDF Loading
Implemented JAR loading via Flink’s BlobServer and a MutableURLClassLoader so that Java/Scala UDFs and remote JARs can be registered at runtime.
4.3 CTAS Execution
The engine infers the target table schema from the SELECT query, creates the table through a catalog hook, and drops it on failure to guarantee atomicity.
4.4 Dynamic Partition Write Optimization
Added a job option to disable the default Sort node in dynamic partition writes, reducing execution time.
4.5 Small‑File Merging
Introduced CompactorCoordinator and Rewriter components that detect small Hive files, merge them into larger files, and commit partitions, lowering NameNode pressure and improving read performance.
4.6 Aggregation Optimizations
Re‑implemented common Hive UDAFs using DeclarativeAggregateFunction to enable HashAgg instead of the default SortAgg, achieving performance comparable to built‑in functions.
4.7 Adaptive Scheduler
Automatically derives per‑operator parallelism from upstream data volume, eliminating manual parallelism tuning for thousands of batch jobs.
Supports fine‑grained parallelism for sources, sinks, and intermediate operators.
4.8 Speculative Execution
Detects slow Task instances, launches shadow tasks on other nodes, and keeps the first successful result, improving stability under resource contention.
4.9 Remote Shuffle Service
Adopted a self‑developed Remote Shuffle Service that provides push‑based shuffle, consolidates shuffle partitions, and offers end‑to‑end data‑consistency checks.
5. Detailed Mechanisms
5.1 Classloader & BlobServer Integration
When a job registers a UDF via USING JAR or ADD JAR, the JAR is validated, added to the ResourceManager, and loaded into the MutableURLClassLoader. During job submission, all JARs are uploaded to the Flink JobManager’s BlobServer; TaskManagers pull them at runtime, preventing ClassNotFoundException.
5.2 CTAS Lifecycle
During client compilation, the engine derives the target schema from the SELECT query.
The schema and a serialized catalog hook are sent to the JobManager.
At job start, the hook creates the target table via the catalog.
If the job succeeds, the table remains; if it fails or is cancelled, the hook drops the table to ensure atomicity.
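The lifecycle above can be sketched as a create-on-start, drop-on-failure hook. This is only an illustration under stated assumptions: InMemoryCatalog, CtasHook, and run_ctas are hypothetical names, not Flink classes, and the real hook is serialized and executed on the JobManager rather than in a local try/except.

```python
class InMemoryCatalog:
    """Hypothetical stand-in for a catalog; not a Flink API."""

    def __init__(self):
        self.tables = {}

    def create_table(self, name, schema):
        if name in self.tables:
            raise ValueError(f"table {name} already exists")
        self.tables[name] = schema

    def drop_table(self, name):
        self.tables.pop(name, None)


class CtasHook:
    """Creates the target table at job start and removes it if the job does not succeed."""

    def __init__(self, catalog, table_name, schema):
        self.catalog = catalog
        self.table_name = table_name
        self.schema = schema  # derived from the SELECT query at compile time

    def on_start(self):
        self.catalog.create_table(self.table_name, self.schema)

    def on_failed_or_cancelled(self):
        self.catalog.drop_table(self.table_name)


def run_ctas(hook, job):
    # Table creation happens outside the try block: if it fails (for example
    # because the table already exists), nothing was created and nothing is dropped.
    hook.on_start()
    try:
        job()
    except BaseException:
        # Covers failures as well as cancellation-style interrupts.
        hook.on_failed_or_cancelled()
        raise
```

Keeping table creation outside the guarded region means a pre-existing table with the same name is never dropped by a failed CTAS attempt.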
5.3 Dynamic Partition Write Plan
Without the optimization, Flink inserts a Sort node before writing each partition. Setting the job option disable-sort=true removes this node, allowing direct writes and reducing latency.
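One way to reason about this trade-off is to count how many partition writers a single task must keep open at once. The sketch below assumes, as is commonly the case for partitioned file sinks, that the sort exists so that records of one partition arrive contiguously; the function name is hypothetical and the model ignores everything except open-writer count.

```python
def peak_open_writers(partition_keys, sorted_input):
    """Return the maximum number of partition writers open at the same time.

    With sorted input, all records of a partition arrive contiguously, so the
    previous writer can be closed as soon as the partition key changes. Without
    the sort, a writer has to stay open for every partition seen so far.
    """
    keys = sorted(partition_keys) if sorted_input else list(partition_keys)
    open_writers = set()
    peak = 0
    previous = None
    for key in keys:
        if sorted_input and key != previous:
            open_writers.clear()  # previous partition is complete; close its writer
        open_writers.add(key)
        peak = max(peak, len(open_writers))
        previous = key
    return peak
```

Under this model, skipping the sort saves the sorting cost but is mainly attractive when each task writes to a modest number of partitions, since every open writer holds buffers and file handles.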
5.4 Small‑File Compaction Topology
The write path consists of four stages: Writer → CompactorCoordinator → Rewriter → PartitionCommitter. After writers finish, the coordinator aggregates file metadata, decides which files are “small”, and instructs the rewriter to merge them before committing.
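The coordinator's decision can be sketched as a simple grouping pass over file sizes. This is a sketch under assumptions: the two thresholds and the greedy smallest-first grouping are illustrative choices, and plan_compaction is a hypothetical name rather than the actual CompactorCoordinator logic.

```python
def plan_compaction(file_sizes, small_file_threshold, target_file_size):
    """Group small files into merge batches of at most roughly target_file_size bytes.

    file_sizes maps a file path to its size in bytes. Files at or above
    small_file_threshold are left alone. Batches with a single file are
    dropped, since rewriting one file on its own gains nothing.
    """
    small = sorted(
        (size, path) for path, size in file_sizes.items() if size < small_file_threshold
    )
    batches, current, current_size = [], [], 0
    for size, path in small:
        # Start a new batch once adding this file would exceed the target size.
        if current and current_size + size > target_file_size:
            batches.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        batches.append(current)
    return [batch for batch in batches if len(batch) > 1]
```

Each returned batch corresponds to one merge instruction sent to the rewriter; the partition is committed only after all of its batches are rewritten.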
5.5 Adaptive Scheduler Algorithm
When an upstream operator finishes, the framework records its output size. The downstream operator’s parallelism is then computed as:
parallelism = ceil(outputSize / targetPartitionSize)
where targetPartitionSize is a configurable threshold (e.g., 128 MiB). This calculation is performed at runtime, enabling dynamic scaling.
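As a worked example of the formula above, the following sketch computes the parallelism for a given upstream output size. The min_parallelism and max_parallelism bounds are assumptions added for illustration and are not mentioned in the source; the function name is hypothetical.

```python
MIB = 1024 * 1024

def derive_parallelism(output_size_bytes, target_partition_size_bytes,
                       min_parallelism=1, max_parallelism=1024):
    """Compute ceil(outputSize / targetPartitionSize), clamped to assumed bounds."""
    if target_partition_size_bytes <= 0:
        raise ValueError("target partition size must be positive")
    # Integer ceiling division avoids floating-point rounding on very large sizes.
    raw = -(-output_size_bytes // target_partition_size_bytes)
    return max(min_parallelism, min(max_parallelism, raw))
```

With a 128 MiB target, an upstream output of 10 GiB (10240 MiB) gives 10240 / 128 = 80 downstream subtasks, while an output of 129 MiB gives 2 because the quotient is rounded up.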
5.6 Speculative Execution Flow
SlowTaskDetector monitors task progress and flags tasks whose runtime exceeds a configurable multiple of the average.
The speculative scheduler launches a shadow task on a different node.
The first task to finish writes its output; the other task is cancelled and its partial output is cleaned up.
For sources, the scheduler ensures that shadow tasks process the same data splits; for sinks, only the winning task’s output is committed.
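The detection and winner-selection steps above can be sketched as two small functions. The source does not say which average is used as the baseline; this sketch assumes the mean runtime of already-finished tasks in the same stage, and the default multiplier of 1.5 is likewise an assumption. Both function names are hypothetical.

```python
def find_slow_tasks(running_times, finished_times, slow_multiplier=1.5):
    """Return ids of running tasks whose elapsed time exceeds multiplier x baseline.

    running_times maps task id to elapsed seconds; finished_times lists the
    runtimes of tasks that already completed. With no finished task there is
    no baseline yet, so nothing is flagged.
    """
    if not finished_times:
        return []
    baseline = sum(finished_times) / len(finished_times)
    threshold = slow_multiplier * baseline
    return sorted(task for task, elapsed in running_times.items() if elapsed > threshold)


def resolve_attempts(attempt_finish_times):
    """Pick the attempt that finished first; all other attempts are to be cancelled."""
    winner = min(attempt_finish_times, key=attempt_finish_times.get)
    losers = sorted(a for a in attempt_finish_times if a != winner)
    return winner, losers
```

For example, with finished runtimes of 10, 12, and 8 seconds the baseline is 10 seconds, so a task that has been running for 40 seconds is flagged while one at 10 seconds is not.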
6. Operational Practices
Configuration is split into Flink‑specific settings and Hadoop/Hive settings; HiveServer forwards the Hive session configuration to Flink.
SQL‑Client auto‑completion is disabled for file‑based scripts to avoid unintended token replacement.
Job progress reporting is implemented; if no progress is reported for a configurable interval, HiveServer aborts the job to avoid hanging.
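The progress-timeout rule in the last item can be sketched as a small watchdog. This sketch assumes that any progress report resets the timer; the class name and the injectable clock are illustrative and do not describe the actual HiveServer implementation.

```python
import time

class ProgressWatchdog:
    """Tracks the time since the last progress report and signals when to abort."""

    def __init__(self, timeout_seconds, clock=time.monotonic):
        self.timeout_seconds = timeout_seconds
        self.clock = clock  # injectable so the logic can be exercised without sleeping
        self.last_report_time = clock()

    def report(self, progress):
        # Assumption: every report counts as a sign of life, whatever its value.
        self.last_progress = progress
        self.last_report_time = self.clock()

    def should_abort(self):
        return self.clock() - self.last_report_time > self.timeout_seconds
```

A stricter variant could reset the timer only when the reported value changes, which would also catch jobs that keep reporting but never advance.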
7. Future Work
Complete monitoring dashboards and History Server integration for easier debugging.
Extend support for complex joins and full Hive UDAF compatibility (e.g., rank‑type functions).
Develop a unified storage layer that serves both streaming and batch workloads, eliminating data duplication.
dbaplus Community