Introduction to Flink CDC: Concepts, Use Cases, and Implementation

This article explains Change Data Capture (CDC) and how Flink CDC can be used for incremental data synchronization, real‑time materialized views, audit logging, and CDC‑based joins, providing code examples, Maven dependencies, and SQL/Java snippets for MySQL and Kafka integrations.

Big Data Technology & Architecture

CDC (Change Data Capture) is a technique that captures committed changes (INSERT, UPDATE, DELETE) from a database and streams them to downstream systems for further processing.
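
To make the idea concrete, the following is a minimal sketch of what a downstream CDC consumer does: it replays committed row changes, in commit order, against its own copy of the table. The ChangeEvent and Op types are hypothetical and exist only for illustration; they are not part of Flink or Debezium.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: ChangeEvent and Op are hypothetical types, not Flink or Debezium APIs.
class CdcReplaySketch {
    enum Op { INSERT, UPDATE, DELETE }

    // One committed change to a row, keyed by primary key.
    static final class ChangeEvent {
        final Op op;
        final int id;
        final String name; // unused for DELETE

        ChangeEvent(Op op, int id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    // Applies events in commit order and returns the downstream copy.
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> replica = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op) {
                case INSERT:
                case UPDATE:
                    replica.put(e.id, e.name);
                    break;
                case DELETE:
                    replica.remove(e.id);
                    break;
            }
        }
        return replica;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
            new ChangeEvent(Op.INSERT, 1, "scooter"),
            new ChangeEvent(Op.INSERT, 2, "hammer"),
            new ChangeEvent(Op.UPDATE, 1, "electric scooter"),
            new ChangeEvent(Op.DELETE, 2, null));
        System.out.println(replay(events)); // prints {1=electric scooter}
    }
}
```

Because only changes are shipped, the downstream copy converges on the source table without re-reading it in full.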

Typical CDC use cases include:

Data synchronization with Flink SQL, e.g., moving data from MySQL to Elasticsearch.

Real‑time materialized views on source databases.

Low‑latency incremental sync.

Event‑time joins with temporal tables for accurate results.

Flink 1.11 supports two built‑in CDC formats, debezium-json and canal-json, so tables can represent not only appends but also updates (upserts) and deletes.
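
As a rough illustration of why these formats enable updates and deletes, the sketch below maps a Debezium‑style change envelope (an op code plus before and after row images) to changelog rows tagged +I, -U, +U, and -D. It is a simplified model of the mapping as commonly described, not Flink's actual deserializer; the class name, the method name, and the use of plain strings for row images are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a simplified model, not Flink's debezium-json implementation.
class DebeziumEnvelopeSketch {
    // Debezium op codes: c = create, r = snapshot read, u = update, d = delete.
    // before/after are the row images carried by the envelope.
    static List<String> toChangelog(String op, String before, String after) {
        List<String> rows = new ArrayList<>();
        switch (op) {
            case "c":
            case "r":
                rows.add("+I " + after);   // new row
                break;
            case "u":
                rows.add("-U " + before);  // retract the old image
                rows.add("+U " + after);   // emit the new image
                break;
            case "d":
                rows.add("-D " + before);  // row removed
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(toChangelog("u", "(1, scooter)", "(1, electric scooter)"));
    }
}
```

An update becomes a pair of rows, which is what lets downstream operators retract an old value before applying the new one.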

Common Flink CDC scenarios are:

Incremental data sync between databases.

Audit logging.

Real‑time materialized views on top of databases.

CDC‑based dimension table joins.
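
The materialized‑view scenario can be sketched as follows: an aggregate such as a row count per category is kept current by adding one for every +I or +U row and subtracting one for every -U or -D row. This is a minimal, single‑threaded illustration under assumed names (MaterializedCountSketch, apply, view); a real Flink job would keep this state in a managed, checkpointed state backend.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: incrementally maintains SELECT category, COUNT(*) ... GROUP BY category.
class MaterializedCountSketch {
    private final Map<String, Long> countByCategory = new TreeMap<>();

    // kind is one of "+I", "+U" (row added) or "-U", "-D" (row retracted).
    void apply(String kind, String category) {
        long delta;
        if (kind.equals("+I") || kind.equals("+U")) {
            delta = 1L;
        } else if (kind.equals("-U") || kind.equals("-D")) {
            delta = -1L;
        } else {
            throw new IllegalArgumentException("unknown row kind: " + kind);
        }
        long updated = countByCategory.getOrDefault(category, 0L) + delta;
        if (updated == 0L) {
            countByCategory.remove(category); // drop empty groups from the view
        } else {
            countByCategory.put(category, updated);
        }
    }

    Map<String, Long> view() {
        return countByCategory;
    }

    public static void main(String[] args) {
        MaterializedCountSketch mv = new MaterializedCountSketch();
        mv.apply("+I", "tools");
        mv.apply("+I", "toys");
        mv.apply("-D", "toys");
        System.out.println(mv.view()); // prints {tools=1}
    }
}
```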

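The flink-cdc-connectors project (Maven group com.alibaba.ververica) provides two CDC source connectors, MySQL and PostgreSQL, which read change logs directly from the database. Below is a MySQL example.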

Flink CDC reduces the need for separate Canal and Kafka layers, shortening the data pipeline, lowering latency, and offering exactly‑once semantics.

To use the MySQL connector, add the following Maven dependency:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.1.0</version>
</dependency>

SQL to create a MySQL CDC source table:

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products'
);

Java API example to read from MySQL CDC:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(sourceFunction)
       .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute();
  }
}

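If change data already lands in Kafka (for example, published by Canal or Debezium) and Flink acts as the transformation layer, Flink can interpret the messages with one of two built‑in formats: canal-json and debezium-json. To consume Canal JSON data from Kafka, add the Kafka connector dependency: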

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.0</version>
</dependency>

Example table definition for a Kafka source with Canal JSON format:

CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10,2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json'  -- using canal-json as the format
);
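
One detail worth illustrating: a Canal update message carries the new row in its data field and, in its old field, only the columns that changed along with their previous values. A reader that needs the full pre‑update row can rebuild it by overlaying old on data. The sketch below shows that overlay with plain maps; the class and method names are hypothetical, and it assumes every changed column appears in old with a non‑null previous value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: models rebuilding the "before" row image of a Canal UPDATE message.
class CanalBeforeImageSketch {
    // data: the row after the update; old: previous values of changed columns only.
    static Map<String, String> beforeImage(Map<String, String> data, Map<String, String> old) {
        Map<String, String> before = new LinkedHashMap<>(data); // start from the new row
        before.putAll(old);                                     // restore changed columns
        return before;
    }

    public static void main(String[] args) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("id", "1");
        data.put("name", "electric scooter");
        data.put("weight", "5.18");

        Map<String, String> old = new LinkedHashMap<>();
        old.put("name", "scooter");

        System.out.println(beforeImage(data, old)); // prints {id=1, name=scooter, weight=5.18}
    }
}
```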

For changelog‑json format, add the following dependency:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>

Example of using changelog‑json to aggregate UV (unique visitors) and write results back to Kafka:

-- assume user_behavior logs are available in a Kafka topic as JSON
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

CREATE TABLE day_uv (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'changelog-json'
);

-- write the daily UV results to Kafka in changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS day_str, COUNT(DISTINCT user_id) AS uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- read the changelog back
SELECT * FROM day_uv;
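
The reason this query needs a changelog‑capable sink format can be sketched in a few lines: a running distinct count per day does not only append rows, it revises earlier results. The example below is a simplified model, not Flink's aggregation operator; the class name, the onRecord method, and the string rendering of rows are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: models the changelog produced by COUNT(DISTINCT user_id) GROUP BY day.
class DailyUvChangelogSketch {
    private final Map<String, Set<Long>> usersByDay = new HashMap<>();

    // Processes one behavior record and returns the changelog rows it causes.
    List<String> onRecord(String day, long userId) {
        Set<Long> users = usersByDay.computeIfAbsent(day, d -> new HashSet<>());
        int previous = users.size();
        List<String> out = new ArrayList<>();
        if (!users.add(userId)) {
            return out; // user already counted for this day, result unchanged
        }
        if (previous == 0) {
            out.add("+I(" + day + "," + users.size() + ")"); // first result for the day
        } else {
            out.add("-U(" + day + "," + previous + ")");     // retract the old count
            out.add("+U(" + day + "," + users.size() + ")"); // emit the new count
        }
        return out;
    }

    public static void main(String[] args) {
        DailyUvChangelogSketch uv = new DailyUvChangelogSketch();
        System.out.println(uv.onRecord("2020-08-01", 1001L)); // prints [+I(2020-08-01,1)]
        System.out.println(uv.onRecord("2020-08-01", 1002L)); // prints [-U(2020-08-01,1), +U(2020-08-01,2)]
        System.out.println(uv.onRecord("2020-08-01", 1001L)); // prints []
    }
}
```

A plain append‑only JSON format has no way to express the -U and +U rows, which is why the day_uv table is declared with changelog-json.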

The article concludes with a copyright notice stating that the content is authored by the "Big Data Technology and Architecture" team and should not be reproduced without permission.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
