Apache Hudi Copy‑On‑Write Tutorial: Core Concepts and Hands‑On Spark Implementation
This article introduces Apache Hudi’s core concepts and demonstrates how to operate in Copy‑On‑Write mode on a Spark‑based data lake, covering prerequisites, table types, configuration properties, upsert, incremental queries, and record deletion with Scala code examples.
Most modern data lakes are built on distributed file systems (DFS) like HDFS or cloud storage such as AWS S3, following a "write‑once‑read‑many" model. However, analytical data lakes often require frequent updates, leading to the use of separate streaming and batch systems.
Apache Hudi manages the ingest‑reconcile‑compact‑purge workflow for incremental updates on HDFS, providing a clean API for querying the latest view or point‑in‑time changes. This article explains Hudi’s core concepts and demonstrates operations in Copy‑On‑Write (CoW) mode.
Outline
Prerequisites and framework versions
Hudi core concepts
Initial setup and dependencies
Using CoW tables
Prerequisites and Framework Versions
Familiarity with Scala Spark jobs and Parquet I/O is assumed. The required versions are:
JDK: openjdk 1.8.0_242
Scala: 2.12.8
Spark: 2.4.4
Hudi Spark bundle: 0.5.2‑incubating
Note: A known bug in Hudi v0.5.0‑incubating on AWS EMR causing upsert hangs is fixed in later versions.
Hudi Core Concepts
Hudi supports two table types:
Copy‑On‑Write (CoW): Each write triggers an ingest‑reconcile‑compact‑purge cycle, keeping the table always up‑to‑date. Data is stored only as Parquet files, resulting in fewer files.
Merge‑On‑Read (MoR): Writes create delta log files that are merged at read time, allowing faster writes at the cost of higher read latency.
The trade‑offs between the two table types, as summarized in the Hudi documentation:

Trade‑off             CoW                             MoR
Data latency          Higher                          Lower
Update cost (I/O)     Higher (rewrites whole files)   Lower (appends to delta log)
Parquet file size     Smaller                         Larger
Write amplification   Higher                          Lower (depends on compaction strategy)
Hudi also defines two main query types:
Snapshot query – returns the latest view (CoW) or near‑real‑time view (MoR).
Incremental query – returns records committed after a specified instant.
MoR tables additionally support read‑optimized queries that skip delta merges.
Key Write Properties
hoodie.datasource.write.table.type : Table type (default COPY_ON_WRITE).
hoodie.table.name : Unique table name.
hoodie.datasource.write.recordkey.field : Primary key column.
hoodie.datasource.write.precombine.field : Column used to resolve duplicate keys (e.g., timestamp).
hoodie.datasource.write.operation : Write operation (upsert, insert, bulk_insert, delete).
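To build intuition for how the record key and precombine field interact during an upsert, here is a plain‑Scala sketch that simulates the semantics on an in‑memory map. This is an illustration only, not Hudi API code; the names `SimRecord` and `simulateUpsert` are invented for the example.

```scala
// Simulates Hudi upsert semantics: records are identified by a record key
// (albumId), and when an incoming record collides with an existing one,
// the record with the larger precombine value (updateDate) wins.
case class SimRecord(albumId: Long, title: String, updateDate: Long)

def simulateUpsert(table: Map[Long, SimRecord], incoming: Seq[SimRecord]): Map[Long, SimRecord] =
  incoming.foldLeft(table) { (acc, rec) =>
    acc.get(rec.albumId) match {
      // Existing record is newer or equally new: keep it
      case Some(existing) if existing.updateDate >= rec.updateDate => acc
      // Otherwise insert or replace
      case _ => acc + (rec.albumId -> rec)
    }
  }

val initial = Map(801L -> SimRecord(801L, "Hail to the Thief", 100L))
val updated = simulateUpsert(initial, Seq(
  SimRecord(801L, "Hail to the Thief (Remastered)", 200L), // newer precombine value: replaces
  SimRecord(802L, "In Rainbows", 150L)                     // new key: inserted
))
```

In real Hudi writes, this reconciliation happens per file group during the ingest‑reconcile phase; the sketch only captures the key/precombine rule.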
Initial Setup and Dependencies
Include the following Maven dependencies:
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.12.8</scala.version>
<scala.compat.version>2.12</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-bundle_${scala.compat.version}</artifactId>
<version>0.5.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.compat.version}</artifactId>
<version>2.4.4</version>
</dependency>
</dependencies>
Define a simple schema using a Scala case class:
case class Album(albumId: Long, title: String, tracks: Array[String], updateDate: Long)
Using CoW Tables
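The snippets below reference an INITIAL_ALBUM_DATA collection that the article never shows. A plausible definition looks like the following; the titles, track lists, and dates are hypothetical sample values, and the case class is repeated so the snippet is self‑contained.

```scala
// Album case class as defined above, repeated for self-containment.
case class Album(albumId: Long, title: String, tracks: Array[String], updateDate: Long)

// Helper to turn an ISO date string into a Long usable as the precombine field.
def dateToLong(dateString: String): Long =
  java.time.LocalDate.parse(dateString).toEpochDay

// Hypothetical sample data. Note the duplicate key 801: during the upsert,
// the row with the larger updateDate (the precombine field) wins.
val INITIAL_ALBUM_DATA = Seq(
  Album(800, "6 String Theory", Array("Lay it down", "Am I Wrong", "68"), dateToLong("2019-12-01")),
  Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts"), dateToLong("2019-12-01")),
  Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts", "Go to sleep"), dateToLong("2019-12-03"))
)
```

With a SparkSession in scope and `import spark.implicits._`, `INITIAL_ALBUM_DATA.toDF()` produces the DataFrame passed to the upsert helper below.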
Set the base path and implement an upsert helper:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

val basePath = "/tmp/store"

private def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = {
  albumDf.write
    .format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save(s"$basePath/$tableName/")
}
Insert initial data and verify with a snapshot query:
val tableName = "Album"
upsert(INITIAL_ALBUM_DATA.toDF(), tableName, "albumId", "updateDate")
spark.read.format("hudi").load(s"$basePath/$tableName/*").show()
Perform an upsert with new records, then run another snapshot query to see the updated view.
Incremental Query
Set the query type to incremental and specify the begin instant time:
import org.apache.hudi.DataSourceReadOptions

spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200412183510")
  .load(s"$basePath/$tableName")
  .show()
This returns all records committed after the given instant.
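The semantics of an incremental query can be mimicked in plain Scala: given records tagged with the commit instant at which they were last written, the query keeps only those committed strictly after the begin instant. This is a simplified sketch of the behavior, not Hudi internals; `Committed` and `incrementalQuery` are invented names.

```scala
// Each record carries the commit instant (a yyyyMMddHHmmss string) at which
// it was last written. For this fixed-width format, lexicographic string
// comparison matches chronological order.
case class Committed(albumId: Long, title: String, commitTime: String)

def incrementalQuery(records: Seq[Committed], beginInstant: String): Seq[Committed] =
  records.filter(_.commitTime > beginInstant) // strictly after the begin instant

val records = Seq(
  Committed(800, "6 String Theory", "20200412182959"),
  Committed(801, "Hail to the Thief", "20200412183510"),
  Committed(802, "In Rainbows", "20200412184040")
)
// Only the record committed after 20200412183510 is returned;
// a record committed exactly at the begin instant is excluded.
val changed = incrementalQuery(records, "20200412183510")
```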
Delete Records
Delete records by providing a DataFrame containing only the primary keys:
val deleteKeys = Seq(
  Album(803, "", null, 0L),
  Album(802, "", null, 0L)
)

import spark.implicits._

val df = deleteKeys.toDF()

df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "albumId")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(s"$basePath/$tableName/")

spark.read.format("hudi").load(s"$basePath/$tableName/*").show()
The tutorial concludes with a preview of operations on Merge‑On‑Read tables.
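The effect of a delete operation can likewise be sketched in plain Scala: only the record key in the incoming rows matters, and matching records are removed from the table. Again, this simulates the semantics and is not Hudi code; `simulateDelete` is an invented name.

```scala
// Delete semantics: rows in the incoming DataFrame only need valid record
// keys; all other fields (title, tracks, updateDate above) are ignored.
def simulateDelete(table: Map[Long, String], keysToDelete: Seq[Long]): Map[Long, String] =
  table -- keysToDelete

val table = Map(801L -> "Hail to the Thief", 802L -> "In Rainbows", 803L -> "Amnesiac")
// Mirrors the deleteKeys example: records 803 and 802 are removed.
val remaining = simulateDelete(table, Seq(803L, 802L))
```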