Integrating Apache Hudi with MinIO: A Comprehensive Tutorial
This tutorial explains how to set up Apache Hudi on cloud‑native object storage with MinIO, covering Hudi’s architecture, file format, timeline, write and read paths, core features, schema evolution, and step‑by‑step Spark commands for ingesting, updating, deleting, and querying data in a streaming data‑lake environment.
Apache Hudi Overview
Apache Hudi is a streaming data‑lake platform that brings core table and database capabilities—such as ACID transactions, upserts, deletes, indexing, streaming ingestion, clustering, compaction, and concurrency—directly into the lake. Launched in 2016 on Hadoop, Hudi (Hadoop Upserts Deletes and Incrementals) manages large analytical datasets on HDFS and aims to reduce latency in streaming ingestion.
Hudi now supports cloud and object storage (e.g., MinIO), moving away from traditional HDFS to achieve high‑performance, scalable, cloud‑native storage. Companies like Uber, Amazon, ByteDance, and Robinhood use Hudi for large‑scale streaming data lakes, leveraging its incremental processing stack and Hadoop FileSystem API compatibility.
Hudi File Format
Hudi stores data using base files (Parquet or HFile) and incremental log files (Avro). Changes are encoded as blocks (data, delete, or rollback) that are merged to derive updated base files, enabling efficient updates without rewriting entire datasets.
The table layout includes file organization, schema enforcement, and metadata tracking. Hudi groups files by partition and record key, recording all updates in incremental log files, which is more efficient than Hive ACID's full‑dataset merges.
The timeline, stored in the .hoodie directory, records all commit events and enables fast metadata‑driven reads, incremental queries, and time‑travel capabilities.
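As an illustrative sketch (file and instant names are invented for this example, not taken from a real run), a copy‑on‑write table laid out on MinIO looks roughly like this:
s3a://hudi/hudi_trips_cow/
.hoodie/hoodie.properties                    (table configuration)
.hoodie/20220913090208200.commit.requested   (timeline: write planned)
.hoodie/20220913090208200.inflight           (timeline: write in progress)
.hoodie/20220913090208200.commit             (timeline: write completed)
americas/united_states/san_francisco/<fileId>_<writeToken>_20220913090208200.parquet   (base file under its partition path)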
Hudi Write Path
Hudi provides ACID‑transactional, high‑performance writes (insert, upsert, delete, bulk_insert, insert_overwrite, etc.) optimized beyond simple Parquet/Avro writes. Each record includes a commit timestamp and sequence number, supporting event‑time tracking and watermark generation for streaming pipelines.
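For instance, after the writes performed later in this tutorial, that per‑record metadata is queryable as ordinary columns (a sketch; basePath is defined in the tutorial below):
// _hoodie_commit_time is the commit instant; _hoodie_commit_seqno orders
// records within a commit, which is what supports event ordering and watermarks.
spark.read.format("hudi").load(basePath)
.select("_hoodie_commit_time", "_hoodie_commit_seqno", "uuid")
.show(5, false)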
Hudi Read Path
Snapshot isolation allows consistent reads across engines such as Spark, Hive, Flink, Presto, Trino, Impala, Snowflake, and SQL Server. Hudi readers use engine‑specific vectorized readers and caching, merging base and log files efficiently, and support CDC, incremental queries, and point‑in‑time queries.
Core Features
Key advantages include fast upserts, small‑file optimization (critical for object storage), schema‑on‑write enforcement, ACID guarantees, optimistic concurrency control, and MVCC‑based non‑blocking writes.
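As one example of these knobs, a multi‑writer job can opt into optimistic concurrency control with write options like the following (a minimal sketch, assuming a ZooKeeper instance at zk-host:2181 for locking; df, tableName, and basePath are defined in the tutorial below):
df.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(TABLE_NAME, tableName)
// OCC: detect conflicting concurrent writers instead of assuming a single writer
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
// lazy cleaning so a failed writer's files are not cleaned up eagerly
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
// an external lock provider is required for multi-writer tables
.option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
.option("hoodie.write.lock.zookeeper.url", "zk-host")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "hudi_trips_cow")
.option("hoodie.write.lock.zookeeper.base_path", "/hudi-locks")
.mode(Append)
.save(basePath)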
Hudi + MinIO Tutorial
This hands‑on guide walks through installing Spark, MinIO, and the MinIO client, configuring S3A access, and preparing required JARs.
Prerequisites
Download and install Apache Spark.
Download and install MinIO; note its IP, port, access key, and secret key.
Download and install the MinIO client.
Add AWS SDK and Hadoop‑AWS libraries to the classpath for S3A support.
Copy required JARs to /opt/spark/jars.
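For example (a sketch; the JAR versions here are assumptions and must match the Hadoop version your Spark build targets):
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar
cp hadoop-aws-3.3.4.jar aws-java-sdk-bundle-1.12.262.jar /opt/spark/jars/
If you launch spark-shell with --packages as shown below, these dependencies are resolved automatically; copying them into /opt/spark/jars is an alternative for offline clusters.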
Create a MinIO Bucket
mc alias set myminio http://<your‑MinIO‑IP>:<port> <your‑MinIO‑access‑key> <your‑MinIO‑secret‑key>
mc mb myminio/hudi

Start Spark with Hudi for MinIO
spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.hadoop.fs.s3a.access.key=<your‑MinIO‑access‑key>' \
--conf 'spark.hadoop.fs.s3a.secret.key=<your‑MinIO‑secret‑key>' \
--conf 'spark.hadoop.fs.s3a.endpoint=http://<your‑MinIO‑IP>:9000' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--conf 'spark.hadoop.fs.s3a.signing-algorithm=S3SignerType'

Initialize Hudi in Spark:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.functions._ // needed for lit() in the soft-delete step below

Create a Hudi Table
val tableName = "hudi_trips_cow"
val basePath = "s3a://hudi/hudi_trips_cow"
val dataGen = new DataGenerator

Insert Data into Hudi (Upsert)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Overwrite)
.save(basePath)

Browse MinIO to see the bucket and the .hoodie metadata directory.
Query Data
val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()Time‑Travel Queries
spark.read.format("hudi")
.option("as.of.instant", "2022-09-13 09:02:08.200")
.load(basePath)

The as.of.instant option also accepts a compact instant such as "20220913090208200" or a bare date such as "2022-09-13".

Update Data (Upsert)
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Append)
.save(basePath)

Incremental Query
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(_.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
val tripsIncrementalDF = spark.read.format("hudi")
.option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
.load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select _hoodie_commit_time, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()Soft Delete
// Prepare records with nullified non‑metadata columns
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
val nullifyColumns = softDeleteDs.schema.fields.map(f => (f.name, f.dataType.typeName)).filter {
case (name, _) => !HoodieRecord.HOODIE_META_COLUMNS.contains(name) && !Array("ts", "uuid", "partitionpath").contains(name)
}
val softDeleteDf = nullifyColumns.foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)) { (ds, col) =>
ds.withColumn(col._1, lit(null).cast(col._2))
}
softDeleteDf.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(OPERATION_OPT_KEY, "upsert")
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Append)
.save(basePath)

Hard Delete
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(OPERATION_OPT_KEY, "delete")
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Append)
.save(basePath)

Insert‑Overwrite (Write Overwrite)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
.filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(OPERATION.key(), "insert_overwrite")
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(basePath)

Schema Evolution and Partition Management
Hudi supports schema evolution (adding columns, widening column types, setting table properties) and partition pruning, exposed through Spark SQL DDL; a sketch follows below.
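A minimal sketch of that DDL (assuming the table has been registered in the Spark catalog under the name hudi_trips_cow, e.g. via CREATE TABLE ... USING hudi; tip_amount is a hypothetical column added for illustration):
// enable full schema evolution (schema on read) for this session
spark.sql("set hoodie.schema.on.read.enable=true")
// add a new column
spark.sql("alter table hudi_trips_cow add columns (tip_amount int)")
// widen its type; int -> bigint is one of the promotions Hudi permits
spark.sql("alter table hudi_trips_cow alter column tip_amount type bigint")
// set a table property
spark.sql("alter table hudi_trips_cow set tblproperties ('comment' = 'taxi trips')")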
Conclusion
Apache Hudi was the first open table format for data lakes, offering streaming‑first capabilities, ACID guarantees, and efficient upserts. Integrating Hudi with MinIO enables cloud‑native, multi‑cloud data lakes with high IOPS and throughput, while MinIO's replication and lifecycle‑management features provide enterprise‑grade reliability and geographic load balancing.
Further Reading
Building a Data Lake with Apache Hudi + Linkis
Apache Hudi + Flink Multi‑Stream Wide‑Table Best Practices
ByteDance Real‑Time Data Warehouse with Apache Hudi
Huawei Cloud MRS Optimized Queries with Apache Hudi
Shopee Lakehouse Practice with Apache Hudi