How Alibaba Canal Enables Real-Time MySQL Binlog Replication and Incremental Data Sync
Canal, an open-source Alibaba project, mimics MySQL slave behavior to subscribe to and parse binlog events. It supports both standalone and ZooKeeper-coordinated cluster deployments, offers flexible state storage and a message processing pipeline, and delivers changes over TCP, Kafka, or RocketMQ for real-time data synchronization.
Background
Alibaba's team needed cross‑data‑center incremental synchronization. Initially they used trigger‑based change capture, then moved to parsing MySQL binary logs, which led to the incremental subscription and consumption service known as Canal.
Principle
Canal follows MySQL master‑slave replication principles: it connects to a MySQL master, receives binlog events, parses them, and provides real‑time data synchronization.
MySQL master‑slave replication
MySQL master writes data changes to the binary log (binlog).
MySQL slave copies the master’s binlog events to its relay log.
Slave replays the relay log to apply the changes to its own data.
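Because Canal relies on this mechanism, the source MySQL must have row-based binary logging enabled. A typical my.cnf fragment (values are illustrative):

```ini
[mysqld]
log-bin=mysql-bin     # enable binary logging
binlog-format=ROW     # Canal needs row-level events
server-id=1           # must differ from the server-id Canal registers with
```

The account Canal connects with also needs the REPLICATION SLAVE and REPLICATION CLIENT privileges, since it behaves exactly like a replica.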
Canal working principle
Canal pretends to be a MySQL slave and sends a dump request to the master.
The master pushes binlog data to Canal.
Canal parses the binary log byte stream.
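The dump request in step 1 is an ordinary MySQL protocol command (COM_BINLOG_DUMP). As a sketch of what goes over the wire, assuming the classic non-GTID dump command layout (this is an illustration of the protocol, not Canal's actual code):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class BinlogDumpPacket {

    static final byte COM_BINLOG_DUMP = 0x12;

    // Builds the COM_BINLOG_DUMP payload: 1 command byte, 4-byte start
    // position, 2-byte flags, 4-byte slave server-id, then the binlog
    // file name as a plain string running to the end of the packet.
    public static byte[] build(String binlogFile, long position, int serverId) {
        byte[] name = binlogFile.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 2 + 4 + name.length)
                                   .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(COM_BINLOG_DUMP);
        buf.putInt((int) position);  // offset within the binlog file to start from
        buf.putShort((short) 0);     // flags (0 = blocking dump: server keeps pushing)
        buf.putInt(serverId);        // the fake slave's server-id
        buf.put(name);               // binlog file to start from
        return buf.array();
    }
}
```

After sending this command, the connection switches to a push stream: the master keeps writing binlog event packets on the same connection, which is exactly the stream Canal decodes.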
Deployment
Canal can run in standalone mode or in a cluster coordinated by ZooKeeper.
Standalone deployment
State can be stored in memory or on disk by configuring canal.properties:
Memory state
```properties
canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
```

```xml
<bean id="metaManager" class="com.alibaba.otter.canal.meta.MemoryMetaManager" />
```

File state

```properties
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
```

```xml
<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
    <property name="dataDir" value="${canal.file.data.dir:../conf}" />
    <property name="period" value="${canal.file.flush.period:1000}" />
</bean>
```

Cluster deployment
State is stored in ZooKeeper; key configuration:
```properties
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
```

```xml
<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
    <property name="zooKeeperMetaManager">
        <bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
            <property name="zkClientx" ref="zkClientx" />
        </bean>
    </property>
    <property name="period" value="${canal.zookeeper.flush.period:1000}" />
</bean>
```

Message processing
Message events
Canal, acting as a slave, sends a dump request; the master then pushes binlog events, which Canal parses and processes.
Key concepts:
Server: a Canal runtime instance (one JVM).
Instance: a data queue; one server can host multiple instances.
Instance modules
eventParser – connects to the source database, simulates the slave protocol, and parses the binlog byte stream.
eventSink – links parser and store, performs filtering, transformation, and distribution.
eventStore – stores data.
metaManager – manages incremental subscription and consumption metadata.
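These modules form a parse → sink → store pipeline inside each instance. A simplified in-memory analogue (hypothetical names, not Canal's actual classes) shows how the sink and store compose, including the backpressure signal a full store sends upstream:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Minimal stand-ins for Canal's eventSink -> eventStore chain.
class Pipeline {

    // eventStore analogue: a bounded buffer; tryPut fails when full,
    // which is how backpressure propagates to the sink.
    static class Store {
        private final Deque<String> buf = new ArrayDeque<>();
        private final int capacity;

        Store(int capacity) { this.capacity = capacity; }

        boolean tryPut(String e) {
            if (buf.size() >= capacity) return false;
            buf.addLast(e);
            return true;
        }

        String get() { return buf.pollFirst(); }
    }

    // eventSink analogue: filters events before handing them to the store.
    static class Sink {
        private final Store store;
        private final Predicate<String> filter;

        Sink(Store store, Predicate<String> filter) {
            this.store = store;
            this.filter = filter;
        }

        boolean sink(List<String> events) {
            for (String e : events) {
                if (filter.test(e) && !store.tryPut(e)) {
                    return false; // store full: caller must wait and retry
                }
            }
            return true;
        }
    }
}
```

In the real implementation the store is a ring buffer, the sink supports transformation and distribution as well as filtering, and the metaManager tracks how far consumers have progressed.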
Core code snippets
The dump loop registers as a slave, sends the dump command, then fetches and decodes events until the sink stops accepting them:

```java
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
    while (fetcher.fetch()) {
        accumulateReceivedBytes(fetcher.limit());
        LogEvent event = decoder.decode(fetcher, context);
        if (event == null) {
            throw new CanalParseException("parse failed");
        }
        if (!func.sink(event)) {
            break;
        }
        if (event.getSemival() == 1) {
            // semi-sync replication: acknowledge the received position
            sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
        }
    }
}
```

Parsing dispatches on the binlog event type and converts each event into a Canal Entry:

```java
public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
    if (logEvent == null || logEvent instanceof UnknownLogEvent) {
        return null;
    }
    int eventType = logEvent.getHeader().getType();
    switch (eventType) {
        case LogEvent.QUERY_EVENT:
            return parseQueryEvent((QueryLogEvent) logEvent, isSeek);
        case LogEvent.XID_EVENT:
            return parseXidEvent((XidLogEvent) logEvent);
        case LogEvent.WRITE_ROWS_EVENT:
        case LogEvent.WRITE_ROWS_EVENT_V1:
            return parseRowsEvent((WriteRowsLogEvent) logEvent);
        case LogEvent.UPDATE_ROWS_EVENT:
        case LogEvent.UPDATE_ROWS_EVENT_V1:
        case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
            return parseRowsEvent((UpdateRowsLogEvent) logEvent);
        case LogEvent.DELETE_ROWS_EVENT:
        case LogEvent.DELETE_ROWS_EVENT_V1:
            return parseRowsEvent((DeleteRowsLogEvent) logEvent);
        // other cases omitted for brevity
        default:
            break;
    }
    return null;
}
```

The sink retries when the store is full, backing off and accounting for the time spent blocked:

```java
protected boolean doSink(List<Event> events) {
    for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
        events = handler.before(events);
    }
    long blockingStart = 0L;
    int fullTimes = 0;
    do {
        if (eventStore.tryPut(events)) {
            if (fullTimes > 0) {
                eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
            }
            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.after(events);
            }
            return true;
        } else {
            if (fullTimes == 0) {
                blockingStart = System.nanoTime();
            }
            applyWait(++fullTimes); // back off while the store stays full
            if (fullTimes % 100 == 0) {
                long nextStart = System.nanoTime();
                eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                blockingStart = nextStart;
            }
        }
        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.retry(events);
        }
    } while (running && !Thread.interrupted());
    return false;
}
```

The store writes events into a ring buffer and advances the put sequence:

```java
private void doPut(List<Event> data) {
    long current = putSequence.get();
    long end = current + data.size();
    for (long next = current + 1; next <= end; next++) {
        entries[getIndex(next)] = data.get((int) (next - current - 1));
    }
    putSequence.set(end);
    if (batchMode.isMemSize()) {
        long size = 0;
        for (Event event : data) {
            size += calculateSize(event);
        }
        putMemSize.addAndGet(size);
    }
    profiling(data, OP.PUT);
    notEmpty.signal(); // wake up consumers blocked on an empty store
}
```

Finally, the MQ delivery loop fetches batches without acknowledging them, then commits or rolls back through the producer callback:

```java
while (running && destinationRunning.get()) {
    Message message;
    if (getTimeout != null && getTimeout > 0) {
        message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
    } else {
        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
    }
    long batchId = message.getId();
    try {
        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
        if (batchId != -1 && size != 0) {
            canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {

                @Override
                public void commit() {
                    canalServer.ack(clientIdentity, batchId); // delivered: confirm the batch
                }

                @Override
                public void rollback() {
                    canalServer.rollback(clientIdentity, batchId); // failed: replay the batch
                }
            });
        } else {
            Thread.sleep(100); // nothing to deliver; avoid busy-waiting
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
```

Message consumption options
TCP subscription from canal.server.
Send to Kafka.
Send to RocketMQ.
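The getWithoutAck/ack/rollback contract that the MQ producer loop relies on can be sketched as a hypothetical in-memory batch tracker (illustrative names, not Canal's implementation): delivered-but-unacked batches are retained so a rollback can replay them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of Canal's batch acknowledgement semantics.
class BatchTracker {
    private final List<String> pending = new ArrayList<>();
    private final Map<Long, List<String>> inFlight = new LinkedHashMap<>();
    private long nextBatchId = 1;

    void offer(String entry) { pending.add(entry); }

    // getWithoutAck: hand out a batch but keep it until ack/rollback.
    // Returns -1 when there is nothing to deliver.
    long getWithoutAck(int batchSize, List<String> out) {
        if (pending.isEmpty()) return -1;
        int n = Math.min(batchSize, pending.size());
        List<String> batch = new ArrayList<>(pending.subList(0, n));
        pending.subList(0, n).clear();
        long id = nextBatchId++;
        inFlight.put(id, batch);
        out.addAll(batch);
        return id;
    }

    // ack: delivery succeeded, the batch can be forgotten.
    void ack(long batchId) { inFlight.remove(batchId); }

    // rollback: delivery failed, put the batch back at the front for replay.
    void rollback(long batchId) {
        List<String> batch = inFlight.remove(batchId);
        if (batch != null) pending.addAll(0, batch);
    }
}
```

This is what gives Canal's MQ delivery at-least-once semantics: an entry is only dropped after the producer callback calls ack, so consumers must tolerate duplicates after a rollback.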
Configuration example:
```properties
# tcp, kafka, RocketMQ
canal.serverMode = tcp
```

Built-in adapters
logger
elasticsearch
hbase
rdb
Conclusion
Canal’s processing flow is complex; this article outlines its core principles, internal parsing pipeline, and typical consumption paths, providing a practical overview that can help users troubleshoot and understand Canal during implementation.