
Master Real‑Time Change Data Capture with Debezium and Spring Boot

Learn how to capture and stream real‑time database changes using Debezium’s distributed CDC framework, configure MySQL binlog, integrate the embedded engine with Spring Boot, and process change events with sample code and Docker setup for robust data pipelines.

Programmer DD

Data in a database changes constantly, and developers often need to react to those changes to update caches, synchronize downstream systems, or keep an audit trail. This technique is called Change Data Capture (CDC). Canal is a popular CDC framework, but it supports only MySQL; Debezium is a more general, distributed alternative.

Debezium Overview

Debezium is an open-source project from Red Hat that monitors row-level changes in near real time for many databases (MySQL, PostgreSQL, MongoDB, SQL Server, Oracle, Db2, and others). It captures only committed changes and exposes a unified change event model, which simplifies integration across different databases. Deployed as Kafka Connect connectors, it supports fault-tolerant, scalable event-driven pipelines.
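
To make the unified event model more concrete, here is a minimal sketch in plain Java of the envelope a Debezium data change event carries: a before and an after row image plus an op code (c for create, u for update, d for delete, r for a snapshot read). The ChangeEnvelope class and its method names are hypothetical, and plain maps stand in for Debezium's Struct type, so treat this as an illustration rather than Debezium's API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a Debezium change event envelope. */
final class ChangeEnvelope {
    final String op;                   // "c", "u", "d" or "r"
    final Map<String, Object> before;  // row image before the change (null for inserts)
    final Map<String, Object> after;   // row image after the change (null for deletes)

    ChangeEnvelope(String op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }

    /** Human-readable name for the operation code. */
    String operationName() {
        switch (op) {
            case "c": return "CREATE";
            case "u": return "UPDATE";
            case "d": return "DELETE";
            case "r": return "READ";
            default:  throw new IllegalArgumentException("unknown op code: " + op);
        }
    }

    /** The image that describes the affected row: before for deletes, after otherwise. */
    Map<String, Object> relevantRow() {
        return "d".equals(op) ? before : after;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("user_id", 1123213);
        row.put("username", "felord.cn");
        ChangeEnvelope insert = new ChangeEnvelope("c", null, row);
        ChangeEnvelope delete = new ChangeEnvelope("d", row, null);
        System.out.println(insert.operationName() + " " + insert.relevantRow());
        System.out.println(delete.operationName() + " " + delete.relevantRow());
    }
}
```

Because every connector emits the same envelope shape, a consumer written against it does not need to know which database produced the event.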

Debezium Kafka Architecture

In this architecture, Debezium connectors for MySQL and PostgreSQL run on Kafka Connect, capture change events, and write them to Kafka, from where they are forwarded to downstream systems such as Elasticsearch, data warehouses, or caches.

Spring Boot Integration with Embedded Debezium

This tutorial instead embeds Debezium directly in a Spring Boot application, using the Debezium Engine to run the MySQL connector on a separate thread.

Running MySQL with Binlog Enabled (Docker)

# Run the MySQL container
docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00"
# Enable the binlog
docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# Set the server-id
docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# Restart so that mysqld picks up the new settings
docker restart mysql-service

The container runs MySQL 5.7 with user root and password 123456, and mounts its data directory at d:/mysql/data. The two docker exec commands append the binlog path and a unique server-id to mysqld.cnf; because MySQL reads that file only at startup, the container is restarted afterwards.

Maven Dependencies for Debezium

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
</dependency>

The Debezium version used here, current at the time of writing, is 1.5.2.Final.

Debezium Engine Configuration (Spring Bean)

// Requires: import io.debezium.connector.mysql.MySqlConnector;
@Bean
io.debezium.config.Configuration debeziumConfig() {
    return io.debezium.config.Configuration.create()
        // Debezium Engine properties
        .with("connector.class", MySqlConnector.class.getName())
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")
        .with("offset.flush.interval.ms", "6000")
        .with("name", "mysql-connector")
        // MySQL connector properties
        .with("database.hostname", "localhost")
        .with("database.port", "3306")
        .with("database.user", "root")
        .with("database.password", "123456")
        .with("database.include.list", "etl")
        .with("include.schema.changes", "false")
        .with("database.server.id", "123454")
        .with("database.server.name", "customer-mysql-db-server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat")
        .build();
}

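The configuration has two parts: Debezium Engine properties (name, connector.class, and the offset.* settings that control where and how often the read position is persisted) and MySQL connector properties (the database.* settings and include.schema.changes).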
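
As a rough illustration of that split, the sketch below classifies property keys with plain java.util.Properties. The ConfigParts class and the prefix rule are assumptions made for this article's key set only; they are not part of Debezium and would not hold for every connector option.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class ConfigParts {
    /** Returns "connector" for MySQL connector keys and "engine" for everything else. */
    static String partOf(String key) {
        // Assumption: among the keys used in this article, connector keys are those
        // starting with "database." plus "include.schema.changes"; the rest are engine keys.
        return key.startsWith("database.") || key.equals("include.schema.changes")
                ? "connector" : "engine";
    }

    /** Maps every key in props to the part it belongs to, sorted by key. */
    static Map<String, String> classify(Properties props) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, partOf(key));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "mysql-connector");
        props.setProperty("offset.flush.interval.ms", "6000");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("include.schema.changes", "false");
        classify(props).forEach((key, part) -> System.out.println(part + "\t" + key));
    }
}
```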

Instantiating the Debezium Engine

DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine =
    DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(configuration.asProperties())
        .notifying(this::handlePayload)
        .build();

The handlePayload method processes each batch of change events. It skips snapshot reads, picks the before struct for deletes and the after struct for every other operation, and prints the non-null fields as a map:

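// Assumes static imports of OPERATION, BEFORE and AFTER from io.debezium.data.Envelope.FieldName.
private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                           DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter)
        throws InterruptedException {
    for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
        SourceRecord sourceRecord = r.record();
        Struct change = (Struct) sourceRecord.value();
        if (change != null) { // tombstone records carry a null value
            Envelope.Operation op = Envelope.Operation.forCode((String) change.get(OPERATION));
            if (op != Envelope.Operation.READ) { // skip rows read during the initial snapshot
                // Deletes have no "after" image, so use "before" for them.
                String record = op == Envelope.Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) change.get(record);
                // Collectors.toMap rejects null values, so drop null columns first.
                Map<String, Object> payload = struct.schema().fields().stream()
                    .map(Field::name)
                    .filter(name -> struct.get(name) != null)
                    .collect(Collectors.toMap(name -> name, struct::get));
                System.out.println("payload = " + payload);
            }
        }
        // A batch consumer must acknowledge each record so its offset can be committed.
        recordCommitter.markProcessed(r);
    }
    recordCommitter.markBatchFinished();
}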
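
The null filter in handlePayload matters because Collectors.toMap throws a NullPointerException when a value is null, and nullable columns produce null field values. The sketch below shows the same filtering step in plain Java, with a Map standing in for the Kafka Connect Struct; the RowFlattener class and the nonNullFields helper are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class RowFlattener {
    /** Copies only the non-null columns, mirroring the filter in handlePayload. */
    static Map<String, Object> nonNullFields(Map<String, Object> row) {
        return row.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user_id", 1123213);
        row.put("nickname", null); // a nullable column with no value

        System.out.println("payload = " + nonNullFields(row));

        try {
            // Without the filter, the null value makes toMap fail.
            row.entrySet().stream()
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (NullPointerException e) {
            System.out.println("toMap rejects null values");
        }
    }
}
```

If downstream consumers need to distinguish "column is null" from "column is absent", collecting into a HashMap with an explicit loop would preserve the nulls instead.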

Spring Bean Lifecycle Integration

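@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private DebeziumEngine<?> debeziumEngine;
    // Spring only calls stop() on lifecycle beans that report isRunning() == true.
    private volatile boolean running;

    @Override
    public void start() {
        // DebeziumEngine is a Runnable; run it off the main thread.
        executor.execute(debeziumEngine);
        running = true;
    }

    @SneakyThrows
    @Override
    public void stop() {
        debeziumEngine.close();
        // Release the worker thread so it does not keep the JVM alive.
        executor.shutdown();
        running = false;
    }

    @Override
    public boolean isRunning() { return running; }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
    }
}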
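
The start/stop/isRunning contract can be tried out without Spring or Debezium. The sketch below is a minimal stand-in under stated assumptions: EngineRunner and FakeEngine are hypothetical classes, and FakeEngine merely blocks until it is closed, whereas a real engine would read the binlog in its run method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class EngineRunner {
    /** Hypothetical stand-in for DebeziumEngine: runs until close() is called. */
    static final class FakeEngine implements Runnable, AutoCloseable {
        private final CountDownLatch closed = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                closed.await(); // a real engine would poll for change events here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            closed.countDown();
        }
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final FakeEngine engine = new FakeEngine();
    private volatile boolean running;

    /** Starts the engine on the worker thread and records that it is running. */
    void start() {
        executor.execute(engine);
        running = true;
    }

    /** Closes the engine, waits briefly for the worker thread, and clears the flag. */
    void stop() {
        engine.close();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        EngineRunner runner = new EngineRunner();
        runner.start();
        System.out.println("running after start: " + runner.isRunning());
        runner.stop();
        System.out.println("running after stop: " + runner.isRunning());
    }
}
```

Keeping the flag accurate matters in Spring because the lifecycle processor consults isRunning() before it calls stop() during shutdown.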

Running the Spring Boot project and performing insert, update, or delete operations on the MySQL table produces console output such as:

payload = {user_id=1123213, username=felord.cn, age=11, gender=0, enabled=1}

This confirms that Debezium successfully captures database changes, which can then be routed to downstream systems for various use cases.
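
As one example of such routing, the sketch below keeps an in-memory cache in step with change events: creates and updates overwrite the cached row, deletes evict it. It is a simplified illustration in plain Java; the CacheSync class, its apply method, and the use of user_id as the key column are assumptions, and a real application would more likely write to Redis or a similar store.

```java
import java.util.HashMap;
import java.util.Map;

final class CacheSync {
    private final Map<Object, Map<String, Object>> cache = new HashMap<>();

    /**
     * Applies one change to the cache.
     * op is "c", "u" or "d"; row is the after image, or the before image for deletes.
     */
    void apply(String op, String keyColumn, Map<String, Object> row) {
        Object key = row.get(keyColumn);
        if ("d".equals(op)) {
            cache.remove(key);   // the row is gone, so evict it
        } else {
            cache.put(key, row); // insert or overwrite with the latest image
        }
    }

    Map<String, Object> get(Object key) {
        return cache.get(key);
    }

    public static void main(String[] args) {
        CacheSync sync = new CacheSync();
        Map<String, Object> row = new HashMap<>();
        row.put("user_id", 1123213);
        row.put("username", "felord.cn");

        sync.apply("c", "user_id", row);
        System.out.println("after insert: " + sync.get(1123213));

        sync.apply("d", "user_id", row);
        System.out.println("after delete: " + sync.get(1123213));
    }
}
```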

The original source code is available by following the author and replying with the keyword debezium.
Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"
