End-to-End Tutorial: Sync MySQL Binlog Through Canal to Kafka, Consume with Flink, and Write to TiDB
This article provides a step‑by‑step guide to build a data pipeline that captures MySQL binlog, streams it through Canal into Kafka, processes it with Flink, and finally writes the results into TiDB, covering environment setup, component deployment, configuration, and verification.
It begins with a background overview and describes the single‑machine test environment, recommending separate JREs for each component.
Environment setup lists the required software versions (TiDB v4.0.9, Kafka 2.7.0, Flink 1.12.1, MySQL 8.0.23, Zookeeper 3.6.2, Canal 1.1.4) and the host allocation.
TiDB cluster deployment shows the TiUP topology file and the command to display the cluster, followed by the creation of a test table.
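The topology file itself is not reproduced in this summary. A minimal single-host sketch, with ports and directories inferred from the `tiup cluster display` output shown next (treat every value as an assumption, not the article's original file), might look like:

```shell
# cat > topology.yaml <<EOF
global:
  user: tidb
  deploy_dir: /opt/tidb-c1
  data_dir: /opt/tidb-c1/data
pd_servers:
  - host: 192.168.12.21
    client_port: 12379
    peer_port: 12380
tidb_servers:
  - host: 192.168.12.21
    port: 14000
    status_port: 12080
tikv_servers:
  - host: 192.168.12.21
    port: 12160
    status_port: 12180
EOF
# tiup cluster deploy tidb-c1-v409 v4.0.9 topology.yaml
# tiup cluster start tidb-c1-v409
```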
# tiup cluster display tidb-c1-v409
Cluster type: tidb
Cluster name: tidb-c1-v409
Cluster version: v4.0.9
Dashboard URL: http://192.168.12.21:12379/dashboard
ID Role Host Ports OS/Arch Status Data Dir Deploy Dir
192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379
192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000
192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160

Zookeeper deployment includes extracting the package, installing a dedicated JRE, editing zkEnv.sh, creating zoo.cfg, and starting the service.
# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/zookeeper/jre
# sed -i '2i JAVA_HOME=/opt/zookeeper/jre' /opt/zookeeper/bin/zkEnv.sh
# cat > /opt/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
EOF
# /opt/zookeeper/bin/zkServer.sh start

Kafka deployment follows a similar pattern: extract, add JRE, modify server.properties, and start the broker.
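Kafka depends on the Zookeeper instance configured above, so it is worth confirming Zookeeper is serving before the broker is started; on a single node, `zkServer.sh status` should report standalone mode:

```shell
# /opt/zookeeper/bin/zkServer.sh status
```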
# tar vxzf kafka_2.13-2.7.0.tgz
# mv kafka_2.13-2.7.0 /opt/kafka
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/kafka/jre
# sed -i '2i JAVA_HOME=/opt/kafka/jre' /opt/kafka/bin/kafka-run-class.sh
# cat > /opt/kafka/config/server.properties <<EOF
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181
EOF
# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

Flink deployment installs Flink, adds a JRE, copies the Kafka and JDBC connectors, and configures flink-conf.yaml before starting the cluster.
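The pipeline's topic, canal-kafka (the name is set later in the Canal configuration), can be created up front on the broker rather than relying on auto-creation:

```shell
# /opt/kafka/bin/kafka-topics.sh --create \
    --bootstrap-server 192.168.12.22:9092 \
    --replication-factor 1 --partitions 1 \
    --topic canal-kafka
```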
# tar vxzf flink-1.12.1-bin-scala_2.12.tgz
# mv flink-1.12.1 /opt/flink
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/flink/jre
# cp flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
# cp flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
# echo "jobmanager.rpc.address: 192.168.12.23" >> /opt/flink/conf/flink-conf.yaml
# echo "env.java.home: /opt/flink/jre" >> /opt/flink/conf/flink-conf.yaml
# /opt/flink/bin/start-cluster.sh

MySQL deployment extracts the binary, creates a systemd service, configures my.cnf, initializes the data directory, and creates users for Canal replication.
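The my.cnf mentioned here is not shown in the original. A minimal sketch with the settings Canal needs (row-based binlog and a unique server-id; all values are assumptions, and MySQL 8.0 already defaults to ROW format with binlog enabled, so this mostly makes the requirements explicit) could be:

```shell
# cat > /etc/my.cnf <<EOF
[mysqld]
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/tmp/mysql.sock
server-id=1
log-bin=mysql-bin
binlog_format=ROW
EOF
```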
# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql
# /opt/mysql/bin/mysqld --initialize --user=mysql --console
# systemctl start mysqld
# mysql -uroot -p
mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'mysql';
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;

Canal deployment extracts the package, installs a JRE, updates startup.sh, and configures canal.properties and instance.properties to point to MySQL, Zookeeper, and Kafka.
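Before starting Canal, it is worth confirming that the source MySQL is writing row-based binlog, which Canal requires; the canal account created above has the privileges for this check:

```shell
# mysql -h 192.168.12.25 -ucanal -pcanal -e "SHOW VARIABLES LIKE 'binlog_format'; SHOW MASTER STATUS;"
```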
# mkdir -p /opt/canal
# tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal
# tar vxzf jre1.8.0_281.tar.gz
# mv jre1.8.0_281 /opt/canal/jre
# sed -i '2i JAVA=/opt/canal/jre/bin/java' /opt/canal/bin/startup.sh
# cat > /opt/canal/conf/canal.properties <<EOF
canal.zkServers=192.168.12.24:2181
canal.serverMode=kafka
canal.destinations=example
canal.mq.servers=192.168.12.22:9092
EOF
# cat > /opt/canal/conf/example/instance.properties <<EOF
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\..*
canal.mq.topic=canal-kafka
EOF
# /opt/canal/bin/startup.sh

After all components are running, the article demonstrates the data flow:
MySQL generates binlog.
Canal reads the binlog, converts it to Canal‑JSON, and pushes it to Kafka topic canal-kafka.
Flink consumes the topic with the canal-json format, processes the records, and writes them to TiDB via the JDBC connector.
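The messages Canal pushes to the topic use the canal-json envelope. The payload below is a hypothetical example for an INSERT on test.t2 (real messages carry the same envelope fields), and the snippet shows how the operation type and row image sit inside it:

```shell
# Hypothetical Canal-JSON message for an INSERT on test.t2; the envelope
# fields (type, database, table, data) are what Flink's canal-json format reads.
msg='{"data":[{"id":"2"}],"database":"test","table":"t2","type":"INSERT","isDdl":false,"ts":1612000000000}'

# Extract the operation type and the inserted id with python3's json module.
op=$(printf '%s' "$msg" | python3 -c 'import sys, json; print(json.load(sys.stdin)["type"])')
row_id=$(printf '%s' "$msg" | python3 -c 'import sys, json; print(json.load(sys.stdin)["data"][0]["id"])')
echo "$op $row_id"   # prints "INSERT 2"
```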
SQL examples show creating the Kafka topic, inserting data into MySQL, and querying the results in Flink and TiDB, confirming the end‑to‑end pipeline works.
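Before running the Flink SQL examples, the raw Canal-JSON traffic can be inspected directly from the topic (assuming a test.t2 table exists on the source MySQL):

```shell
# mysql -h 192.168.12.25 -uroot -pmysql -e "INSERT INTO test.t2 VALUES (99);"
# /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.12.22:9092 \
    --topic canal-kafka --from-beginning --max-messages 1
```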
Flink SQL example:
CREATE TABLE t2 (
id INT
) WITH (
'connector' = 'kafka',
'topic' = 'canal-kafka',
'properties.bootstrap.servers' = '192.168.12.22:9092',
'properties.group.id' = 'canal-kafka-consumer-group',
'format' = 'canal-json',
'scan.startup.mode' = 'latest-offset'
);
INSERT INTO t2 VALUES (2);
SELECT * FROM t2;

Flink to TiDB example:
CREATE TABLE t3 (
id INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.12.21:14000/test',
'table-name' = 't3',
'username' = 'root',
'password' = 'mysql'
);
INSERT INTO t3 VALUES (3);

Verification in TiDB:
mysql> SELECT * FROM test.t3;
+------+
| id |
+------+
| 3 |
+------+

Big Data Technology Architecture