Multi‑Stream Join and Concurrency Control in Apache Hudi: Design, Implementation, and Usage
This article presents a storage-layer solution for multi-stream joins in Apache Hudi. It covers the challenges of dimension-table and multi-stream joins, the storage-layer join approach, timeline-based concurrency control, the marker mechanism, early conflict detection, payload customization, and practical usage with Flink and Spark, along with performance benefits and future directions.
Background
Real-time analytics often require joining multiple data sources on a stream to produce a wide table, but traditional approaches face challenges: dimension-table lookups struggle under high QPS, and multi-stream joins in the compute engine accumulate very large state.
Core Capabilities
Hudi provides a timeline that records every commit, clean, compaction, etc., enabling point‑in‑time queries and efficient incremental reads. Actions include COMMITS, CLEANS, DELTA_COMMIT, COMPACTION, ROLLBACK, and SAVEPOINT.
Timeline
The timeline abstracts commits as HoodieInstant objects, each binding a timestamp, operation type, and state, allowing queries to read only data committed after a specific instant.
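As an illustration, the instant abstraction can be sketched in plain Java. The class and field names below are simplified stand-ins for Hudi's HoodieInstant, not its actual API:

```java
import java.util.ArrayList;
import java.util.List;

class TimelineSketch {
    enum State { REQUESTED, INFLIGHT, COMPLETED }

    // Simplified stand-in for HoodieInstant: timestamp + action + state.
    static class Instant {
        final String timestamp;  // e.g. "20230101093000"
        final String action;     // commit, deltacommit, compaction, ...
        final State state;
        Instant(String timestamp, String action, State state) {
            this.timestamp = timestamp;
            this.action = action;
            this.state = state;
        }
    }

    // Incremental read: keep only instants that completed strictly after `since`.
    static List<Instant> completedAfter(List<Instant> timeline, String since) {
        List<Instant> out = new ArrayList<>();
        for (Instant i : timeline) {
            if (i.state == State.COMPLETED && i.timestamp.compareTo(since) > 0) {
                out.add(i);
            }
        }
        return out;
    }
}
```

Because instant timestamps are monotonically ordered strings, a plain lexicographic comparison is enough to select everything committed after a given instant, which is what incremental queries rely on.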
Concurrency Control
Hudi adopts a log‑based, file‑level concurrency protocol built on the timeline, avoiding heavyweight locks and supporting high‑throughput streaming workloads. Three models are described: (1) single‑writer inline table service, (2) single‑writer asynchronous table service, and (3) multi‑writer with optimistic concurrency control.
Marker Mechanism
Markers track the existence of data files to clean up incomplete writes and roll back failed commits. A marker consists of the data file name, a .marker extension, and the I/O operation (CREATE, MERGE, APPEND). Markers enable removal of duplicate partial files and rollback of failed attempts.
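The naming scheme and the rollback use of markers can be sketched as follows. This is a simplified model with a hypothetical class name; in Hudi, direct markers are stored under the table's `.hoodie/.temp/<instant>/` directory:

```java
import java.util.ArrayList;
import java.util.List;

class MarkerSketch {
    enum IOType { CREATE, MERGE, APPEND }

    // A marker records which data file a write attempt is producing and how.
    static String markerName(String dataFileName, IOType ioType) {
        return dataFileName + ".marker." + ioType;
    }

    // Rollback of a failed commit: recover the data files to delete
    // from the markers the attempt left behind.
    static List<String> filesToRollback(List<String> markers) {
        List<String> files = new ArrayList<>();
        for (String m : markers) {
            String f = m.substring(0, m.indexOf(".marker."));
            if (!files.contains(f)) {
                files.add(f);
            }
        }
        return files;
    }
}
```

The key point is that the set of markers is a complete record of every file an attempt touched, so cleanup never needs to list or scan the data directories themselves.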
Timeline‑Based Marker Optimization
Markers are now created and managed by a timeline server, which batches requests and writes them to a limited number of underlying files, reducing file‑system latency and improving write performance.
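The batching idea can be shown with a toy sketch (class and field names are hypothetical): buffering marker-creation requests and flushing them in groups turns N marker creations into roughly N / batchSize underlying file writes.

```java
import java.util.ArrayList;
import java.util.List;

class BatchedMarkerSketch {
    private final List<String> pending = new ArrayList<>();
    private final int batchSize;
    int flushCount = 0; // number of underlying file writes performed

    BatchedMarkerSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Each writer task asks the timeline server to create a marker; the
    // request is buffered instead of creating one file per marker.
    void createMarker(String marker) {
        pending.add(marker);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // One flush appends the whole batch to a small set of marker files,
    // so file-system round trips scale with batches, not markers.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
    }
}
```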
Early Conflict Detection
Traditional OCC checks conflicts after data write; early conflict detection checks before marker creation, allowing writers to abort early and save resources. It leverages both direct markers and timeline‑server markers.
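The difference can be sketched as follows, with hypothetical names (Hudi's real strategies, such as the SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy configured later in this article, check marker overlap on storage): before creating its own marker for a file group, a writer checks whether a concurrent writer has already claimed that file group, and aborts before writing any data.

```java
import java.util.Set;

class EarlyConflictSketch {
    // Plain OCC writes the data first and only checks at commit time whether
    // concurrent commits touched the same file groups. Early detection runs
    // the same overlap check before marker creation:
    static boolean hasConflict(Set<String> fileGroupsClaimedByOthers, String candidate) {
        return fileGroupsClaimedByOthers.contains(candidate);
    }

    static void createMarkerOrAbort(Set<String> fileGroupsClaimedByOthers, String fileGroup) {
        if (hasConflict(fileGroupsClaimedByOthers, fileGroup)) {
            // Abort before any data is written, saving the wasted write and
            // the later rollback that commit-time OCC would require.
            throw new IllegalStateException("early conflict on file group " + fileGroup);
        }
        // ... otherwise create the marker and proceed with the data write ...
    }
}
```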
Transactional Write (ACID)
Hudi ensures atomic commits, rollback of partial failures, and repeatable reads, providing strong ACID guarantees for lakehouse workloads.
Flexible Payload Mechanism
Custom payload classes (e.g., PartialUpdateAvroPayload) allow row-level deduplication, field-level updates, and merge logic based on a pre-combine field. The payload implements HoodieRecordPayload with preCombine and combineAndGetUpdateValue methods.
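Under the simplifying assumption that records are plain maps rather than Avro GenericRecords, the two methods of the payload contract can be sketched like this (PartialUpdateSketch is a hypothetical name, not Hudi's implementation):

```java
import java.util.HashMap;
import java.util.Map;

class PartialUpdateSketch {
    // preCombine: among records with the same key inside one batch, keep the
    // record whose pre-combine (ordering) field is larger, e.g. an event time.
    static Map<String, Object> preCombine(Map<String, Object> a,
                                          Map<String, Object> b,
                                          String orderField) {
        long ta = ((Number) a.get(orderField)).longValue();
        long tb = ((Number) b.get(orderField)).longValue();
        return ta >= tb ? a : b;
    }

    // combineAndGetUpdateValue: overlay only the non-null fields of the
    // incoming record onto the stored record, i.e. a field-level update
    // rather than a whole-row overwrite.
    static Map<String, Object> combineAndGetUpdateValue(Map<String, Object> current,
                                                        Map<String, Object> incoming) {
        Map<String, Object> merged = new HashMap<>(current);
        for (Map.Entry<String, Object> e : incoming.entrySet()) {
            if (e.getValue() != null) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }
}
```

The field-level overlay is what lets two streams that each populate different columns of the same row coexist: neither stream's update erases the columns owned by the other.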
Multi‑Stream Join Process
Log files are deduplicated into a map; records from different streams are merged rather than overwritten. Base files are then merged with the deduplicated log, updating only the columns present in the log record.
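That two-step merge can be sketched on map-based records (class names are hypothetical, and real log records are Avro-encoded):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiStreamMergeSketch {
    // Step 1: fold the log records into a per-key map; records for the same
    // key arriving from different streams are merged field by field rather
    // than overwritten whole.
    static Map<String, Map<String, Object>> dedupLog(List<Map<String, Object>> logRecords,
                                                     String keyField) {
        Map<String, Map<String, Object>> byKey = new HashMap<>();
        for (Map<String, Object> rec : logRecords) {
            byKey.merge((String) rec.get(keyField), rec, MultiStreamMergeSketch::overlay);
        }
        return byKey;
    }

    // Step 2 applies the same overlay against the base-file row: only the
    // columns present (non-null) in the merged log record are updated.
    static Map<String, Object> overlay(Map<String, Object> base, Map<String, Object> update) {
        Map<String, Object> out = new HashMap<>(base);
        for (Map.Entry<String, Object> e : update.entrySet()) {
            if (e.getValue() != null) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```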
Implementation Diagram
The diagram shows how custom payloads handle cross‑batch merges, with timeline‑based markers ensuring consistency.
How to Use
Maven dependency:

<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.13-bundle</artifactId>
    <version>0.12.0</version>
  </dependency>
</dependencies>

The Flink job DDL for the two writers (Job 1 and Job 2) creates source tables A/B and a shared Hudi target table, configured with parameters such as write.payload.class, hoodie.write.lock.early.conflict.detection.enable, and per-job log-file suffixes to distinguish the jobs. Example DDL snippets:
CREATE TABLE sourceA (
uuid STRING,
name STRING,
_ts1 TIMESTAMP(3)
) WITH (...);
public static String sinkTableDDL1() {
  return String.format("create table %s (\n"
      + "  uuid STRING,\n"
      + "  name STRING,\n"
      + "  age int,\n"
      + "  _ts1 bigint,\n"
      + "  _ts2 bigint,\n"
      + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
      + ") PARTITIONED BY (_ts1) with (\n"
      + "  'connector' = 'hudi',\n"
      + "  'path' = '%s',\n"
      + "  'table.type' = 'MERGE_ON_READ',\n"
      + "  ...\n"
      + "  'write.payload.class' = 'org.apache.hudi.PartialUpdateAvroPayload',\n"
      + "  'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
      + "  'hoodie.write.lock.early.conflict.detection.strategy' = 'org.apache.hudi.client.transaction.SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy'\n"
      + ")", sinkAliasTable1, basePath);
}

Insert statements for each stream:
INSERT INTO %s(uuid, name, _ts1) SELECT uuid, name, ts AS _ts1 FROM sourceA;

INSERT INTO %s(uuid, age, _ts2) SELECT uuid, age, ts AS _ts2 FROM sourceB;

The parameter table lists required and optional settings such as path, table.type, write.operation, write.payload.class, write.partition.format, hoodie.write.concurrency.mode, and the early conflict detection flags.
Querying
Data can be queried via Spark SQL; Presto support is pending updates. Example Spark query:

SELECT * FROM hudi_tauth_test.hudi_partial_01_rt LIMIT 10;

Benefits
The multi-stream join solution supports more than three concurrent streams on tables at the hundreds-of-terabytes scale, and wide-table scans improve query performance by more than 200% compared with traditional multi-table joins.
Future Work
Plans include simplifying configuration, adding SQL support for column‑level inserts/updates, extending payloads for Flink joins and Top‑N, and upstreaming multi‑writer features to the community.
References
Relevant papers and blog posts on Hudi payloads, optimistic concurrency control, and lakehouse design are listed.
DataFunTalk
Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.