Deep Dive into Apache Iceberg Core Features and Flink Integration
This article explains Apache Iceberg’s architecture, core capabilities such as time‑travel, fast scans, delete handling, and schema evolution, and provides a step‑by‑step guide for configuring Flink to use Iceberg with Hive and Hadoop catalogs, including DDL commands and streaming queries.
Apache Iceberg is an open table format designed for massive analytic datasets, offering efficient storage, schema evolution, and support for row‑level updates, deletes, and merges.
Traditional Hive tables on HDFS, whether queried through Hive or Spark, have well-known limitations: no row‑level updates, readers and writers that interfere with each other, no version rollback, coarse partition pruning, and limited support for schema changes. Iceberg addresses these by tracking table state in a metadata hierarchy of snapshots, manifest lists, manifests, and data files.
Core Concepts
Each commit creates a new Snapshot. A Manifest List tracks all manifests for a snapshot, each Manifest tracks its data files, and Data Files store the actual rows. Delete files (position and equality deletes) are also tracked.
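The hierarchy can be pictured with a small in-memory model. This is a minimal sketch, not Iceberg's actual classes or file layout: the names `DataFile`, `Manifest`, `Snapshot`, and `commit_append` are hypothetical, and a real manifest list is a file on storage rather than a Python list.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DataFile:
    path: str
    record_count: int


@dataclass
class Manifest:
    # A manifest tracks a set of data files.
    data_files: List[DataFile] = field(default_factory=list)


@dataclass
class Snapshot:
    snapshot_id: int
    timestamp_ms: int
    # Stand-in for the manifest list: all manifests visible in this snapshot.
    manifest_list: List[Manifest] = field(default_factory=list)

    def data_files(self) -> List[DataFile]:
        return [f for m in self.manifest_list for f in m.data_files]


def commit_append(previous: Optional[Snapshot], snapshot_id: int,
                  timestamp_ms: int, new_files: List[DataFile]) -> Snapshot:
    """Create a new snapshot that reuses the previous manifests and adds one new manifest."""
    inherited = list(previous.manifest_list) if previous else []
    return Snapshot(snapshot_id, timestamp_ms, inherited + [Manifest(list(new_files))])


s1 = commit_append(None, 1, 1_000, [DataFile("a.parquet", 10)])
s2 = commit_append(s1, 2, 2_000, [DataFile("b.parquet", 5)])
print([f.path for f in s1.data_files()])
print([f.path for f in s2.data_files()])
```

The older snapshot `s1` is left untouched by the second commit, which is what makes rollback and time travel possible, and `s2` reuses the first manifest instead of rewriting it.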
Time Travel & Incremental Reads
Users can query historical data by passing an as-of-timestamp option in Spark or Flink; Iceberg selects the snapshot that was current at that time and reads its data files. Incremental reads work similarly, scanning only the snapshots committed within a given range.
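Snapshot selection for both cases can be sketched as follows. The functions `snapshot_as_of` and `snapshots_between` are hypothetical helpers written for illustration, and the choice of an exclusive start and inclusive end for the incremental range is an assumption of this sketch.

```python
from typing import List, NamedTuple, Optional


class Snapshot(NamedTuple):
    snapshot_id: int
    timestamp_ms: int


def snapshot_as_of(history: List[Snapshot], as_of_ms: int) -> Optional[Snapshot]:
    """Return the newest snapshot committed at or before as_of_ms, if any."""
    candidates = [s for s in history if s.timestamp_ms <= as_of_ms]
    return max(candidates, key=lambda s: s.timestamp_ms) if candidates else None


def snapshots_between(history: List[Snapshot], start_ms: int, end_ms: int) -> List[Snapshot]:
    """Return snapshots committed after start_ms and up to end_ms, oldest first."""
    ordered = sorted(history, key=lambda s: s.timestamp_ms)
    return [s for s in ordered if start_ms < s.timestamp_ms <= end_ms]


history = [Snapshot(101, 1_000), Snapshot(102, 2_000), Snapshot(103, 3_000)]
time_travel = snapshot_as_of(history, 2_500)
incremental = snapshots_between(history, 1_000, 3_000)
print(time_travel, incremental)
```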
Fast Scan & Data Filtering
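Planning a scan proceeds through four steps, in order:
Locate the latest snapshot (or the snapshot for a specific timestamp).
Filter manifests using partition predicates.
Collect data file entries from the selected manifests; this step reads metadata only, not the data files themselves.
Apply column‑level statistics (value counts, null counts, min/max bounds) for fine‑grained file filtering.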
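The pruning steps above can be sketched as a two-level filter. This is a simplified illustration under stated assumptions: each hypothetical `Manifest` here covers exactly one partition value, whereas real manifests carry partition summaries, and only min/max bounds on integer columns are modeled.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class DataFile:
    path: str
    lower: Dict[str, int]  # per-column minimum value
    upper: Dict[str, int]  # per-column maximum value


@dataclass
class Manifest:
    partition: Dict[str, str]  # assumption: one partition value per manifest
    files: List[DataFile]


def plan_scan(manifests: List[Manifest], partition_filter: Dict[str, str],
              column: str, value: int) -> List[str]:
    """Return paths of files that may contain rows where column == value."""
    selected = []
    for m in manifests:
        # Partition predicate: skip the whole manifest on a mismatch.
        if any(m.partition.get(k) != v for k, v in partition_filter.items()):
            continue
        for f in m.files:
            # Column statistics: keep the file only if value lies within its bounds.
            if f.lower[column] <= value <= f.upper[column]:
                selected.append(f.path)
    return selected


manifests = [
    Manifest({"country": "china"}, [
        DataFile("cn-1.parquet", {"user_id": 1}, {"user_id": 100}),
        DataFile("cn-2.parquet", {"user_id": 101}, {"user_id": 200}),
    ]),
    Manifest({"country": "japan"}, [
        DataFile("jp-1.parquet", {"user_id": 1}, {"user_id": 300}),
    ]),
]
print(plan_scan(manifests, {"country": "china"}, "user_id", 150))
```

No data file is opened during planning; every decision is made from metadata, which is why the scan stays fast as the table grows.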
Delete Implementation
Iceberg supports two delete types:
Position Delete: stores file_path and pos (and optionally the deleted row) to identify exactly which rows to remove.
Equality Delete: stores values for the columns listed in equality_ids (e.g., user_id); any row matching those values is deleted.
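Deletes are applied at read time. Data files and delete files share one sequence number space that orders them by commit, so a delete file affects only data files committed before it (or, for position deletes, in the same commit). This keeps the semantics correct even with concurrent writes.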
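A reader merging deletes might look roughly like the sketch below. The classes and the `read_with_deletes` function are hypothetical. The comparison rules follow the Iceberg table spec as I understand it: a position delete applies when the data file's sequence number is less than or equal to the delete's, and an equality delete applies only when it is strictly less.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class DataFile:
    path: str
    seq: int
    rows: List[dict]


@dataclass
class PositionDelete:
    seq: int
    file_path: str
    pos: int


@dataclass
class EqualityDelete:
    seq: int
    key: Dict[str, object]  # column name -> value that marks a row as deleted


def read_with_deletes(data_file: DataFile,
                      pos_deletes: List[PositionDelete],
                      eq_deletes: List[EqualityDelete]) -> List[dict]:
    # Position deletes target one file and apply when data seq <= delete seq.
    deleted_positions = {
        d.pos for d in pos_deletes
        if d.file_path == data_file.path and data_file.seq <= d.seq
    }
    # Equality deletes apply only to strictly older data files.
    live_keys = [d.key for d in eq_deletes if data_file.seq < d.seq]

    result = []
    for pos, row in enumerate(data_file.rows):
        if pos in deleted_positions:
            continue
        if any(all(row.get(k) == v for k, v in key.items()) for key in live_keys):
            continue
        result.append(row)
    return result


old_file = DataFile("f1", 1, [{"user_id": 1}, {"user_id": 2}, {"user_id": 3}])
new_file = DataFile("f2", 3, [{"user_id": 2}])
pos_deletes = [PositionDelete(2, "f1", 0)]
eq_deletes = [EqualityDelete(2, {"user_id": 2})]
print(read_with_deletes(old_file, pos_deletes, eq_deletes))
print(read_with_deletes(new_file, pos_deletes, eq_deletes))
```

The row with user_id 2 in `new_file` survives because it was written after the equality delete, which is how a delete followed by a re-insert of the same key stays correct.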
Schema Evolution
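Iceberg allows adding, dropping, renaming, and reordering fields, as well as widening field types. Columns are tracked by unique field ID rather than by name or position, so a rename or reorder never touches existing data files. Each commit records the schema version in its manifest, and readers automatically resolve older data files against the current schema.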
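Resolution by field ID can be illustrated with a small projection function. This is a sketch under simplifying assumptions: a schema is modeled as an ordered list of (field_id, name) pairs, a stored row as a mapping from field ID to value, and `project` is a hypothetical helper, not an Iceberg API.

```python
from typing import Dict, List, Tuple

Schema = List[Tuple[int, str]]  # ordered (field_id, column name) pairs


def project(row_by_id: Dict[int, object], schema: Schema) -> Dict[str, object]:
    """Read a stored row through a schema; fields missing from the file become None."""
    return {name: row_by_id.get(field_id) for field_id, name in schema}


# Row written under the original schema: 1 = user_id, 2 = user_name.
stored_row = {1: 7, 2: "zhang_san"}

# Later schema: user_name renamed to name, columns reordered, country added as field 3.
schema_v2: Schema = [(2, "name"), (1, "user_id"), (3, "country")]
# Even later: every column except user_id dropped.
schema_v3: Schema = [(1, "user_id")]

row_v2 = project(stored_row, schema_v2)
row_v3 = project(stored_row, schema_v3)
print(row_v2)
print(row_v3)
```

Because lookup goes through the ID, the renamed column still finds its old values, the added column reads as null for old files, and dropped columns are simply ignored.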
Flink + Iceberg Environment Setup
1. Flink SQL Client – Iceberg Dependencies
# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
# scp the jar to all Flink nodes
The runtime jar already includes what the Hadoop catalog needs; to use the Hive catalog, also add flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar to the lib directory.
2. Maven/Gradle Dependency
<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-flink</artifactId>
  <version>0.13.0</version>
  <scope>provided</scope>
</dependency>
3. Catalog Configuration
Hive Catalog
Flink SQL> create catalog hive_catalog with (
  'type'='iceberg',
  'catalog-type'='hive',
  'property-version'='1',
  'cache-enabled'='true',
  'uri'='thrift://hive1:9083',
  'clients'='5',
  'warehouse'='hdfs://nnha/user/hive/warehouse',
  'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
);
Hadoop (HDFS) Catalog
Flink SQL> create catalog hadoop_catalog with (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'property-version'='1',
  'cache-enabled'='true',
  'warehouse'='hdfs://nnha/user/iceberg/warehouse'
);
4. DDL Operations
Create Database & Table
Flink SQL> create database hadoop_catalog.iceberg_db;
Flink SQL> use hadoop_catalog.iceberg_db;
Flink SQL> create table my_user (
  user_id BIGINT comment 'user ID',
  user_name STRING,
  birthday DATE,
  country STRING
) comment 'user table'
partitioned by (birthday, country)
with (
  'write.format.default'='parquet',
  'write.parquet.compression-codec'='gzip'
);
Alter Table
Flink SQL> alter table my_user set (
  'write.format.default'='avro',
  'write.avro.compression-codec'='gzip'
);
Note: Hadoop catalog tables cannot be renamed.
Insert & Overwrite
Flink SQL> insert into my_user values (1, 'zhang_san', DATE '2022-02-01', 'china');
Flink SQL> insert overwrite my_user partition (birthday='2022-02-02', country='japan') values (5, 'zhao_liu');
5. Querying
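Batch mode:
Flink SQL> select * from my_user;
Streaming mode with incremental snapshots:
Flink SQL> set 'execution.runtime-mode'='streaming';
Flink SQL> set 'table.dynamic-table-options.enabled'='true';
Flink SQL> select * from my_user /*+ options('streaming'='true','monitor-interval'='5s','start-snapshot-id'='138573494821828246') */;
The options hint only takes effect when table.dynamic-table-options.enabled is true, which Flink 1.14 leaves disabled by default. Specifying start-snapshot-id reads only the snapshots committed after it; omitting it reads the full current snapshot first and then continues incrementally.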
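The effect of start-snapshot-id on a streaming read can be sketched as a planning function. `plan_streaming_read` is a hypothetical helper for illustration only and says nothing about how the Flink source is implemented; `history` is assumed to hold snapshot IDs in commit order, oldest first.

```python
from typing import Dict, List, Optional


def plan_streaming_read(history: List[int],
                        start_snapshot_id: Optional[int] = None) -> Dict[str, object]:
    """Decide what a streaming reader consumes when it starts up."""
    if not history:
        return {"full_scan": None, "incremental": []}
    if start_snapshot_id is None:
        # No start point: read everything in the current snapshot first,
        # then pick up later commits as the monitor interval discovers them.
        return {"full_scan": history[-1], "incremental": []}
    # With a start point: skip that snapshot itself and read only newer ones.
    idx = history.index(start_snapshot_id)  # raises ValueError for an unknown id
    return {"full_scan": None, "incremental": history[idx + 1:]}


history = [11, 12, 13]
from_scratch = plan_streaming_read(history)
from_snapshot = plan_streaming_read(history, start_snapshot_id=11)
print(from_scratch)
print(from_snapshot)
```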
Key Takeaways
Iceberg provides snapshot‑based table management, enabling time travel, fine‑grained pruning, and reliable deletes.
Flink can seamlessly read and write Iceberg tables using either Hive or Hadoop catalogs.
DDL commands, inserts, and streaming queries all work from the Flink SQL client, subject to a few catalog-specific limits such as the lack of table rename on the Hadoop catalog.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
