How Alibaba Canal Enables Real-Time MySQL Binlog Replication and Incremental Data Sync

Canal, an open‑source Alibaba project, mimics MySQL slave behavior to subscribe to and parse binlog events. It supports both standalone and ZooKeeper‑coordinated cluster deployments, and offers flexible state storage, a message‑processing pipeline, and integration options such as TCP, Kafka, and RocketMQ for real‑time data synchronization.

Ziru Technology

Background

Alibaba's team needed cross‑data‑center incremental synchronization. Initially they used trigger‑based change capture, then moved to parsing MySQL binary logs, which led to the incremental subscription and consumption service known as Canal.

Principle

Canal follows MySQL master‑slave replication principles: it connects to a MySQL master, receives binlog events, parses them, and provides real‑time data synchronization.

MySQL master‑slave replication

MySQL master writes data changes to the binary log (binlog).

MySQL slave copies the master’s binlog events to its relay log.

Slave replays the relay log to apply the changes to its own data.
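Canal replays row‑level changes, so the upstream MySQL must have binary logging enabled in ROW format. A minimal my.cnf fragment (values are illustrative):

```ini
[mysqld]
log-bin=mysql-bin     ; enable binary logging
binlog-format=ROW     ; Canal requires row-based binlog events
server-id=1           ; must be unique across the replication topology
```

The account Canal connects with also needs the REPLICATION SLAVE privilege, just like a real replica.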

Canal working principle

Canal pretends to be a MySQL slave and sends a dump request to the master.

The master pushes binlog data to Canal.

Canal parses the binary log byte stream.

Deployment

Canal can run in standalone mode or in a cluster coordinated by ZooKeeper.

Standalone deployment

State can be stored in memory or on disk by configuring canal.properties:

Memory state

canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
<bean id="metaManager" class="com.alibaba.otter.canal.meta.MemoryMetaManager" />

File state

canal.instance.global.spring.xml = classpath:spring/file-instance.xml
<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
    <property name="dataDir" value="${canal.file.data.dir:../conf}" />
    <property name="period" value="${canal.file.flush.period:1000}" />
</bean>

Cluster deployment

State is stored in ZooKeeper; key configuration:

canal.instance.global.spring.xml = classpath:spring/default-instance.xml
<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
    <property name="zooKeeperMetaManager">
        <bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
            <property name="zkClientx" ref="zkClientx" />
        </bean>
    </property>
    <property name="period" value="${canal.zookeeper.flush.period:1000}" />
</bean>

Message processing

Message events

Canal, acting as a slave, sends a dump request to the master; the master pushes binlog events, which Canal parses and processes.

Key concepts:

Server: a Canal runtime instance (one JVM).

Instance: a data queue; one server can host multiple instances.

Instance modules

eventParser – connects to the source, simulates the slave dump protocol, and parses the received binlog stream.

eventSink – links parser and store, performs filtering, transformation, and distribution.

eventStore – stores data.

metaManager – manages incremental subscription and consumption metadata.
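The four modules form a linear pipeline: parser → sink → store, with the meta manager tracking consumption positions alongside. A minimal sketch of that flow, where every class, method, and the `"db|table|TYPE"` input format are illustrative inventions rather than Canal's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Miniature model of a Canal instance pipeline. All names are hypothetical.
public class InstancePipelineSketch {
    record Event(String schema, String table, String type) {}

    // eventParser: turn raw rows (here fake "db|table|TYPE" strings) into events.
    static List<Event> parse(List<String> rawBinlogRows) {
        List<Event> out = new ArrayList<>();
        for (String raw : rawBinlogRows) {
            String[] parts = raw.split("\\|");
            out.add(new Event(parts[0], parts[1], parts[2]));
        }
        return out;
    }

    // eventSink: filter/transform/distribute between parser and store.
    static List<Event> sink(List<Event> events, Predicate<Event> filter) {
        return events.stream().filter(filter).toList();
    }

    // eventStore: buffer events and advance a put sequence.
    static class Store {
        final List<Event> buffer = new ArrayList<>();
        long putSequence = -1;
        void put(List<Event> events) {
            buffer.addAll(events);
            putSequence += events.size();
        }
    }

    // metaManager: remember how far a client has consumed.
    static class MetaManager {
        long consumedSequence = -1;
        void ack(long seq) { consumedSequence = seq; }
    }
}
```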

Core code snippets

The parser's dump loop registers Canal as a slave, requests the binlog stream from a given position, and decodes each fetched event before handing it to the sink:

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
    while (fetcher.fetch()) {
        accumulateReceivedBytes(fetcher.limit());
        LogEvent event = null;
        event = decoder.decode(fetcher, context);
        if (event == null) {
            throw new CanalParseException("parse failed");
        }
        if (!func.sink(event)) {
            break;
        }
        if (event.getSemival() == 1) {
            sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
        }
    }
}

The event converter maps each decoded binlog event to a Canal entry based on its event type:

public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
    if (logEvent == null || logEvent instanceof UnknownLogEvent) {
        return null;
    }
    int eventType = logEvent.getHeader().getType();
    switch (eventType) {
        case LogEvent.QUERY_EVENT:
            return parseQueryEvent((QueryLogEvent) logEvent, isSeek);
        case LogEvent.XID_EVENT:
            return parseXidEvent((XidLogEvent) logEvent);
        case LogEvent.WRITE_ROWS_EVENT:
        case LogEvent.WRITE_ROWS_EVENT_V1:
            return parseRowsEvent((WriteRowsLogEvent) logEvent);
        case LogEvent.UPDATE_ROWS_EVENT:
        case LogEvent.UPDATE_ROWS_EVENT_V1:
        case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
            return parseRowsEvent((UpdateRowsLogEvent) logEvent);
        case LogEvent.DELETE_ROWS_EVENT:
        case LogEvent.DELETE_ROWS_EVENT_V1:
            return parseRowsEvent((DeleteRowsLogEvent) logEvent);
        // other cases omitted for brevity
        default:
            break;
    }
    return null;
}

The sink retries tryPut with backoff while the store is full, invoking downstream handlers before, after, and on retry:

protected boolean doSink(List<Event> events) {
    for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
        events = handler.before(events);
    }
    long blockingStart = 0L;
    int fullTimes = 0;
    do {
        if (eventStore.tryPut(events)) {
            if (fullTimes > 0) {
                eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
            }
            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.after(events);
            }
            return true;
        } else {
            if (fullTimes == 0) {
                blockingStart = System.nanoTime();
            }
            applyWait(++fullTimes);
            if (fullTimes % 100 == 0) {
                long nextStart = System.nanoTime();
                eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                blockingStart = nextStart;
            }
        }
        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.retry(events);
        }
    } while (running && !Thread.interrupted());
    return false;
}
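The retry‑with‑backoff pattern in doSink can be sketched in isolation. This is a simplified stand‑in, not Canal's implementation: the store is modeled as an ArrayBlockingQueue and applyWait as yield‑then‑park, with a deadline added so the example terminates:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical miniature of the "try to put, back off while full" flow.
public class SinkRetrySketch {
    private final ArrayBlockingQueue<List<String>> store;

    public SinkRetrySketch(int capacity) {
        store = new ArrayBlockingQueue<>(capacity);
    }

    // Like eventStore.tryPut: non-blocking, false when the buffer is full.
    public boolean tryPut(List<String> events) {
        return store.offer(events);
    }

    // Like doSink: spin with growing waits until the put succeeds or we give up.
    public boolean sink(List<String> events, long deadlineNanos) {
        int fullTimes = 0;
        while (System.nanoTime() < deadlineNanos) {
            if (tryPut(events)) {
                return true;
            }
            fullTimes++;
            if (fullTimes <= 3) {
                Thread.yield();                       // cheap wait at first
            } else {                                  // then park progressively longer
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(Math.min(fullTimes, 10)));
            }
        }
        return false; // gave up, like doSink returning false when stopped
    }

    // Drain one batch, freeing space for blocked producers.
    public List<String> poll() {
        return store.poll();
    }
}
```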

The memory store writes events into a fixed-size ring buffer and advances the put sequence only after all slots are filled:

private void doPut(List<Event> data) {
    long current = putSequence.get();
    long end = current + data.size();
    for (long next = current + 1; next <= end; next++) {
        entries[getIndex(next)] = data.get((int) (next - current - 1));
    }
    putSequence.set(end);
    if (batchMode.isMemSize()) {
        long size = 0;
        for (Event event : data) {
            size += calculateSize(event);
        }
        putMemSize.addAndGet(size);
    }
    profiling(data, OP.PUT);
    notEmpty.signal();
}
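The ring‑buffer indexing in doPut can be shown with a self‑contained sketch. It assumes, as Canal's in‑memory store does, a power‑of‑two buffer size so that mapping a monotonically increasing sequence to a slot reduces to a bitmask; the class and field names here are simplified:

```java
import java.util.List;

// Sketch of the ring-buffer write: sequences grow forever, slots wrap around.
public class RingBufferSketch {
    final String[] entries;
    final int indexMask;
    long putSequence = -1; // last written sequence, like putSequence.get()

    RingBufferSketch(int sizePowerOfTwo) {
        entries = new String[sizePowerOfTwo];
        indexMask = sizePowerOfTwo - 1;
    }

    int getIndex(long sequence) {
        return (int) (sequence & indexMask); // wraps around the fixed buffer
    }

    void doPut(List<String> data) {
        long current = putSequence;
        long end = current + data.size();
        for (long next = current + 1; next <= end; next++) {
            entries[getIndex(next)] = data.get((int) (next - current - 1));
        }
        putSequence = end; // advance only after all slots are written
    }
}
```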

The MQ worker loop fetches batches without acknowledging them, hands them to the producer, and acks or rolls back depending on the send result:

while (running && destinationRunning.get()) {
    Message message;
    if (getTimeout != null && getTimeout > 0) {
        message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
    } else {
        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
    }
    long batchId = message.getId();
    try {
        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
        if (batchId != -1 && size != 0) {
            canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {
                @Override
                public void commit() {
                    canalServer.ack(clientIdentity, batchId);
                }
                @Override
                public void rollback() {
                    canalServer.rollback(clientIdentity, batchId);
                }
            });
        } else {
            Thread.sleep(100);
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

Message consumption options

TCP subscription from canal.server.

Send to Kafka.

Send to RocketMQ.

Configuration example:

# tcp, kafka, RocketMQ
canal.serverMode = tcp

Built‑in adapters

logger

elasticsearch

hbase

rdb

Conclusion

Canal’s processing flow is complex; this article outlines its core principles, internal parsing pipeline, and typical consumption paths, providing a practical overview that can help users troubleshoot and understand Canal during implementation.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: Java, MySQL, binlog, data replication, Canal, real-time-sync