Big Data 26 min read

Integrating Apache Hudi with MinIO: A Comprehensive Tutorial

This tutorial explains how to set up Apache Hudi on cloud‑native object storage with MinIO, covering Hudi’s architecture, file format, timeline, write and read paths, core features, schema evolution, and step‑by‑step Spark commands for ingesting, updating, deleting, and querying data in a streaming data‑lake environment.

Big Data Technology Architecture
Big Data Technology Architecture
Big Data Technology Architecture
Integrating Apache Hudi with MinIO: A Comprehensive Tutorial

Apache Hudi Overview

Apache Hudi is a streaming data‑lake platform that brings core table and database capabilities—such as ACID transactions, upserts, deletes, indexing, streaming ingestion, clustering, compaction, and concurrency—directly into the lake. Launched in 2016 on Hadoop, Hudi (Hadoop Upserts Deletes and Incrementals) manages large analytical datasets on HDFS and aims to reduce latency in streaming ingestion.

Hudi now supports cloud and object storage (e.g., MinIO), moving away from traditional HDFS to achieve high‑performance, scalable, cloud‑native storage. Companies like Uber, Amazon, ByteDance, and Robinhood use Hudi for large‑scale streaming data lakes, leveraging its incremental processing stack and Hadoop FileSystem API compatibility.

Hudi File Format

Hudi stores data using base files (Parquet or HFile) and incremental log files (Avro). Changes are encoded as blocks (data, delete, or rollback) that are merged to derive updated base files, enabling efficient updates without rewriting entire datasets.

The table layout includes file organization, schema enforcement, and metadata tracking. Hudi groups files by partition and record key, recording all updates in incremental log files, which is more efficient than Hive ACID's full‑dataset merges.

The timeline, stored in the .hoodie directory, records all commit events and enables fast metadata‑driven reads, incremental queries, and time‑travel capabilities.

Hudi Write Path

Hudi provides ACID‑transactional, high‑performance writes (insert, upsert, delete, bulk_insert, insert_overwrite, etc.) optimized beyond simple Parquet/Avro writes. Each record includes a commit timestamp and sequence number, supporting event‑time tracking and watermark generation for streaming pipelines.

Hudi Read Path

Snapshot isolation allows consistent reads across engines such as Spark, Hive, Flink, Presto, Trino, Impala, Snowflake, and SQL Server. Hudi readers use engine‑specific vectorized readers and caching, merging base and log files efficiently, and support CDC, incremental queries, and point‑in‑time queries.

Core Features

Key advantages include fast upserts, small‑file optimization (critical for object storage), schema‑on‑write enforcement, ACID guarantees, optimistic concurrency control, and MVCC‑based non‑blocking writes.

Hudi + MinIO Tutorial

This hands‑on guide walks through installing Spark, MinIO, and the MinIO client, configuring S3A access, and preparing required JARs.

Prerequisites

Download and install Apache Spark.

Download and install MinIO; note its IP, port, access key, and secret key.

Download and install the MinIO client.

Add AWS SDK and Hadoop‑AWS libraries to the classpath for S3A support.

Copy required JARs to /opt/spark/jars.

Create a MinIO Bucket

mc alias set myminio http://<your‑MinIO‑IP:port> <your‑MinIO‑access‑key> <your‑MinIO‑secret‑key>
mc mb myminio/hudi

Start Spark with Hudi for MinIO

spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.hadoop.fs.s3a.access.key=<your‑MinIO‑access‑key>' \
--conf 'spark.hadoop.fs.s3a.secret.key=<your‑MinIO‑secret‑key>' \
--conf 'spark.hadoop.fs.s3a.endpoint=<your‑MinIO‑IP>:9000' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--conf 'fs.s3a.signing-algorithm=S3SignerType'

Initialize Hudi in Spark:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

Create a Hudi Table

val tableName = "hudi_trips_cow"
val basePath = "s3a://hudi/hudi_trips_cow"
val dataGen = new DataGenerator

Insert Data into Hudi (Upsert)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .mode(Overwrite)
  .save(basePath)

Browse MinIO to see the bucket and the .hoodie metadata directory.

Query Data

val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Time‑Travel Queries

spark.read.format("hudi")
  .option("as.of.instant", "2022-09-13 09:02:08.200")
  .load(basePath)

Update Data (Upsert)

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

Incremental Query

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(_.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
val tripsIncrementalDF = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
  .load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select _hoodie_commit_time, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

Soft Delete

// Prepare records with nullified non‑metadata columns
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
val nullifyColumns = softDeleteDs.schema.fields.map(f => (f.name, f.dataType.typeName)).filter {
  case (name, _) => !HoodieRecord.HOODIE_META_COLUMNS.contains(name) && !Array("ts", "uuid", "partitionpath").contains(name)
}
val softDeleteDf = nullifyColumns.foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)) { (ds, col) =>
  ds.withColumn(col._1, lit(null).cast(col._2))
}
softDeleteDf.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(OPERATION_OPT_KEY, "upsert")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

Hard Delete

val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(OPERATION_OPT_KEY, "delete")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

Insert‑Overwrite (Write Overwrite)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  .filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(OPERATION.key(), "insert_overwrite")
  .option(PRECOMBINE_FIELD.key(), "ts")
  .option(RECORDKEY_FIELD.key(), "uuid")
  .option(PARTITIONPATH_FIELD.key(), "partitionpath")
  .option(TBL_NAME.key(), tableName)
  .mode(Append)
  .save(basePath)

Schema Evolution and Partition Management

Hudi supports schema evolution (add/alter columns, change types, set table properties) and partition pruning. Example SQL commands are provided for altering tables, adding columns, changing column types, and setting properties.

Conclusion

Apache Hudi is the first open‑table format for data lakes, offering streaming‑first capabilities, ACID guarantees, and efficient upserts. Integrating Hudi with MinIO enables cloud‑native, multi‑cloud data lakes with high IOPS and throughput, while MinIO’s replication and lifecycle features provide enterprise‑grade reliability and geo‑load balancing.

Further Reading

Building a Data Lake with Apache Hudi + Linkis

Apache Hudi + Flink Multi‑Stream Wide‑Table Best Practices

ByteDance Real‑Time Data Warehouse with Apache Hudi

Huawei Cloud MRS Optimized Queries with Apache Hudi

Shopee Lakehouse Practice with Apache Hudi

MinIOSparkApache Hudi
Big Data Technology Architecture
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.