Using Flink Upsert‑Kafka Connector for Real‑Time Data Aggregation and TiDB Synchronization

This article explains the upsert‑kafka connector in Flink, its configuration parameters, step‑by‑step usage with SQL examples, and demonstrates a complete pipeline that reads Kafka streams, aggregates page view metrics, and synchronizes the results to TiDB in real time.

Big Data Technology & Architecture

What is the Upsert Kafka Connector?

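The upsert‑kafka connector, proposed in FLIP‑149 and available since Flink 1.12, treats the Kafka message key as a primary key, so a topic can carry INSERT, UPDATE, and DELETE semantics. As a source, it reads the topic as a changelog stream: a record with a non‑null value is an insert or update for its key, and a record with a null value is a delete. As a sink, it consumes a changelog stream, writing inserts and updates as regular messages and deletes as null‑value (tombstone) messages.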

Key Parameters

The required options are connector (set to 'upsert-kafka'), topic, properties.bootstrap.servers, key.format, and value.format; the formats can be csv, json, or avro. Optional settings give finer control: value.fields-include decides whether key columns are also written into the message value, key.fields-prefix avoids name clashes between key and value fields, and properties.* passes arbitrary settings through to the Kafka client.
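
As a sketch of how these options fit together, the following hypothetical sink table keeps the key column out of the message value and passes one producer setting through to the Kafka client. The table name, topic, columns, and the chosen producer setting are assumptions for illustration only and are not part of the pipeline in this article; option availability may also differ between Flink releases.

```sql
-- Hypothetical table: names and topic are placeholders, not from the article.
CREATE TABLE pv_by_page (
    pagecode STRING,
    pv BIGINT,
    -- The primary key columns become the Kafka message key.
    PRIMARY KEY (pagecode) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pv_by_page',
  'properties.bootstrap.servers' = 'xxx:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- EXCEPT_KEY: the value carries only pv; pagecode lives in the key.
  'value.fields-include' = 'EXCEPT_KEY',
  -- properties.* is handed to the Kafka client; compression.type is a
  -- standard Kafka producer setting, chosen here only as an example.
  'properties.compression.type' = 'lz4'
);
```

With 'value.fields-include' = 'ALL', as used in the sink later in this article, the key columns would be repeated inside the value as well.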

Usage Steps

1. Add the Flink Kafka connector dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
    <scope>provided</scope>
</dependency>

2. Create source and upsert sink tables with SQL, defining a primary key for the sink:

CREATE TABLE source_ods_fact_user_ippv (
    user_id STRING,
    client_ip STRING,
    client_info STRING,
    pagecode STRING,
    access_time TIMESTAMP(3),
    dt STRING,
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND
) WITH (
   'connector' = 'kafka',
   'topic' = 'user_ippv',
   'scan.startup.mode' = 'earliest-offset',
   'properties.group.id' = 'group1',
   'properties.bootstrap.servers' = 'xxx:9092',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
);

CREATE TABLE result_total_pvuv_min (
    do_date STRING,
    do_min STRING,
    pv BIGINT,
    uv BIGINT,
    currenttime TIMESTAMP,
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min',
  'properties.bootstrap.servers' = 'xxx:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

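3. Define a view that aggregates PV/UV and insert the results into the upsert sink. The view groups by dt, so pv and uv are running totals for the day; the INSERT derives do_min from the latest access_time, so each (do_date, do_min) key ends up holding the totals as of that minute: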

CREATE VIEW view_total_pvuv_min AS
SELECT dt AS do_date,
       COUNT(client_ip) AS pv,
       COUNT(DISTINCT client_ip) AS uv,
       MAX(access_time) AS access_time
FROM source_ods_fact_user_ippv
GROUP BY dt;

INSERT INTO result_total_pvuv_min
SELECT do_date,
       CAST(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,
       pv,
       uv,
       CURRENT_TIMESTAMP AS currenttime
FROM view_total_pvuv_min;

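When the result topic is read back through upsert‑kafka, each record becomes a changelog event: the first record for a key is an INSERT (+I), a later record with the same key is an UPDATE (-U/+U), and a record whose value is null is a DELETE (-D). Because the key is (do_date, do_min), every new minute starts a new key, while further events within the same minute update that minute's row.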
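
One way to observe these events is to query the sink table itself, since the same upsert‑kafka definition also works as a source. The queries below are a minimal sketch that assumes the result_total_pvuv_min table defined above; the alias names pv_so_far and uv_so_far are hypothetical. In the SQL client, the change flags are only visible when the result mode is set to changelog, and the exact SET key for that depends on the Flink version.

```sql
-- Read the result topic back as a changelog stream.
SELECT do_date, do_min, pv, uv
FROM result_total_pvuv_min;

-- Latest running totals per day. The per-day counts never decrease
-- while a day is being processed, so MAX picks the most recent value.
SELECT do_date,
       MAX(pv) AS pv_so_far,
       MAX(uv) AS uv_so_far
FROM result_total_pvuv_min
GROUP BY do_date;
```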

Flink → TiDB Synchronization

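To persist data in TiDB, create a JDBC sink table. TiDB speaks the MySQL protocol, so Flink's JDBC connector can write to it through a MySQL JDBC URL, and because the table declares a primary key, rows are written as upserts. The example below targets a user‑behavior table keyed by user_id: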

CREATE TABLE sink_upsert_tidb (
    user_id INT,
    client_ip STRING,
    client_info STRING,
    page_code STRING,
    access_time TIMESTAMP,
    dt STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:4000/bi',
  'username' = 'bi_rw',
  'password' = 'xxx',
  'table-name' = 'result_user_behavior'
);
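
The JDBC connector writes into an existing table, so result_user_behavior has to be created in TiDB beforehand. The article does not show that table; the definition below is only a sketch, run against TiDB rather than Flink, and every column type and length in it is an assumption. The one part that matters for upserts is a primary (or unique) key matching the key declared on the Flink side.

```sql
-- Run in TiDB (MySQL dialect), database bi. Types and lengths are assumed.
CREATE TABLE IF NOT EXISTS result_user_behavior (
    user_id     INT NOT NULL,
    client_ip   VARCHAR(64),
    client_info VARCHAR(1024),
    page_code   VARCHAR(128),
    access_time DATETIME(3),
    dt          VARCHAR(16),
    -- Matches PRIMARY KEY (user_id) in the Flink sink definition.
    PRIMARY KEY (user_id)
);
```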

Data is then inserted with a simple SQL statement that selects from the Kafka source table.
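
The statement itself is not included in the text; a minimal sketch, assuming the source and sink tables defined earlier, might look like the following. Two adjustments are assumptions on my part: the source declares user_id as STRING while the sink declares it as INT, so the sketch casts it, and the source column pagecode is mapped onto the sink column page_code.

```sql
INSERT INTO sink_upsert_tidb
SELECT CAST(user_id AS INT) AS user_id,  -- source is STRING, sink is INT
       client_ip,
       client_info,
       pagecode AS page_code,            -- column name differs in the sink
       access_time,
       dt
FROM source_ods_fact_user_ippv;
```

Because user_id is the primary key, each new event for a user overwrites that user's previous row in TiDB. Events whose user_id is not numeric would need separate handling, for example a filter ahead of the cast.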

Conclusion

The upsert‑kafka connector is a practical building block for real‑time aggregation: its output topic can feed downstream joins or be loaded into other storage systems such as HDFS or Iceberg. Paired with a JDBC sink into TiDB, the same Flink SQL job can keep a queryable table up to date with low latency.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.