
Apache Hudi Copy‑On‑Write Tutorial: Core Concepts and Hands‑On Spark Implementation

This article introduces Apache Hudi’s core concepts and demonstrates how to operate in Copy‑On‑Write mode on a Spark‑based data lake, covering prerequisites, table types, configuration properties, upsert, incremental queries, and record deletion with Scala code examples.


Most modern data lakes are built on distributed file systems (DFS) like HDFS or cloud storage such as AWS S3, following a "write‑once‑read‑many" model. However, analytical data lakes often require frequent updates, leading to the use of separate streaming and batch systems.

Apache Hudi manages the ingest‑reconcile‑compact‑purge workflow for incremental updates on HDFS, providing a clean API for querying the latest view or point‑in‑time changes. This article explains Hudi’s core concepts and demonstrates operations in Copy‑On‑Write (CoW) mode.

Outline

Prerequisites and framework versions

Hudi core concepts

Initial setup and dependencies

Using CoW tables

Prerequisites and Framework Versions

Familiarity with Scala Spark jobs and Parquet I/O is assumed. The required versions are:

JDK: openjdk 1.8.0_242

Scala: 2.12.8

Spark: 2.4.4

Hudi Spark bundle: 0.5.2‑incubating

Note: A known bug in Hudi v0.5.0‑incubating on AWS EMR causing upsert hangs is fixed in later versions.

Hudi Core Concepts

Hudi supports two table types:

Copy‑On‑Write (CoW): Each write triggers an ingest‑reconcile‑compact‑purge cycle, so the table is always up to date. Data is stored only as Parquet files, resulting in fewer files overall.

Merge‑On‑Read (MoR): Writes append delta log files that are merged at read time, allowing faster writes at the cost of higher read latency.

The trade‑offs between the two table types:

Data latency: higher for CoW, lower for MoR.

Update (I/O) cost: higher for CoW (each update rewrites an entire Parquet file), lower for MoR (updates append to a delta log).

Parquet file size: smaller for CoW, larger for MoR.

Write amplification: higher for CoW, lower for MoR (depending on the compaction strategy).

Hudi also defines two main query types:

Snapshot query – returns the latest view (CoW) or near‑real‑time view (MoR).

Incremental query – returns records committed after a specified instant.

MoR tables additionally support read‑optimized queries that skip delta merges.

Key Write Properties

hoodie.datasource.write.table.type: Table type (default COPY_ON_WRITE).

hoodie.table.name: Unique table name.

hoodie.datasource.write.recordkey.field: Primary key column.

hoodie.datasource.write.precombine.field: Column used to resolve duplicate keys (e.g., a timestamp); the record with the larger value wins.

hoodie.datasource.write.operation: Write operation (upsert, insert, bulk_insert, delete).
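The constants shown in the code later in this article resolve to these plain string keys, so the same configuration can be expressed as an options map. A minimal sketch using this article's Album table (the values are just the example's own columns):

```scala
// Hudi write properties expressed as plain string option keys,
// using the Album table from this article.
val hudiOptions: Map[String, String] = Map(
  "hoodie.table.name"                        -> "Album",
  "hoodie.datasource.write.table.type"       -> "COPY_ON_WRITE",
  "hoodie.datasource.write.recordkey.field"  -> "albumId",
  "hoodie.datasource.write.precombine.field" -> "updateDate",
  "hoodie.datasource.write.operation"        -> "upsert"
)
// A map like this can be applied in one call via
// df.write.format("hudi").options(hudiOptions)
```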

Initial Setup and Dependencies

Include the following Maven dependencies:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.8</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
        <version>2.4.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark-bundle_${scala.compat.version}</artifactId>
        <version>0.5.2-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.compat.version}</artifactId>
        <version>2.4.4</version>
    </dependency>
</dependencies>

Define a simple schema using a Scala case class:

case class Album(albumId: Long, title: String, tracks: Array[String], updateDate: Long)
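The snapshot examples below reference INITIAL_ALBUM_DATA, which is never defined in this article. A plausible definition under that assumption (every ID, title, and date is invented for illustration, including the duplicate key 801 that exercises the precombine field):

```scala
import java.time.LocalDate

// Hypothetical sample data; the article references INITIAL_ALBUM_DATA
// without defining it. Reuses the Album case class defined above.
// updateDate is stored as epoch milliseconds so the precombine field is ordered.
def dateToLong(dateString: String): Long =
  LocalDate.parse(dateString).toEpochDay * 24 * 60 * 60 * 1000

val INITIAL_ALBUM_DATA = Seq(
  Album(800, "6 String Theory", Array("Lay it down", "Am I Wrong", "68"), dateToLong("2019-12-01")),
  Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts"), dateToLong("2019-12-01")),
  Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts", "Go to sleep"), dateToLong("2019-12-03"))
)
```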

Using CoW Tables

Set the base path and implement an upsert helper:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

val basePath = "/tmp/store"

private def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String): Unit = {
    albumDf.write
      .format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save(s"$basePath/$tableName/")
}

Insert initial data and verify with a snapshot query:

val tableName = "Album"
upsert(INITIAL_ALBUM_DATA.toDF(), tableName, "albumId", "updateDate")
spark.read.format("hudi").load(s"$basePath/$tableName/*").show()

Perform an upsert with new records, then run another snapshot query to see the updated view.
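Within a single upsert batch, Hudi uses the precombine field to pick a winner among records sharing a record key. A plain-Scala sketch of that rule (Rec and its values are illustrative, not Hudi API):

```scala
// Sketch of Hudi's precombine rule: within one batch, records sharing a
// record key are reduced to the one with the largest precombine value.
case class Rec(albumId: Long, title: String, updateDate: Long)

val batch = Seq(
  Rec(801, "Hail to the Thief", 20191201L),
  Rec(801, "Hail to the Thief (remastered)", 20191203L)
)

val deduped: Map[Long, Rec] =
  batch.groupBy(_.albumId).map { case (id, recs) => id -> recs.maxBy(_.updateDate) }
// deduped(801) is the record with updateDate 20191203L
```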

Incremental Query

Set the query type to incremental and specify the begin instant time:

import org.apache.hudi.DataSourceReadOptions

spark.read
    .format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200412183510")
    .load(s"$basePath/$tableName")
    .show()

This returns all records committed after the given instant.
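Commit instant times are yyyyMMddHHmmss strings, so "committed after" reduces to a lexicographic comparison. A plain-Scala sketch of the filter (the commit times are made up):

```scala
// Sketch of incremental-query semantics: commits whose instant time is
// strictly greater than the begin instant are visible to the query.
val commitInstants = Seq("20200412183510", "20200412190242", "20200413101500")
val beginInstant   = "20200412183510"

val visible = commitInstants.filter(_ > beginInstant)
// keeps only the two commits made after the begin instant
```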

Delete Records

Delete records by providing a DataFrame containing only the primary keys:

val deleteKeys = Seq(
    Album(803, "", null, 0L),
    Album(802, "", null, 0L)
)
import spark.implicits._
val df = deleteKeys.toDF()

df.write.format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "albumId")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
    .mode(SaveMode.Append)
    .save(s"$basePath/$tableName/")

spark.read.format("hudi").load(s"$basePath/$tableName/*").show()

The tutorial concludes with a preview of operations on Merge‑On‑Read tables.

Tags: Big Data, Data Lake, Spark, Apache Hudi, copy-on-write, Scala
Written by Big Data Technology Architecture, exploring open source big data and AI technologies.