Configuring Flink SQL Client with Iceberg: Catalogs, DDL, Data Insertion and Query
This guide explains how to set up the Flink SQL client to work with Apache Iceberg. It covers the Scala version requirement, downloading and deploying the Iceberg JARs, configuring Hive and Hadoop catalogs, creating databases and tables, running insert and insert overwrite statements, and querying data in both batch and streaming modes.
1. Flink SQL Client Configuration with Iceberg
The Flink distribution must be the Scala 2.12 build. Download the Iceberg runtime JAR into the /root/flink-1.14.3/lib directory on every Flink node, then restart the cluster.
# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
Out of the box, Iceberg works with the Hadoop catalog. To use the Hive catalog as well, place flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar in the same lib directory on every node and restart Flink before starting the SQL client.
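The download URL and the copy commands above follow a fixed pattern, which matters when the Flink or Iceberg version changes. The sketch below derives them from the version numbers, assuming the standard Maven repository layout and the artifact naming used by Iceberg 0.13.0. The function names are hypothetical helpers, and the script only prints commands; it does not download or copy anything.

```python
MAVEN_REPO = "https://repo.maven.apache.org/maven2"
FLINK_LIB = "/root/flink-1.14.3/lib"  # lib directory used in this guide


def runtime_jar_name(flink_minor: str, iceberg_version: str) -> str:
    """File name of the Iceberg Flink runtime JAR for the given versions."""
    return f"iceberg-flink-runtime-{flink_minor}-{iceberg_version}.jar"


def runtime_jar_url(flink_minor: str, iceberg_version: str) -> str:
    """Maven layout: <repo>/<group path>/<artifactId>/<version>/<file>."""
    artifact = f"iceberg-flink-runtime-{flink_minor}"
    jar = runtime_jar_name(flink_minor, iceberg_version)
    return f"{MAVEN_REPO}/org/apache/iceberg/{artifact}/{iceberg_version}/{jar}"


def deploy_commands(flink_minor: str, iceberg_version: str, other_nodes: list) -> list:
    """One wget command for the local node plus one scp command per other node."""
    jar_path = f"{FLINK_LIB}/{runtime_jar_name(flink_minor, iceberg_version)}"
    commands = [f"wget -P {FLINK_LIB} {runtime_jar_url(flink_minor, iceberg_version)}"]
    commands += [f"scp {jar_path} root@{node}:{FLINK_LIB}" for node in other_nodes]
    return commands


if __name__ == "__main__":
    # Prints the commands only; run them by hand after checking them.
    for command in deploy_commands("1.14", "0.13.0", ["flink2", "flink3"]):
        print(command)
```

With the versions and host names from this guide, the printed commands match the three shown above.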
2. Java/Scala pom.xml Configuration
Add the Iceberg dependency with provided scope to your Maven project.
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>0.13.0</version>
<scope>provided</scope>
</dependency>
3. Catalog
3.1 Hive Catalog
The following statement creates a catalog that registers tables in the Hive metastore and keeps table files under an HDFS warehouse path.
Flink SQL> create catalog hive_catalog with(
'type'='iceberg',
'catalog-type'='hive',
'property-version'='1',
'cache-enabled'='true',
'uri'='thrift://hive1:9083',
'clients'='5',
'warehouse'='hdfs://nnha/user/hive/warehouse',
'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
);
[INFO] Execute statement succeed.
Key properties:
uri: Thrift URI of the Hive metastore.
property-version: version of the property format, kept for backward compatibility; the current version is 1.
cache-enabled: whether to enable the catalog cache (default true).
clients: size of the Hive metastore client pool (default 2).
warehouse: HDFS path under which the Hive catalog stores table data.
hive-conf-dir: local directory on the Flink nodes that contains hive-site.xml.
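A catalog created in the SQL client lasts only for the current session, so the statement is often kept in a script and replayed. The sketch below renders the statement from a property map; create_catalog_sql is a hypothetical helper, and the values are the ones used in this guide.

```python
def create_catalog_sql(name: str, properties: dict) -> str:
    """Render a Flink SQL create catalog statement from a property mapping."""

    def quote(text) -> str:
        # SQL string literal: wrap in single quotes, double any embedded quote.
        return "'" + str(text).replace("'", "''") + "'"

    lines = [f"{quote(key)}={quote(value)}" for key, value in properties.items()]
    return f"create catalog {name} with (\n" + ",\n".join(lines) + "\n);"


hive_catalog = {
    "type": "iceberg",
    "catalog-type": "hive",
    "property-version": "1",
    "cache-enabled": "true",
    "uri": "thrift://hive1:9083",
    "clients": "5",  # metastore client pool size; key name as in the Iceberg docs
    "warehouse": "hdfs://nnha/user/hive/warehouse",
    "hive-conf-dir": "/root/flink-1.14.3/hive_conf",
}

if __name__ == "__main__":
    print(create_catalog_sql("hive_catalog", hive_catalog))
```

The printed statement can be saved to a file and reused across sessions instead of retyping it.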
3.2 Hadoop Catalog
The Hadoop catalog keeps both metadata and table data in a file system directory, here on HDFS.
Flink SQL> create catalog hadoop_catalog with (
'type'='iceberg',
'catalog-type'='hadoop',
'property-version'='1',
'cache-enabled'='true',
'warehouse'='hdfs://nnha/user/iceberg/warehouse'
);
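[INFO] Execute statement succeed.
A catalog created this way exists only for the current SQL client session. On Flink 1.13 and earlier it could be registered permanently in conf/sql-client-defaults.yaml:
catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    property-version: 1
    cache-enabled: true
    warehouse: hdfs://nnha/user/iceberg/warehouse
Flink 1.14 removed support for this YAML file. On 1.14 and later, put the create catalog statement in an SQL file and pass it to the SQL client with the -i option instead.
4. Database and Table DDL Commands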
4.1 Create Database
Flink SQL> create database hadoop_catalog.iceberg_db;
Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Creating the database adds an iceberg_db subdirectory under the warehouse path; dropping the database removes that subdirectory.
4.2 Create Table (no primary key)
Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
user_id bigint comment 'user ID',
user_name string,
birthday date,
country string
) comment 'user table'
partitioned by (birthday, country) with (
'write.format.default'='parquet',
'write.parquet.compression-codec'='gzip'
);
[INFO] Execute statement succeed.
Tables created this way do not support primary keys, computed columns, or watermarks. The partitioned by clause lists the partition columns explicitly: Iceberg itself supports hidden partitioning, where partition values are derived from column transforms, but Flink DDL cannot declare it yet.
4.3 Alter Table
The following statement assumes that a table named my_user_copy already exists in iceberg_db.
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy set(
'write.format.default'='avro',
'write.avro.compression-codec'='gzip'
);
[INFO] Execute statement succeed.
The Hadoop catalog does not support renaming tables.
4.4 Drop Table
Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.
The corresponding HDFS subdirectory is removed.
5. Insert Data
5.1 insert into
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
user_id, user_name, birthday, country
) values (1, 'zhang_san', date '2022-02-01', 'china'),
(2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268
After the job finishes, the table directory on HDFS contains one subdirectory per partition (birthday=<date>/country=<name>), each holding the Parquet data files.
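To check the result with hdfs dfs -ls, it helps to know where each row's files should land. The sketch below computes the expected partition directory, assuming Iceberg's default file layout (a data directory under the table location, with one column=value level per partition column) and partition values that need no escaping. partition_dir is a hypothetical helper; the actual file names inside each directory are generated by Iceberg and are not modeled here.

```python
def partition_dir(warehouse: str, database: str, table: str, partition: list) -> str:
    """Expected data directory for one partition of a Hadoop-catalog table.

    partition is a list of (column, value) pairs in partitioned by order.
    """
    levels = "/".join(f"{column}={value}" for column, value in partition)
    return f"{warehouse}/{database}/{table}/data/{levels}"


# The two rows inserted in 5.1.
rows = [
    (1, "zhang_san", "2022-02-01", "china"),
    (2, "li_si", "2022-02-02", "japan"),
]

if __name__ == "__main__":
    for _, _, birthday, country in rows:
        print(partition_dir(
            "hdfs://nnha/user/iceberg/warehouse", "iceberg_db", "my_user",
            [("birthday", birthday), ("country", country)],
        ))
```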
5.2 insert overwrite (batch only)
Flink SQL> set 'execution.runtime-mode'='batch';
[INFO] Session property has been set.
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
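Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a
insert overwrite replaces only the partitions that receive new rows (here birthday=2022-02-02, country=japan) and leaves all other partitions untouched. It runs only in batch mode and is not supported in streaming mode.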
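The per-partition behavior can be illustrated with a small in-memory model. This is only a sketch of the semantics, not of how Iceberg implements them (Iceberg commits a new snapshot that swaps data files). It assumes the table holds just the two rows inserted in 5.1; insert_overwrite is a hypothetical function name.

```python
def insert_overwrite(table_rows: list, new_rows: list, partition_columns: list) -> list:
    """Replace every partition that new_rows touches; keep all other partitions."""

    def partition_key(row: dict) -> tuple:
        return tuple(row[column] for column in partition_columns)

    touched = {partition_key(row) for row in new_rows}
    kept = [row for row in table_rows if partition_key(row) not in touched]
    return kept + list(new_rows)


table = [
    {"user_id": 1, "user_name": "zhang_san", "birthday": "2022-02-01", "country": "china"},
    {"user_id": 2, "user_name": "li_si", "birthday": "2022-02-02", "country": "japan"},
]
new = [
    {"user_id": 4, "user_name": "wang_wu", "birthday": "2022-02-02", "country": "japan"},
]

if __name__ == "__main__":
    for row in insert_overwrite(table, new, ["birthday", "country"]):
        print(row["user_id"], row["user_name"], row["birthday"], row["country"])
```

In this model, user 2 disappears because it shares a partition with the new row, while user 1 survives in its own partition. With an empty partition column list every row falls into the same partition, so the whole table is replaced, which mirrors the behavior for unpartitioned tables.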
6. Query Data
Batch mode
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
Streaming mode
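Switch the runtime mode to streaming and pass the read options in a SQL hint. Hints take effect only when table.dynamic-table-options.enabled is true, which is not the default in Flink 1.14. The optional start-snapshot-id option selects the snapshot from which incremental reading starts.
Flink SQL> set 'execution.runtime-mode'='streaming';
Flink SQL> set 'table.dynamic-table-options.enabled'='true';
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user /*+ options('streaming'='true','monitor-interval'='5s','start-snapshot-id'='138573494821828246') */;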
+----+----------------------+----------------------+------------+----------+
| op |              user_id |            user_name |   birthday |  country |
+----+----------------------+----------------------+------------+----------+
| +I |                    5 |             zhao_liu | 2022-02-02 |    japan |
+----+----------------------+----------------------+------------+----------+
When start-snapshot-id is set, Flink reads only the data committed after that snapshot (the snapshot itself is excluded) and checks for new snapshots every monitor-interval, which enables incremental consumption of new data.
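The effect of start-snapshot-id can be sketched with a toy snapshot history. The model below assumes an append-only table whose snapshots are listed in commit order; incremental_rows is a hypothetical function, and apart from the snapshot ID taken from the query above, the IDs and row contents are made up for illustration.

```python
def incremental_rows(snapshots: list, start_snapshot_id=None) -> list:
    """Rows a streaming read would emit from an append-only snapshot history.

    snapshots is a list of (snapshot_id, appended_rows) pairs in commit order.
    """
    if start_snapshot_id is None:
        # No start snapshot: everything currently in the table is read.
        return [row for _, rows in snapshots for row in rows]
    ids = [snapshot_id for snapshot_id, _ in snapshots]
    start = ids.index(start_snapshot_id)  # raises ValueError for an unknown ID
    # The start snapshot itself is excluded; only later commits are read.
    return [row for _, rows in snapshots[start + 1:] for row in rows]


# Hypothetical history: only the middle ID comes from this guide.
history = [
    (1001, [(1, "zhang_san", "2022-02-01", "china")]),
    (138573494821828246, [(4, "wang_wu", "2022-02-02", "japan")]),
    (1003, [(5, "zhao_liu", "2022-02-02", "japan")]),
]

if __name__ == "__main__":
    for row in incremental_rows(history, start_snapshot_id=138573494821828246):
        print(row)
```

Note that the position in the commit history decides what is read, not the numeric size of the ID: snapshot IDs are not assigned in increasing order.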
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
