Using SparkSQL to Connect and Operate with Apache Hudi: Configuration, Table Creation, Data Manipulation, and Deletion
This guide demonstrates how to configure Hive metastore, connect SparkSQL to Apache Hudi, create COW and MOR tables, perform insert, update, merge, delete, and insert‑overwrite operations, and illustrates each step with executable code snippets and sample results.
1. SparkSQL Connection to Hudi
1.1 Hive Configuration
SparkSQL reaches Hive through a standalone metastore service, so three steps are needed: edit hive-site.xml to set the metastore URI, start the metastore daemon, and check that it is listening.
# Currently only one metastore node is specified; Zookeeper can be used for HA
cd "$HIVE_HOME/conf"
vi hive-site.xml
# Add the following property to hive-site.xml (the metastore Thrift endpoint):
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hp5:9083</value>
</property>
# Start the metastore in the background so it survives the shell session
nohup hive --service metastore &
# Check for a socket on the port configured above
netstat -an | grep 9083
1.2 SparkSQL Connection
# Spark 3.3: start spark-sql with the Hudi bundle, Kryo serialization,
# and Hudi's session extension and catalog
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
2. Table Creation
When creating tables, pay attention to three aspects: table type (COW or MOR), partitioned vs. non‑partitioned, and managed vs. external.
2.1 Regular Table Creation
-- Work inside a dedicated database for the examples in this guide
create database spark_hudi;
use spark_hudi;

-- Non-partitioned COW table. No tblproperties are given, so table type,
-- record key and preCombineField are left to Hudi's defaults.
-- NOTE(review): the Hudi docs list type 'cow' and primaryKey 'uuid' as the
-- defaults -- confirm for the Hudi version in use.
create table hudi_cow_nonpcf_tbl (
    uuid  int,
    name  string,
    price double
)
using hudi
location '/user/hudi/hudi_cow_nonpcf_tbl';
-- Non-partitioned MOR table: records are keyed by id, and ts is the
-- preCombineField used for deduplication
create table hudi_mor_tbl (
    id    int,
    name  string,
    price double,
    ts    bigint
)
using hudi
tblproperties (
    type            = 'mor',
    primaryKey      = 'id',
    preCombineField = 'ts'
)
location '/user/hudi/hudi_mor_tbl';
-- Partitioned COW table: keyed by id, ts is the preCombineField,
-- and the data is partitioned by the dt and hh columns
create table hudi_cow_pt_tbl (
    id   bigint,
    name string,
    ts   bigint,
    dt   string,
    hh   string
)
using hudi
tblproperties (
    type            = 'cow',
    primaryKey      = 'id',
    preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/user/hudi/hudi_cow_pt_tbl';
2.2 CTAS (Create Table As Select)
-- CTAS: non-partitioned COW table without a preCombineField;
-- the column names come from the aliases in the select list
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
location '/user/hudi/hudi_ctas_cow_nonpcf_tbl'
as
select
    1    as id,
    'a1' as name,
    10   as price;
-- CTAS: COW table partitioned by dt, keyed by id, with ts as the preCombineField;
-- the column names come from the aliases in the select list
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (
    type            = 'cow',
    primaryKey      = 'id',
    preCombineField = 'ts'
)
partitioned by (dt)
location '/user/hudi/hudi_ctas_cow_pt_tbl'
as
select
    1            as id,
    'a1'         as name,
    10           as price,
    1000         as ts,
    '2021-12-01' as dt;
3. Inserting Data
-- Insert into non-partitioned tables; values line up with the table columns in order
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

-- Dynamic partition insert: dt and hh are supplied by the select itself
insert into hudi_cow_pt_tbl partition (dt, hh)
select
    1            as id,
    'a1'         as name,
    1000         as ts,
    '2021-12-09' as dt,
    '10'         as hh;

-- Static partition insert: dt and hh are fixed in the partition clause,
-- so the select supplies only id, name and ts
insert into hudi_cow_pt_tbl partition (dt = '2021-12-09', hh = '11')
select 2, 'a2', 1000;
4. Querying Data
-- Ordinary query against a table created earlier in this guide
select id, name, price, ts from hudi_mor_tbl where price > 10.0;

-- Time-travel query (based on commit timestamp). The same query is shown with
-- three timestamp formats; the literals are examples, so substitute a commit
-- time taken from your own table.
select * from hudi_cow_pt_tbl timestamp as of '20221118154519532' where id = 1;
select * from hudi_cow_pt_tbl timestamp as of '2022-11-18 15:45:19.532' where id = 1;
select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;
5. Updating Data
5.1 Simple UPDATE
-- Update rows selected by primary key; ts (the preCombineField) is set in the same statement
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
-- Update rows selected by a non-key column. With only the rows inserted in this
-- guide, id 1 was just renamed to 'a1_1', so this statement matches no rows.
update hudi_cow_pt_tbl set ts = 1001 where name = 'a1';
5.2 MERGE INTO
-- Create the two tables used for merge testing. Despite its name,
-- hudi_merge_source is the merge TARGET; hudi_merge_source2 provides the incoming rows.
create table hudi_merge_source (
    id    int,
    name  string,
    price double,
    ts    bigint
)
using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts')
location '/user/hudi/hudi_merge_source';

insert into hudi_merge_source values
    (1, 'old_a1', 22.22, 900),
    (2, 'old_a2', 33.33, 2000),
    (3, 'old_a3', 44.44, 2000);

create table hudi_merge_source2 (
    id    int,
    name  string,
    price double,
    ts    bigint
)
using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts')
location '/user/hudi/hudi_merge_source2';

-- ids 2 and 3 overlap with the target table; id 4 is new
insert into hudi_merge_source2 values
    (2, 'new_a2', 22.22, 900),
    (3, 'new_a3', 33.33, 2000),
    (4, 'new_a4', 44.44, 2000);
-- Merge operation: rows from hudi_merge_source2 are matched to hudi_merge_source on id.
-- Matching ids have name, price and ts updated; unmatched ids are inserted.
-- The source columns are listed explicitly so the merge reads only the fields it uses.
merge into hudi_merge_source as target
using (
    select id, name, price, ts
    from hudi_merge_source2
) source
on target.id = source.id
when matched then
    update set
        name  = source.name,
        price = source.price,
        ts    = source.ts
when not matched then
    insert (id, name, price, ts)
    values (source.id, source.name, source.price, source.ts);
6. Deleting Data
Apache Hudi supports soft delete (fields cleared, record key kept) and hard delete (record physically removed). SparkSQL currently only supports hard delete.
-- Hard delete of the record whose primary key is 1
delete from hudi_merge_source where id = 1;
7. Insert Overwrite
-- Overwrite non-partitioned tables
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

-- Overwrite partitioned table with dynamic partition:
-- the last two select values supply dt and hh
insert overwrite table hudi_cow_pt_tbl
select 10, 'a10', 1100, '2021-12-09', '10';

-- Overwrite partitioned table with static partition:
-- dt and hh are fixed in the partition clause, so the select supplies id, name and ts
insert overwrite hudi_cow_pt_tbl partition (dt = '2021-12-09', hh = '12')
select 13, 'a13', 1100;
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
