Build Real-Time MySQL CDC Pipelines with Flink 1.19 and SpringBoot
This guide walks through setting up Flink CDC with MySQL on SpringBoot 2.7, covering binlog configuration, Maven dependencies, Java implementation for real‑time change capture, startup options, a custom Redis sink, and a web UI for monitoring the streaming pipeline.
1. Introduction
Flink CDC (Change Data Capture) is a CDC framework based on database logs that integrates with the Flink compute engine to provide efficient real‑time data integration for massive data streams. It monitors database changes, extracts them, and makes them available for further processing and analysis.
Supported Databases
MySQL 5.6, 5.7, 8.0.x
RDS MySQL 5.6, 5.7, 8.0.x
PolarDB MySQL 5.6, 5.7, 8.0.x
Aurora MySQL 5.6, 5.7, 8.0.x
MariaDB 10.x
PolarDB X 2.0.1
2. Practical Example
2.1 Enable MySQL Binlog
<code>[mysqld]
server-id=1
# row‑level format
binlog-format=Row
# binlog file prefix
log-bin=mysql-bin
# databases to capture
binlog_do_db=testjpa</code>
In addition to enabling the binlog, the MySQL user that Flink CDC connects with needs the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges so it can read both table data and the binlog.
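As a sketch, a dedicated connector user can be created and granted those privileges like this (the `flink_cdc` user name and password are placeholders, not from the article):

```sql
-- Placeholder credentials: adjust user, host, and password for your setup
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'change-me';
-- Minimum privileges the MySQL CDC connector needs to read data and the binlog
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
```

Using a dedicated user instead of `root` keeps the connector's access auditable and easy to revoke.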
2.2 Dependency Management
<code><properties>
<flink.version>1.19.0</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
</dependency>
... (additional Flink dependencies) ...</code>
2.3 Code Implementation
<code>@Component
public class MonitorMySQLCDC implements InitializingBean {

    public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();

    private static final String PREFIX = "users:";

    private final StringRedisTemplate stringRedisTemplate;
    private final CustomSink customSink;

    public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
        this.customSink = customSink;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Consumer thread: drains the change events queued by CustomSink.
        new Thread(() -> {
            while (true) {
                try {
                    Map<String, Object> result = queue.take();
                    this.doAction(result);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("useSSL", "false");

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("testjpa")
                .tableList("testjpa.users")
                .username("root")
                .password("123123")
                .jdbcProperties(jdbcProperties)
                .includeSchemaChanges(true)
                .deserializer(new JsonDebeziumDeserializationSchema(true))
                .startupOptions(StartupOptions.initial())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(6000);
        env.setParallelism(4);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
                .addSink(this.customSink);

        // execute() blocks until the job terminates, so run it off the
        // Spring startup thread to let afterPropertiesSet() return.
        new Thread(() -> {
            try {
                env.execute("mysql-cdc-to-redis");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    @SuppressWarnings("unchecked")
    private void doAction(Map<String, Object> result) throws Exception {
        Map<String, Object> payload = (Map<String, Object>) result.get("payload");
        String op = (String) payload.get("op");
        switch (op) {
            // "c" = insert, "u" = update: upsert the row into Redis
            case "u", "c" -> {
                Map<String, Object> after = (Map<String, Object>) payload.get("after");
                String id = after.get("id").toString();
                System.out.printf("Operation: %s, ID: %s%n", op, id);
                stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after));
            }
            // "d" = delete: remove the row from Redis
            case "d" -> {
                Map<String, Object> before = (Map<String, Object>) payload.get("before");
                String id = before.get("id").toString();
                stringRedisTemplate.delete(PREFIX + id);
            }
        }
    }
}
</code>
Startup Modes
initial (default): take a full snapshot on first start, then continue reading from the latest binlog position.
earliest-offset: skip the snapshot and start from the earliest available binlog position.
latest-offset: skip the snapshot and start reading from the end of the binlog, capturing only changes made after the connector starts.
specific-offset: skip the snapshot and start from a user-specified binlog file and position (or GTID set).
timestamp: skip the snapshot and start from the first event at or after a given timestamp.
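Each mode maps onto a factory method of the connector's StartupOptions class. A configuration sketch (the binlog file name, position, and timestamp below are placeholder values, not from the article):

```java
// Illustrative fragment: swap the uncommented line for the mode you need.
MySqlSource.<String>builder()
        // .startupOptions(StartupOptions.initial())    // snapshot, then binlog (default)
        // .startupOptions(StartupOptions.earliest())   // earliest retained binlog offset
        // .startupOptions(StartupOptions.latest())     // only changes from now on
        // .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L)) // exact file + position
        .startupOptions(StartupOptions.timestamp(1700000000000L)) // first event at/after this epoch-millis
        .build();
```

Note that earliest-offset and specific-offset only work if the server still retains the binlog files in question; MySQL purges them according to its binlog expiration settings.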
Data Processing Sink
<code>@Component
public class CustomSink extends RichSinkFunction<String> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.printf("Change detected: %s%n", value);
        TypeReference<Map<String, Object>> typeRef = new TypeReference<>() {};
        Map<String, Object> result = mapper.readValue(value, typeRef);
        Map<String, Object> payload = (Map<String, Object>) result.get("payload");
        String op = (String) payload.get("op");
        // "r" events come from the initial snapshot read; forward only real changes.
        if (!"r".equals(op)) {
            MonitorMySQLCDC.queue.put(result);
        }
    }
}
</code>
Web Monitoring Page
Add the Flink web runtime dependency and configure the REST port.
<code><dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
</code>
<code>Configuration config = new Configuration();
config.set(RestOptions.PORT, 9090);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
</code>
The web UI then listens on port 9090 (http://localhost:9090).
This article has walked through a complete real‑time MySQL‑to‑Redis CDC pipeline built with Flink: binlog setup, dependencies, the CDC source, startup modes, a custom sink, and web monitoring.