
End-to-End Tutorial: Sync MySQL Binlog to Kafka and Consume with Flink Using TiDB

This article provides a step-by-step guide to building a data pipeline that captures the MySQL binlog, streams it through Canal into Kafka, processes it with Flink, and finally writes the results into TiDB. It covers environment setup, component deployment, configuration, and verification.


It begins with a background overview and describes the single-machine test environment, recommending a separate JRE for each component so that their Java runtimes can be managed independently.

Environment setup lists the required software versions (TiDB v4.0.9, Kafka 2.7.0, Flink 1.12.1, MySQL 8.0.23, Zookeeper 3.6.2, Canal 1.1.4) and the host allocation.
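The host allocation table is not reproduced here; pieced together from the addresses used in the configurations below, it is roughly as follows (the host running Canal itself is not stated):

192.168.12.21  TiDB cluster (PD, TiDB, TiKV)
192.168.12.22  Kafka
192.168.12.23  Flink
192.168.12.24  Zookeeper
192.168.12.25  MySQL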

TiDB cluster deployment shows the TiUP topology file and the command to display the cluster, followed by the creation of a test table.
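The topology file itself is not reproduced in this summary; a minimal sketch consistent with the ports and directories in the display output below might look like this (values not visible in that output are assumptions):

# cat > tidb-c1-v409.yaml <<EOF
global:
  user: "tidb"
  deploy_dir: "/opt/tidb-c1"
  data_dir: "/opt/tidb-c1/data"
pd_servers:
  - host: 192.168.12.21
    client_port: 12379
    peer_port: 12380
tidb_servers:
  - host: 192.168.12.21
    port: 14000
    status_port: 12080
tikv_servers:
  - host: 192.168.12.21
    port: 12160
    status_port: 12180
EOF
# tiup cluster deploy tidb-c1-v409 v4.0.9 tidb-c1-v409.yaml
# tiup cluster start tidb-c1-v409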

# tiup cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
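The test table itself is not shown in the excerpt; given the JDBC sink definition later in the article, it is presumably a single-column table created in TiDB, for example:

# mysql -h 192.168.12.21 -P 14000 -u root
mysql> CREATE DATABASE IF NOT EXISTS test;
mysql> CREATE TABLE test.t3 (id INT);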

Zookeeper deployment includes extracting the package, installing a dedicated JRE, pointing ZooKeeper at it through conf/java.env (which zkEnv.sh sources before it resolves the java binary), creating zoo.cfg, and starting the service.

# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/zookeeper/jre
# echo "JAVA_HOME=/opt/zookeeper/jre" > /opt/zookeeper/conf/java.env
# mkdir -p /opt/zookeeper/data
# cat > /opt/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
EOF
# /opt/zookeeper/bin/zkServer.sh start
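To confirm the server is up, ZooKeeper's bundled status command can be used:

# /opt/zookeeper/bin/zkServer.sh status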

Kafka deployment follows a similar pattern: extract, add a JRE, set JAVA_HOME near the top of kafka-run-class.sh (the script ends with an exec, so a line appended at the end would never run), write a minimal server.properties, and start the broker as a daemon.

# tar vxzf kafka_2.13-2.7.0.tgz
# mv kafka_2.13-2.7.0 /opt/kafka
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/kafka/jre
# sed -i '1a JAVA_HOME=/opt/kafka/jre' /opt/kafka/bin/kafka-run-class.sh
# cat > /opt/kafka/config/server.properties <<EOF
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181
EOF
# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
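To confirm the broker is reachable on its advertised listener, listing topics should succeed:

# /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.12.22:9092 --list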

Flink deployment installs Flink, adds a JRE, copies the Kafka and JDBC connectors into lib/ (the connector jars' Scala suffix must match the Scala 2.11 distribution, and the JDBC connector also needs the MySQL driver jar), and configures flink-conf.yaml before starting the cluster.

# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
# mv flink-1.12.1 /opt/flink
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/flink/jre
# cp flink-sql-connector-kafka_2.11-1.12.0.jar /opt/flink/lib/
# cp flink-connector-jdbc_2.11-1.12.0.jar /opt/flink/lib/
# cp mysql-connector-java-8.0.23.jar /opt/flink/lib/
# echo "jobmanager.rpc.address: 192.168.12.23" >> /opt/flink/conf/flink-conf.yaml
# echo "env.java.home: /opt/flink/jre" >> /opt/flink/conf/flink-conf.yaml
# /opt/flink/bin/start-cluster.sh
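With the cluster started, the JobManager's REST endpoint should respond on the default port 8081 (assuming rest.port is unchanged):

# curl http://192.168.12.23:8081/overview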

MySQL deployment extracts the binary, configures my.cnf and a systemd service (both sketched below), initializes the data directory, and creates a dedicated user for Canal replication.

# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql
# useradd -r -s /bin/false mysql
# /opt/mysql/bin/mysqld --initialize --user=mysql --console
# systemctl start mysqld
# /opt/mysql/bin/mysql -uroot -p
mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'mysql';
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
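The my.cnf and systemd unit mentioned above are not reproduced in the excerpt; a minimal sketch consistent with this layout might be the following, applied before the initialize and start steps (all file contents here are assumptions; ROW-format binlog, which Canal requires, is the MySQL 8.0 default but is set explicitly):

# cat > /etc/my.cnf <<EOF
[mysqld]
basedir=/opt/mysql
datadir=/opt/mysql/data
server-id=1
log-bin=mysql-bin
binlog_format=ROW
EOF
# cat > /etc/systemd/system/mysqld.service <<EOF
[Unit]
Description=MySQL Server
After=network.target

[Service]
User=mysql
ExecStart=/opt/mysql/bin/mysqld --defaults-file=/etc/my.cnf
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF
# systemctl daemon-reload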

Canal deployment extracts the package, installs a JRE, sets JAVA near the top of startup.sh, and appends the key settings to canal.properties and instance.properties (appended entries override the shipped defaults when the files are loaded) to point to MySQL, Zookeeper, and Kafka.

# mkdir -p /opt/canal
# tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/canal/jre
# sed -i '1a JAVA=/opt/canal/jre/bin/java' /opt/canal/bin/startup.sh
# cat >> /opt/canal/conf/canal.properties <<EOF
canal.zkServers=192.168.12.24:2181
canal.serverMode=kafka
canal.destinations=example
canal.mq.servers=192.168.12.22:9092
EOF
# cat >> /opt/canal/conf/example/instance.properties <<EOF
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\..*
canal.mq.topic=canal-kafka
EOF
# /opt/canal/bin/startup.sh
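Canal writes per-instance logs under its logs directory; tailing the example instance log is a quick way to confirm it has connected to MySQL and started pushing to Kafka:

# tail -f /opt/canal/logs/example/example.log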

After all components are running, the article demonstrates the data flow:

1. MySQL generates binlog for each committed change.

2. Canal reads the binlog, converts each change to Canal-JSON (a sample message is shown below), and pushes it to the Kafka topic canal-kafka.

3. Flink consumes the topic with the canal-json format, processes the records, and writes them to TiDB via the JDBC connector.
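For reference, the Canal-JSON event produced for the INSERT used later looks roughly like this (fields abbreviated; the exact payload depends on the Canal version):

{
  "data": [{"id": "2"}],
  "database": "test",
  "table": "t2",
  "type": "INSERT",
  "isDdl": false,
  "es": 1610000000000,
  "ts": 1610000000123
}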

The remaining examples create the Kafka topic (shown below), insert data into MySQL, and query the results in Flink and TiDB, confirming the end-to-end pipeline works.
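Topic creation is done with the standard Kafka CLI, for example:

# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --partitions 1 --replication-factor 1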

Flink SQL example, reading the Canal-JSON changelog from Kafka:

CREATE TABLE t2 (
    id INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'canal-kafka',
    'properties.bootstrap.servers' = '192.168.12.22:9092',
    'properties.group.id' = 'canal-kafka-consumer-group',
    'format' = 'canal-json',
    'scan.startup.mode' = 'latest-offset'
);

The INSERT is executed against the source table in MySQL, so that Canal produces a change event; the SELECT is then run in the Flink SQL client, where the row appears once it has flowed through Kafka:

mysql> INSERT INTO t2 VALUES (2);
Flink SQL> SELECT * FROM t2;

Flink to TiDB example; TiDB speaks the MySQL wire protocol, so the JDBC connector uses a standard MySQL URL:

CREATE TABLE t3 (
    id INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.12.21:14000/test',
    'table-name' = 't3',
    'username' = 'root',
    'password' = 'mysql'
);
Flink SQL> INSERT INTO t3 VALUES (3);

Verification in TiDB:

mysql> SELECT * FROM test.t3;
+------+
| id   |
+------+
|    3 |
+------+
Tags: data pipeline, Flink, MySQL, TiDB, Canal
Written by Big Data Technology Architecture (Exploring Open Source Big Data and AI Technologies)
