Databases 22 min read

Integrating Debezium for Change Data Capture in Spring Boot Applications

This article explains how to use Debezium's change data capture (CDC) capabilities to monitor MySQL binlog events, compares Canal and Debezium, outlines typical CDC use cases, and provides a complete Spring Boot integration guide with configuration, code examples, and testing procedures.

Rare Earth Juejin Tech Community
Rare Earth Juejin Tech Community
Rare Earth Juejin Tech Community
Integrating Debezium for Change Data Capture in Spring Boot Applications

Applying Ockham's razor to small projects, the author argues that heavyweight message middleware like Kafka is unnecessary for simple asynchronous decoupling, and introduces Change Data Capture (CDC) as a lightweight alternative that monitors MySQL binlog without adding extra components.

The article reviews common CDC frameworks, focusing on Canal and Debezium, and explains why Debezium is preferred: it avoids installing additional services, supports multiple databases, can embed a Kafka connector for at‑least‑once semantics, and can run in an embedded mode without a separate Kafka cluster.

Typical CDC application scenarios are listed, including cache invalidation, simplifying monolithic applications, shared databases, data integration, and CQRS, highlighting how Debezium can replace dual‑writes and provide reliable, ordered change streams.

Spring Boot Integration

Dependencies

<debezium.version>1.7.0.Final</debezium.version>
<mysql.connector.version>8.0.26</mysql.connector.version>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.connector.version}</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Key configuration properties such as connector.class , offset.storage , offset.storage.file.filename , snapshot.mode , and database.server.id are explained, with examples of file‑based offset storage to avoid requiring Kafka.

Configuration Class (Java)

import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import java.io.IOException;

@Configuration
@ConfigurationProperties(prefix ="debezium.datasource")
@Data
public class MysqlBinlogConfig {
    private String hostname;
    private String port;
    private String user;
    private String password;
    private String tableWhitelist;
    private String storageFile;
    private String historyFile;
    private Long flushInterval;
    private String serverId;
    private String serverName;

    @Bean
    public io.debezium.config.Configuration mysqlBinlogConfig() throws Exception {
        checkFile();
        return io.debezium.config.Configuration.create()
                .with("name", "mysql_connector")
                .with("connector.class", MySqlConnector.class)
                .with("offset.storage", FileOffsetBackingStore.class)
                .with("offset.storage.file.filename", storageFile)
                .with("offset.flush.interval.ms", flushInterval)
                .with("database.history", FileDatabaseHistory.class.getName())
                .with("database.history.file.filename", historyFile)
                .with("snapshot.mode", "Schema_only")
                .with("database.server.id", serverId)
                .with("database.server.name", serverName)
                .with("database.hostname", hostname)
                .with("database.port", port)
                .with("database.user", user)
                .with("database.password", password)
                .with("table.whitelist", tableWhitelist)
                .build();
    }

    private void checkFile() throws IOException {
        String dir = storageFile.substring(0, storageFile.lastIndexOf("/"));
        File dirFile = new File(dir);
        if (!dirFile.exists()) {
            dirFile.mkdirs();
        }
        File file = new File(storageFile);
        if (!file.exists()) {
            file.createNewFile();
        }
    }
}

The listener component consumes Debezium change events, extracts the payload, filters out read operations, and forwards the data to a service for further processing. The listener is started and stopped with Spring lifecycle callbacks.

import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;

@Component
@Slf4j
public class MysqlBinlogListener {
    @Resource
    private Executor taskExecutor;
    private final List
>> engineList = new ArrayList<>();
    private MysqlBinlogListener(@Qualifier("mysqlConnector") Configuration configuration) {
        engineList.add(DebeziumEngine.create(Json.class)
                .using(configuration.asProperties())
                .notifying(record -> receiveChangeEvent(record.value()))
                .build());
    }
    private void receiveChangeEvent(String value) {
        if (Objects.nonNull(value)) {
            Map
payload = getPayload(value);
            String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
            if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.equals(op))) {
                ChangeData changeData = getChangeData(payload);
                try {
                    mysqlBinlogService.service(changeData);
                } catch (Exception e) {
                    log.error("binlog处理异常,原数据: " + changeData, e);
                }
            }
        }
    }
    @PostConstruct
    private void start() {
        for (DebeziumEngine
> engine : engineList) {
            taskExecutor.execute(engine);
        }
    }
    @PreDestroy
    private void stop() {
        for (DebeziumEngine
> engine : engineList) {
            if (engine != null) {
                try { engine.close(); } catch (IOException e) { log.error("", e); }
            }
        }
    }
    public static Map
getPayload(String value) {
        Map
map = JSON.parseObject(value, Map.class);
        return JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
    }
    public static ChangeData getChangeData(Map
payload) {
        Map
source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
        return ChangeData.builder()
                .op(payload.get("op").toString())
                .table(source.get("table").toString())
                .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
                .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
                .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
                .build();
    }
    @Data
    @Builder
    public static class ChangeData {
        private Map
after;
        private Map
source;
        private Map
before;
        private String table;
        private String op;
    }
}

Sample log outputs for UPDATE, INSERT, and DELETE events are shown, demonstrating the structure of the ChangeData object. The article also notes that the offset file records the last processed binlog position, enabling reliable restarts.

In conclusion, Debezium provides a flexible CDC solution that can replace lightweight messaging for certain use cases, though it does not fully substitute a dedicated message queue; developers should evaluate requirements before choosing.

KafkaSpring BootMySQLCDCChange Data CaptureDebezium
Rare Earth Juejin Tech Community
Written by

Rare Earth Juejin Tech Community

Juejin, a tech community that helps developers grow.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.