Big Data 18 min read

Configuring Flink SQL Client with Iceberg: Catalogs, DDL, Data Insertion and Query

This guide explains how to set up the Flink SQL client to work with Apache Iceberg, covering Scala version requirements, downloading and deploying Iceberg jars, configuring Hive and HDFS catalogs, creating databases and tables, performing insert and overwrite operations, and querying data in both batch and streaming modes.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Configuring Flink SQL Client with Iceberg: Catalogs, DDL, Data Insertion and Query

1. Flink SQL Client Configuration with Iceberg

Flink clusters must use Scala 2.12. Download the Iceberg runtime JAR to the /root/flink-1.14.3/lib directory on every Flink node and restart the cluster.

# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib

Iceberg uses the Hadoop catalog by default; to use the Hive catalog, place flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar in the same lib directory and restart Flink, then start the SQL client.

2. Java/Scala pom.xml Configuration

Add the Iceberg dependency with provided scope to your Maven project.

<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink</artifactId>
    <version>0.13.0</version>
    <scope>provided</scope>
</dependency>

3. Catalog

3.1 Hive Catalog

Creates a catalog that stores metadata in Hive metastore and table data in HDFS.

Flink SQL> create catalog hive_catalog with(
    'type'='iceberg',
    'catalog-type'='hive',
    'property-version'='1',
    'cache-enabled'='true',
    'uri'='thrift://hive1:9083',
    'client'='5',
    'warehouse'='hdfs://nnha/user/hive/warehouse',
    'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
);
[INFO] Execute statement succeed.

Key properties:

property-version: compatibility version, set to 1.

cache-enabled: enables catalog cache (default true).

client: connection pool size for Hive metastore (default 2).

warehouse: HDFS path where Hive catalog stores data.

hive-conf-dir: local path of Hive configuration on the Flink cluster.

3.2 HDFS Catalog

Uses HDFS for both metadata and data.

Flink SQL> create catalog hadoop_catalog with (
    'type'='iceberg',
    'catalog-type'='hadoop',
    'property-version'='1',
    'cache-enabled'='true',
    'warehouse'='hdfs://nnha/user/iceberg/warehouse'
);
[INFO] Execute statement succeed.

The catalog can be made permanent by adding it to conf/sql-cli-defaults.yaml:

catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    property-version: 1
    cache-enabled: true
    warehouse: hdfs://nnha/user/iceberg/warehouse

4. Database and Table DDL Commands

4.1 Create Database

Flink SQL> create database hadoop_catalog.iceberg_db;
Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

A subdirectory iceberg_db is created under the warehouse; dropping the database removes it.

4.2 Create Table (no primary key)

Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
    user_id bigint comment '用户ID',
    user_name string,
    birthday date,
    country string
) comment '用户表'
partitioned by (birthday, country) with (
    'write.format.default'='parquet',
    'write.parquet.compression-codec'='gzip'
);
[INFO] Execute statement succeed.

The table does not support primary keys, computed columns, or watermarks. Partitioning is defined but Iceberg can compute partitions internally.

4.3 Alter Table

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy set(
    'write.format.default'='avro',
    'write.avro.compression-codec'='gzip'
);
[INFO] Execute statement succeed.

Renaming tables is not supported in the Hadoop catalog.

4.4 Drop Table

Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.

The corresponding HDFS subdirectory is removed.

5. Insert Data

5.1 insert into

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
    user_id, user_name, birthday, country
) values (1, 'zhang_san', date '2022-02-01', 'china'),
    (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268

Resulting HDFS layout shows partitioned directories and Parquet files.

5.2 insert overwrite (batch only)

Flink SQL> set 'execution.runtime-mode'='batch';
[INFO] Session property has been set.
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a

Overwrite works per partition in batch mode; it is not supported in streaming mode.

6. Query Data

Batch mode

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday   | country |
+---------+-----------+------------+---------+
| 1       | zhang_san | 2022-02-01 | china   |
| 5       | zhao_liu  | 2022-02-02 | japan   |
| 2       | zhang_san | 2022-02-01 | china   |
+---------+-----------+------------+---------+

Streaming mode

Set runtime mode to streaming and optionally specify a start snapshot to read incremental data.

Flink SQL> set 'execution.runtime-mode'='streaming';
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user /*+ options('streaming'='true','monitor-interval'='5s','start-snapshot-id'='138573494821828246') */;
+----+----------------------+----------------------+------------+----------+
| op | user_id              | user_name            | birthday   | country  |
+----+----------------------+----------------------+------------+----------+
| +I | 5                    | zhao_liu             | 2022-02-02 | japan    |
+----+----------------------+----------------------+------------+----------+

When a start‑snapshot‑id is provided, Flink reads only snapshots after that ID, enabling incremental consumption of new data.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataFlinkSQLHiveHadoopIcebergCatalog
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.