Using Flink Upsert‑Kafka Connector for Real‑Time Data Aggregation and TiDB Synchronization
This article explains the upsert‑kafka connector in Flink, its configuration parameters, step‑by‑step usage with SQL examples, and demonstrates a complete pipeline that reads Kafka streams, aggregates page view metrics, and synchronizes the results to TiDB in real time.
What is the Upsert Kafka Connector?
The upsert‑kafka connector, introduced in FLIP‑149, enables Flink to treat Kafka message keys as primary keys, allowing INSERT, UPDATE, and DELETE semantics when reading from or writing to Kafka topics. It can be used both as a source (producing changelog streams) and as a sink (consuming changelog streams).
Key Parameters
Essential properties include connector='upsert-kafka', topic, properties.bootstrap.servers, key.format and value.format (supporting csv, json, avro). Optional settings such as value.fields-include, key.fields-prefix, and generic properties.* allow fine‑grained control over serialization and Kafka client behavior.
Usage Steps
1. Add the Flink Kafka connector dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
<scope>provided</scope>
</dependency>2. Create source and upsert sink tables with SQL, defining a primary key for the sink:
CREATE TABLE source_ods_fact_user_ippv (
user_id STRING,
client_ip STRING,
client_info STRING,
pagecode STRING,
access_time TIMESTAMP,
dt STRING,
WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_ippv',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'group1',
'properties.bootstrap.servers' = 'xxx:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE result_total_pvuv_min (
do_date STRING,
do_min STRING,
pv BIGINT,
uv BIGINT,
currenttime TIMESTAMP,
PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'result_total_pvuv_min',
'properties.bootstrap.servers' = 'xxx:9092',
'key.format' = 'json',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);3. Define a view to aggregate PV/UV per minute and insert the results into the upsert sink:
CREATE VIEW view_total_pvuv_min AS
SELECT dt AS do_date,
COUNT(client_ip) AS pv,
COUNT(DISTINCT client_ip) AS uv,
MAX(access_time) AS access_time
FROM source_ods_fact_user_ippv
GROUP BY dt;
INSERT INTO result_total_pvuv_min
SELECT do_date,
CAST(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,
pv,
uv,
CURRENT_TIMESTAMP AS currenttime
FROM view_total_pvuv_min;The example data shows how each minute’s aggregated metrics are emitted as upsert records, with INSERT (+I), DELETE (value‑null), and UPDATE (+U/-U) events.
Flink → TiDB Synchronization
To persist the aggregated results, a JDBC sink pointing to TiDB is created. TiDB’s MySQL‑compatible interface allows Flink to write upserted rows directly:
CREATE TABLE sink_upsert_tidb (
user_id INT,
client_ip STRING,
client_info STRING,
page_code STRING,
access_time TIMESTAMP,
dt STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx:4000/bi',
'username' = 'bi_rw',
'password' = 'xxx',
'table-name' = 'result_user_behavior'
);Data is then inserted with a simple SQL statement that selects from the Kafka source table.
Conclusion
The upsert‑kafka connector provides a versatile way to perform real‑time aggregation, support downstream joins, or feed other storage systems such as HDFS or Iceberg. Combined with TiDB, it forms a powerful end‑to‑end solution for low‑latency analytics and transactional consistency.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
