Integrating Debezium for Change Data Capture in Spring Boot Applications

This article explains how to use CDC technology, particularly Debezium, to capture MySQL binlog changes and process them in a Spring Boot application without adding heavyweight middleware, providing code examples, configuration details, and typical use cases.

Code Ape Tech Column

In small web projects where introducing a message broker is undesirable, developers often need a way to decouple asynchronous business logic; Change Data Capture (CDC) offers a solution by monitoring database changes directly.

CDC works by reading the database's transaction log (e.g., MySQL binlog) and emitting events whenever rows are inserted, updated, or deleted, allowing downstream processing without modifying existing application code.
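To make the event shape concrete, here is a minimal sketch of how a consumer might interpret one row-level change. It assumes each event carries an operation code plus optional before and after row images, which is how the sample output later in this article is laid out. The `RowChange` class and its `changedColumns` helper are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified view of one row-level change event. */
final class RowChange {
    final String op;                  // "c" = insert, "u" = update, "d" = delete
    final Map<String, Object> before; // row image before the change, null for inserts
    final Map<String, Object> after;  // row image after the change, null for deletes

    RowChange(String op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }

    /** Returns the columns whose values differ between the two row images. */
    List<String> changedColumns() {
        if (before == null || after == null) {
            // Inserts and deletes carry a single image, so there is nothing to diff.
            return Collections.emptyList();
        }
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            if (!Objects.equals(entry.getValue(), before.get(entry.getKey()))) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

For an update that only renames a row, `changedColumns()` would report just the `name` column, which lets downstream logic react to the specific fields it cares about.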

Common CDC frameworks include Canal, Debezium, Maxwell, and Flink CDC. Among them, Debezium stands out for its lightweight deployment, its support for multiple databases, and the choice between running on Kafka Connect and running embedded inside the application.

Why choose Debezium over Canal? It needs no separately installed server, supports databases other than MySQL, can run its connectors inside the application through the embedded engine so there is no Kafka cluster to manage, and delivers events with at-least-once semantics, so consumers should tolerate occasional duplicates.

Debezium is an open-source platform that captures row-level changes. In its standard Kafka Connect deployment it stores them in durable Kafka topics and guarantees ordered delivery per partition; in embedded mode the events are handed directly to application code. This makes it suitable for cache invalidation, simplifying monolithic applications, sharing databases, data integration, and CQRS patterns.
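As an illustration of the cache invalidation use case, the sketch below keeps an in-memory map in step with change events. It is not part of Debezium: `RowCache` is a hypothetical class, and it assumes the table has a non-null, immutable primary key column named `id` and that the operation codes are `c`, `u`, `d`, and `r` (snapshot read).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory cache kept in sync by row-level change events. */
final class RowCache {
    private final Map<Object, Map<String, Object>> rowsById = new ConcurrentHashMap<>();

    /**
     * Applies one change event to the cache.
     * Assumes the primary key column is named "id" and is never null or updated.
     */
    void apply(String op, Map<String, Object> before, Map<String, Object> after) {
        switch (op) {
            case "c": // insert
            case "u": // update
            case "r": // row read during the initial snapshot
                rowsById.put(after.get("id"), after);
                break;
            case "d": // delete: only the before image is available
                rowsById.remove(before.get("id"));
                break;
            default:
                // Unknown operation codes are ignored in this sketch.
                break;
        }
    }

    Map<String, Object> get(Object id) {
        return rowsById.get(id);
    }

    int size() {
        return rowsById.size();
    }
}
```

Because inserts and updates are both handled as an upsert and deletes as a removal by key, applying the same event twice leaves the cache in the same state, which fits a source that may redeliver events.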

Spring Boot integration

Dependencies (add to pom.xml):

<properties>
    <debezium.version>1.7.0.Final</debezium.version>
    <mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.connector.version}</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Configuration properties (shown in application.properties format; the same keys can be nested in application.yml):

debezium.datasource.hostname = localhost
debezium.datasource.port = 3306
debezium.datasource.user = root
debezium.datasource.password = 123456
debezium.datasource.tableWhitelist = test.test
debezium.datasource.storageFile = E:/debezium/test/offsets/offset.dat
debezium.datasource.historyFile = E:/debezium/test/history/custom-file-db-history.dat
debezium.datasource.flushInterval = 10000
debezium.datasource.serverId = 1
debezium.datasource.serverName = name-1

Key configuration items:

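connector.class: set to io.debezium.connector.mysql.MySqlConnector to monitor MySQL.

offset.storage: the class that persists offsets; use org.apache.kafka.connect.storage.FileOffsetBackingStore for a local offset file (or KafkaOffsetBackingStore when using Kafka).

offset.storage.file.filename: path to the local offset file.

offset.flush.interval.ms: how often offsets are persisted, in milliseconds; the default is 60000 (1 minute).

table.whitelist / database.whitelist: limit CDC to specific tables or databases. Newer Debezium releases name these table.include.list / database.include.list and treat the whitelist forms as deprecated aliases.

snapshot.mode: controls when an initial snapshot is taken (e.g., initial, when_needed, never, schema_only).

database.server.id: numeric ID under which Debezium joins the MySQL cluster as a replication client; it must be unique among all servers and replication clients in that cluster, including other connectors.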
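The items above can be tied to the debezium.datasource.* values with a small mapping step. The sketch below builds a plain `java.util.Properties` object of the kind the embedded engine accepts. The class name, the `name` value, and the `require` helper are hypothetical; the key names follow the 1.x line this article uses (including the file-based database history settings), so check them against the documentation for your Debezium version.

```java
import java.util.Map;
import java.util.Properties;

/** Sketch: maps the article's datasource settings to Debezium 1.x property keys. */
final class MysqlConnectorProperties {

    /** @param ds values keyed by the suffix after "debezium.datasource." */
    static Properties from(Map<String, String> ds) {
        Properties props = new Properties();
        props.setProperty("name", "embedded-mysql-engine"); // hypothetical engine name
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        // Keep offsets in a local file instead of a Kafka topic.
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", require(ds, "storageFile"));
        props.setProperty("offset.flush.interval.ms", require(ds, "flushInterval"));

        // Keep the schema history in a local file as well.
        props.setProperty("database.history",
                "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", require(ds, "historyFile"));

        // Connection and replication identity.
        props.setProperty("database.hostname", require(ds, "hostname"));
        props.setProperty("database.port", require(ds, "port"));
        props.setProperty("database.user", require(ds, "user"));
        props.setProperty("database.password", require(ds, "password"));
        props.setProperty("database.server.id", require(ds, "serverId"));
        props.setProperty("database.server.name", require(ds, "serverName"));

        // Newer name for table.whitelist; restricts capture to the listed tables.
        props.setProperty("table.include.list", require(ds, "tableWhitelist"));
        return props;
    }

    /** Fails fast with a readable message instead of a NullPointerException. */
    private static String require(Map<String, String> ds, String key) {
        String value = ds.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing debezium.datasource." + key);
        }
        return value;
    }
}
```

In a Spring Boot application, a bean such as the `mysqlConnector` configuration consumed by the listener could be produced from properties like these; the exact wiring is left to the reader.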

Listener implementation (Spring component that receives change events):

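package com.test.config;

// Assumed libraries: fastjson (JSON), commons-lang3 (StringUtils), Lombok.
import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;

/**
 * Runs the embedded Debezium engine and logs every row-level change.
 * The "mysqlConnector" Configuration bean is expected to be defined elsewhere.
 */
@Component
@Slf4j
public class MysqlBinlogListener {

    @Resource
    private Executor taskExecutor;

    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();

    public MysqlBinlogListener(@Qualifier("mysqlConnector") Configuration configuration) {
        this.engineList.add(DebeziumEngine.create(Json.class)
            .using(configuration.asProperties())
            .notifying(record -> receiveChangeEvent(record.value()))
            .build());
    }

    private void receiveChangeEvent(String value) {
        if (Objects.nonNull(value)) {
            Map<String, Object> payload = getPayload(value);
            String op = Objects.toString(payload.get("op"), null);
            // Skip snapshot reads ("r"); compare against the code, not the enum constant.
            if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.code().equals(op))) {
                ChangeData changeData = getChangeData(payload);
                log.info("changeData = {}", changeData);
            }
        }
    }

    // Each engine blocks its thread while running, so hand it to the executor.
    @PostConstruct
    private void start() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            taskExecutor.execute(engine);
        }
    }

    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("Failed to close Debezium engine", e);
                }
            }
        }
    }

    // Re-serializes a nested JSON node into a Map; returns null for a null node.
    @SuppressWarnings("unchecked")
    private static Map<String, Object> toMap(Object node) {
        return JSON.parseObject(JSON.toJSONString(node), Map.class);
    }

    @SuppressWarnings("unchecked")
    public static Map<String, Object> getPayload(String value) {
        Map<String, Object> map = JSON.parseObject(value, Map.class);
        return toMap(map.get("payload"));
    }

    public static ChangeData getChangeData(Map<String, Object> payload) {
        Map<String, Object> source = toMap(payload.get("source"));
        return ChangeData.builder()
            .op(payload.get("op").toString())
            .table(source.get("table").toString())
            .after(toMap(payload.get("after")))
            .source(source)
            .before(toMap(payload.get("before")))
            .build();
    }

    @Data
    @Builder
    public static class ChangeData {
        private Map<String, Object> after;
        private Map<String, Object> source;
        private Map<String, Object> before;
        private String table;
        private String op;
    }
}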

Test output examples:

MysqlListener.ChangeData(after={name=Suzuki Mio2, id=1}, source={file=binlog.000013, connector=mysql, pos=42587833, name=test-1, row=0, server_id=1, version=1.7.0.Final, ts_ms=1691458956000, snapshot=false, db=test, table=test}, before={name=Suzuki Mio, id=1}, table=test, op=u)
MysqlListener.ChangeData(after={name=王五, id=0}, source={file=binlog.000013, connector=mysql, pos=42588175, name=test-1, row=0, server_id=1, version=1.7.0.Final, ts_ms=1691459066000, snapshot=false, db=test, table=test}, before=null, table=test, op=c)
MysqlListener.ChangeData(after=null, source={file=binlog.000013, connector=mysql, pos=42588959, name=test-1, row=0, server_id=1, version=1.7.0.Final, ts_ms=1691459104000, snapshot=false, db=test, table=test}, before={name=王五, id=0}, table=test, op=d)

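The offset file (e.g., E:/debezium/test/offsets/offset.dat) records the last processed binlog position, similar to a Kafka consumer offset; on restart, Debezium resumes from that position. Because offsets are flushed only periodically (flushInterval in the configuration), events handled after the last flush can be delivered again after a crash, so handlers should be idempotent.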
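One way to tolerate redelivered events is to remember the position of the last change that was applied and skip anything at or before it. The sketch below compares the `file`, `pos`, and `row` fields that appear in the `source` block of the sample output. `BinlogPositionGuard` is a hypothetical class, not a Debezium API. It assumes binlog file names sort lexicographically (zero-padded suffixes) and that events arrive in binlog order. The state here is in memory only; a real application would need to store it durably, ideally together with the side effect it protects.

```java
/** Hypothetical guard that rejects events at or before the last applied binlog position. */
final class BinlogPositionGuard {
    private String lastFile;
    private long lastPos;
    private int lastRow;

    /** Starts with no position recorded, so the first event is always accepted. */
    BinlogPositionGuard() {
        this("", -1L, -1);
    }

    /** Starts from a previously stored position, for example after a restart. */
    BinlogPositionGuard(String lastFile, long lastPos, int lastRow) {
        this.lastFile = lastFile;
        this.lastPos = lastPos;
        this.lastRow = lastRow;
    }

    /**
     * Returns true and records the position if the event is newer than the
     * last applied one; returns false if it is a replay.
     */
    synchronized boolean markIfNew(String file, long pos, int row) {
        int cmp = file.compareTo(lastFile);
        if (cmp == 0) {
            cmp = Long.compare(pos, lastPos);
        }
        if (cmp == 0) {
            cmp = Integer.compare(row, lastRow);
        }
        if (cmp <= 0) {
            return false; // already applied
        }
        lastFile = file;
        lastPos = pos;
        lastRow = row;
        return true;
    }
}
```

A handler would call `markIfNew(...)` with the values from the event's `source` map and run its business logic only when the call returns true.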

Conclusion: Debezium provides a low-latency CDC solution that can cover lightweight message-queue use cases in small projects, though it is not a full substitute for a full-featured MQ. Use it judiciously based on project requirements.

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
