
How to Use Debezium for MySQL CDC in Spring Boot Without Adding Extra Middleware

Learn how to capture MySQL data changes using Debezium's CDC capabilities within a Spring Boot application, avoiding heavyweight message brokers by leveraging binlog monitoring, configuring connectors, handling snapshots, and processing change events for use cases like cache invalidation, data integration, and simplifying monolithic architectures.

Java High-Performance Architecture

Ockham's razor, "entities should not be multiplied beyond necessity," frames this discussion: how to get asynchronous decoupling in a small project without introducing heavyweight message middleware.

Consider a web project on an internal network at a company that standardizes on Kafka. For a simple use case, deploying Kafka, or even a lighter broker such as RocketMQ or RabbitMQ, can feel excessive. Database triggers are widely considered outdated, and in-process approaches such as multithreading or application events may not fit either.

Source: juejin.cn/post/7264791359839223823

Change Data Capture (CDC) monitors MySQL binary logs (binlog) to detect row‑level changes as they are committed, allowing applications to react without modifying existing code. This approach provides a lightweight, decoupled way to handle tasks such as cache invalidation, data synchronization, and event‑driven processing.

Common CDC Frameworks

Typical CDC solutions include:

Canal – parses MySQL binlog and provides incremental data subscription.

Debezium – an open‑source CDC platform that uses Kafka and Kafka Connect for durability, reliability, and fault tolerance.

Other tools such as Databus, Maxwell, and Flink CDC belong to the big‑data ecosystem.

Why choose Debezium over Canal?

Canal runs as a separate server that must be installed and operated, which violates the "no unnecessary entities" principle.

Canal only supports MySQL, limiting its applicability.

Flink CDC, popular in big‑data, is built on Debezium rather than Canal.

Debezium can stream changes to Kafka topics so that consumers read from Kafka instead of the database, which reduces database load. Delivery is at-least-once, so consumers should tolerate an occasional duplicate event.

Debezium offers an embedded mode that runs without a separate Kafka cluster.

Debezium Overview

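Debezium captures every committed row‑level change from supported databases and exposes it through a unified event model. In its standard deployment it persists change events in Kafka, which gives ordered delivery per partition and lets multiple consumers process the same events without overloading the source database. In embedded mode, used in this article, the engine runs inside the application and hands events directly to application code.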

Typical use cases include:

Cache invalidation – keeping Redis or other caches in sync with MySQL.

Simplifying monolithic applications – moving side‑effects (search index updates, notifications, etc.) to asynchronous consumers.

Sharing databases – multiple services react to changes without direct coupling.

Data integration – lightweight ETL by consuming change events.

CQRS – separating write and read models while maintaining consistency.
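As a rough illustration of the cache invalidation case, the sketch below evicts a cached row whenever a change event arrives for it. The CacheInvalidator class, the id primary-key column, and the in-memory map standing in for Redis are all assumptions made for this example; none of them comes from Debezium or from the original project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: an in-memory map stands in for Redis or another cache.
class CacheInvalidator {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    void put(String table, Object id, Object value) {
        cache.put(key(table, id), value);
    }

    Object get(String table, Object id) {
        return cache.get(key(table, id));
    }

    // op uses Debezium's operation codes: "c" create, "u" update, "d" delete.
    // before/after are the row images from the change event; either may be null.
    void onChange(String table, String op, Map<String, Object> before, Map<String, Object> after) {
        // A delete carries the row only in "before"; inserts and updates carry it in "after".
        Map<String, Object> row = "d".equals(op) ? before : after;
        if (row == null || row.get("id") == null) {
            return; // nothing to evict without a primary key (assumed column: id)
        }
        cache.remove(key(table, row.get("id")));
    }

    private static String key(String table, Object id) {
        return table + ":" + id;
    }
}
```

Evicting rather than updating the entry keeps the handler idempotent: processing the same event twice leaves the cache in the same state, and the next read simply reloads the row from MySQL.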

Spring Boot Integration

Add the following Maven dependencies (replace version placeholders as needed):

<properties>
    <debezium.version>1.7.0.Final</debezium.version>
    <mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.connector.version}</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>

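The key Debezium properties are listed below. In this example their values come from custom debezium.datasource.* entries in application.properties (or application.yml) and are passed to Debezium by the configuration class:

connector.class – set to io.debezium.connector.mysql.MySqlConnector.

offset.storage – use FileOffsetBackingStore to keep offsets in a local file when Kafka is not used.

offset.storage.file.filename – path to the local offset file.

offset.flush.interval.ms – how often offsets are persisted (default 1 minute).

table.whitelist – list of tables to monitor, written as databaseName.tableName.

database.whitelist – list of databases to monitor. When both lists are set, a table must pass both filters.

These are the property names used by Debezium 1.7; newer releases call them table.include.list and database.include.list.

The corresponding application.properties entries: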

debezium.datasource.hostname=localhost
debezium.datasource.port=3306
debezium.datasource.user=root
debezium.datasource.password=123456
debezium.datasource.tableWhitelist=test.test
debezium.datasource.storageFile=E:/debezium/test/offsets/offset.dat
debezium.datasource.historyFile=E:/debezium/test/history/custom-file-db-history.dat
debezium.datasource.flushInterval=10000
debezium.datasource.serverId=1
debezium.datasource.serverName=name-1

The configuration class (a Spring @Configuration) binds these properties, makes sure the offset file and its directory exist, and builds a Debezium Configuration object:

import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import java.io.IOException;

@Configuration
@ConfigurationProperties(prefix = "debezium.datasource")
@Data
public class MysqlBinlogConfig {
    private String hostname;
    private String port;
    private String user;
    private String password;
    private String tableWhitelist;
    private String storageFile;
    private String historyFile;
    private Long flushInterval;
    private String serverId;
    private String serverName;

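    // Named mysqlConnector so that it matches the @Qualifier used by the listener and
    // does not clash with the bean name of this configuration class (mysqlBinlogConfig).
    @Bean
    public io.debezium.config.Configuration mysqlConnector() throws Exception {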
        checkFile();
        return io.debezium.config.Configuration.create()
            .with("name", "mysql_connector")
            .with("connector.class", MySqlConnector.class)
            .with("offset.storage", FileOffsetBackingStore.class)
            .with("offset.storage.file.filename", storageFile)
            .with("offset.flush.interval.ms", flushInterval)
            .with("database.history", FileDatabaseHistory.class.getName())
            .with("database.history.file.filename", historyFile)
            .with("snapshot.mode", "schema_only")
            .with("database.server.id", serverId)
            .with("database.server.name", serverName)
            .with("database.hostname", hostname)
            .with("database.port", port)
            .with("database.user", user)
            .with("database.password", password)
            .with("table.whitelist", tableWhitelist)
            .build();
    }

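    // Make sure the offset file and its parent directory exist before the engine starts.
    private void checkFile() throws IOException {
        File file = new File(storageFile);
        // getParentFile() handles both "/" and "\" separators and returns null for a bare file name.
        File dirFile = file.getParentFile();
        if (dirFile != null && !dirFile.exists()) {
            dirFile.mkdirs();
        }
        if (!file.exists()) {
            file.createNewFile();
        }
    }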
}

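Snapshot mode determines whether and when the connector takes an initial snapshot. Options include initial, when_needed, never, schema_only, and schema_only_recovery. The configuration above uses schema_only, which records the table structure but not the existing rows, so only changes made after the connector starts are delivered.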

Listener Implementation

The following component starts a Debezium engine, receives JSON change events, parses them, and forwards them to a service for business handling:

import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;

@Component
@Slf4j
public class MysqlBinlogListener {
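    @Resource
    private Executor taskExecutor;
    // Application-defined service (not shown in this article; the type name is assumed)
    // that exposes a service(ChangeData) method for the business logic.
    @Resource
    private MysqlBinlogService mysqlBinlogService;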
    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();

    public MysqlBinlogListener(@Qualifier("mysqlConnector") Configuration configuration) {
        engineList.add(DebeziumEngine.create(Json.class)
            .using(configuration.asProperties())
            .notifying(record -> receiveChangeEvent(record.value()))
            .build());
    }

    private void receiveChangeEvent(String value) {
        if (Objects.nonNull(value)) {
            Map<String, Object> payload = getPayload(value);
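            // "op" is a plain JSON string: c (create), u (update), d (delete), r (snapshot read).
            String op = (String) payload.get("op");
            // Skip events without an op (such as schema changes) and snapshot reads.
            // Compare against the operation code, since an enum never equals a String.
            if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.code().equals(op))) {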
                ChangeData changeData = getChangeData(payload);
                try {
                    mysqlBinlogService.service(changeData);
                } catch (Exception e) {
                    log.error("binlog processing error, original data: " + changeData, e);
                }
            }
        }
    }

    @PostConstruct
    private void start() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            taskExecutor.execute(engine);
        }
    }

    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("failed to close Debezium engine", e);
                }
            }
        }
    }

    public static Map<String, Object> getPayload(String value) {
        Map<String, Object> map = JSON.parseObject(value, Map.class);
        return JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
    }

    public static ChangeData getChangeData(Map<String, Object> payload) {
        Map<String, Object> source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
        return ChangeData.builder()
            .op(payload.get("op").toString())
            .table(source.get("table").toString())
            .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
            .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
            .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
            .build();
    }

    @Data
    @Builder
    public static class ChangeData {
        private Map<String, Object> after;
        private Map<String, Object> source;
        private Map<String, Object> before;
        private String table;
        private String op;
    }
}
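The listener hands each ChangeData to a service that the article does not show. A minimal sketch of what such a service might do is to branch on the operation code. The class name BinlogDispatchSketch and its handled list are hypothetical and exist only for illustration; the parameters correspond to the table, op, before, and after fields of ChangeData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the application's service; names are illustrative only.
class BinlogDispatchSketch {
    // Collects a description of each handled event so the behavior is easy to inspect.
    final List<String> handled = new ArrayList<>();

    // table, op, before and after correspond to the fields of ChangeData.
    void service(String table, String op, Map<String, Object> before, Map<String, Object> after) {
        if (op == null) {
            return; // events without an operation code carry no row change
        }
        switch (op) {
            case "c": // insert: only the "after" image is present
                handled.add("insert into " + table + ": " + after);
                break;
            case "u": // update: both images are present
                handled.add("update " + table + ": " + before + " -> " + after);
                break;
            case "d": // delete: only the "before" image is present
                handled.add("delete from " + table + ": " + before);
                break;
            default:
                // "r" (snapshot read) and unknown codes are ignored here.
                break;
        }
    }
}
```

In a real service each branch would call business code (refresh a search index, send a notification, and so on) instead of appending to a list.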

Sample logs for UPDATE, INSERT, and DELETE operations demonstrate the structure of the ChangeData object, showing before/after states and metadata such as binlog file, position, and server ID.

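The offset file (storageFile) records the current binlog position, much like a Kafka consumer offset. When the service restarts, Debezium resumes from the stored position, so changes made while the service was down are not lost as long as MySQL still retains the corresponding binlog files. Because offsets are flushed only periodically (flushInterval), events processed after the last flush may be delivered again after a restart, so handlers should be idempotent.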
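One way to tolerate redelivered events is to compare each event's binlog coordinates with the last one processed. The sketch below assumes the file, pos, and row values are taken from the source block of the change event, and that binlog file names carry a fixed-width numeric suffix so that string comparison matches their chronological order. The BinlogPositionGuard class is hypothetical, and it keeps its state in memory only; a real service would need to store the last position durably, ideally together with the side effect it protects.

```java
// Hypothetical guard that rejects events at or before the last processed binlog position.
class BinlogPositionGuard {
    private String lastFile = "";
    private long lastPos = -1;
    private int lastRow = -1;

    // Returns true and records the position if the event is newer than anything seen so far;
    // returns false for a replayed or out-of-date event.
    synchronized boolean firstTime(String file, long pos, int row) {
        // Order by binlog file, then position within the file, then row within the event.
        int cmp = file.compareTo(lastFile);
        if (cmp == 0) {
            cmp = Long.compare(pos, lastPos);
        }
        if (cmp == 0) {
            cmp = Integer.compare(row, lastRow);
        }
        if (cmp <= 0) {
            return false;
        }
        lastFile = file;
        lastPos = pos;
        lastRow = row;
        return true;
    }
}
```

A handler would call firstTime(...) before applying a change and skip the event when it returns false.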

Conclusion

Debezium offers a low‑latency CDC platform that can replace lightweight messaging needs in small projects, providing reliable change streams without adding heavyweight infrastructure. However, it does not fully replace dedicated message queues for high‑throughput or complex routing scenarios, so choose the tool that best fits your architectural requirements.
