Introduction to Alibaba Canal: MySQL Binlog Parsing and Java Client Implementation
This article introduces Alibaba's Canal project: its background, its architecture, and how it parses the MySQL binlog to provide real‑time data subscription. It then walks through a complete Java client that connects to Canal, fetches binlog events, and processes them.
Background: Alibaba's B2B platform needed cross‑region data synchronization, initially using triggers and later adopting log‑based incremental subscription, which gave rise to the Canal project.
Project Overview: Canal (pronounced “kə’næl”) is a pure‑Java tool that parses MySQL binary logs to provide real‑time incremental data subscription and consumption. It supports MySQL versions 5.1 through 5.7.
Key Use Cases: database mirroring, real‑time database backup, multi‑level indexing, search index building, business cache refresh, and notifications of important business changes such as price changes.
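How MySQL master/slave replication works (the mechanism Canal imitates):
1. The master writes data changes to its binary log (binlog); these records are called binary log events.
2. The slave copies the master's binary log events to its relay log.
3. The slave replays the events in its relay log, applying the changes to its own data.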
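In step 2, the slave asks the master for its binlog with the COM_BINLOG_DUMP command of the MySQL client/server protocol, and this is also the request Canal sends when it poses as a slave. As a minimal sketch, assuming the payload layout from the MySQL protocol documentation (command byte 0x12, 4‑byte binlog position, 2‑byte flags, 4‑byte server ID, then the binlog file name up to the end of the packet, integers little‑endian), the request body can be built like this. The class and method names are illustrative only, and the 4‑byte packet header (3‑byte length plus sequence ID) that precedes every payload on the wire is deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class BinlogDumpRequest {
    // Command byte for COM_BINLOG_DUMP in the MySQL client/server protocol.
    static final byte COM_BINLOG_DUMP = 0x12;

    // Builds the COM_BINLOG_DUMP payload (without the 4-byte packet header).
    // Integers are little-endian; the file name runs to the end of the packet.
    static byte[] buildPayload(String binlogFile, long position, int flags, long serverId) {
        byte[] name = binlogFile.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 2 + 4 + name.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(COM_BINLOG_DUMP);
        buf.putInt((int) position);   // binlog-pos: 4 bytes (unsigned on the wire)
        buf.putShort((short) flags);  // flags: 2 bytes
        buf.putInt((int) serverId);   // server-id of the (pretend) slave: 4 bytes
        buf.put(name);                // binlog-filename: string[EOF]
        return buf.array();
    }

    public static void main(String[] args) {
        // Position 4 is the first event, right after the 4-byte binlog magic header.
        byte[] payload = buildPayload("mysql-bin.000001", 4L, 0, 1234L);
        System.out.println("payload length = " + payload.length);
        System.out.printf("command byte = 0x%02x%n", payload[0]);
    }
}
```

The server ID matters in practice: a real master tracks connected slaves by it, so a tool posing as a slave should use an ID that no other replica in the topology uses.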
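How Canal works:
- Canal emulates the MySQL slave interaction protocol: it presents itself to the master as a slave and sends the master a dump request.
- The master receives the dump request and starts pushing its binary log to the "slave", that is, to Canal.
- Canal parses the binlog, which arrives as a raw byte stream, into structured events that clients can subscribe to.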
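Parsing the byte stream starts with the common header that prefixes every binlog event. The sketch below decodes the 19‑byte header of binlog format v4 (MySQL 5.0 and later) as described in the MySQL binlog documentation: timestamp, event type code, server ID, event size, position of the next event, and flags, all little‑endian unsigned integers. It is an illustration of the format, not Canal's own parser; the class and record names are hypothetical, and records require Java 16 or later.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class BinlogEventHeader {
    // Size of the common event header in binlog format v4.
    static final int HEADER_LENGTH = 19;

    record Header(long timestamp, int typeCode, long serverId,
                  long eventSize, long nextPosition, int flags) {}

    // Decodes the common header at the start of `event`.
    static Header parse(byte[] event) {
        if (event.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least 19 bytes, got " + event.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(event).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = Integer.toUnsignedLong(buf.getInt());    // seconds since the epoch
        int typeCode = Byte.toUnsignedInt(buf.get());            // e.g. 30 = WRITE_ROWS_EVENT (v2)
        long serverId = Integer.toUnsignedLong(buf.getInt());     // server that wrote the event
        long eventSize = Integer.toUnsignedLong(buf.getInt());    // header + body (+ checksum)
        long nextPosition = Integer.toUnsignedLong(buf.getInt()); // offset of the next event
        int flags = Short.toUnsignedInt(buf.getShort());
        return new Header(timestamp, typeCode, serverId, eventSize, nextPosition, flags);
    }

    public static void main(String[] args) {
        // Hand-built header bytes, purely illustrative.
        ByteBuffer b = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        b.putInt(1_700_000_000).put((byte) 30).putInt(1).putInt(64).putInt(1024).putShort((short) 0);
        System.out.println(parse(b.array()));
    }
}
```

The type code decides how the rest of the event is decoded; for row events, the body carries the before and after row images that Canal later exposes as RowData.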
Example Java Client (dependencies and code):
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.12</version>
</dependency>
```
```java
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Connect to a single Canal server on this host (port 11111),
        // destination "example", with an empty username and password.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Subscribe to every table in every schema (regex on "schema.table").
            connector.subscribe(".*\\..*");
            // Roll back to the last acknowledged position so batches fetched but
            // never acknowledged by a previous run are delivered again.
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt and stop
                        return;
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                // batchId -1 means the server had nothing to deliver, so there is
                // nothing to acknowledge.
                if (batchId != -1) {
                    connector.ack(batchId);
                }
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction boundary markers; only row changes are printed.
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                // The store value is a serialized (protobuf) RowChange.
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR parsing entry: " + entry, e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // A deleted row only has a before image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // An inserted row only has an after image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // UPDATE and other row events: print both images.
                    System.out.println("------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            // "update" is true when this column's value changed in the event.
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```
Explanation of key classes: Message contains a batch ID and a list of CanalEntry.Entry objects. Each Entry carries a header (binlog file name, offset, schema name, table name, and event type) and a store value, which is a serialized RowChange. RowChange exposes an isDdl flag, the SQL text of DDL statements, and a list of RowData, each holding the before and after column lists for INSERT, UPDATE, and DELETE events.
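The before/after rules in printEntry (DELETE has only a before image, INSERT only an after image, UPDATE both) can be pulled into pure functions that are easy to unit test, for example to drive a price‑change notification from the columns an UPDATE actually touched. The sketch below uses hypothetical stand‑in types (EventKind, Col, RowImage) that loosely mirror CanalEntry.EventType, Column, and RowData; they are not Canal's classes, and a real client would map the protobuf objects onto them or work on the protobuf objects directly. Records require Java 16 or later.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowImages {
    // Hypothetical stand-ins for CanalEntry.EventType, Column and RowData.
    enum EventKind { INSERT, UPDATE, DELETE }
    record Col(String name, String value, boolean updated) {}
    record RowImage(List<Col> before, List<Col> after) {}

    // Converts a column list into an ordered name -> value map.
    static Map<String, String> toMap(List<Col> cols) {
        Map<String, String> m = new LinkedHashMap<>();
        for (Col c : cols) {
            m.put(c.name(), c.value());
        }
        return m;
    }

    // Row state after the event: empty for DELETE, the after image otherwise.
    static Map<String, String> stateAfter(EventKind kind, RowImage row) {
        return kind == EventKind.DELETE ? Map.of() : toMap(row.after());
    }

    // Only the columns an UPDATE actually changed, based on the updated flag.
    static Map<String, String> changedColumns(RowImage row) {
        Map<String, String> m = new LinkedHashMap<>();
        for (Col c : row.after()) {
            if (c.updated()) {
                m.put(c.name(), c.value());
            }
        }
        return m;
    }

    public static void main(String[] args) {
        RowImage update = new RowImage(
                List.of(new Col("id", "1", false), new Col("price", "10.00", false)),
                List.of(new Col("id", "1", false), new Col("price", "12.50", true)));
        System.out.println(stateAfter(EventKind.UPDATE, update)); // {id=1, price=12.50}
        System.out.println(changedColumns(update));               // {price=12.50}
    }
}
```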
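To run the client, a Canal server serving the "example" instance must be listening on port 11111 of the local host, and the source MySQL must have the binlog enabled in ROW format, which Canal relies on for row‑level data. Each insert, update, or delete made in the source database then appears in the client's output as the binlog position, schema and table name, event type, and column values.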
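The client follows a fetch, process, acknowledge cycle: getWithoutAck hands out a batch, ack(batchId) commits it, and rollback makes anything not yet acknowledged come back, which yields at‑least‑once delivery. The toy model below reproduces that contract with an in‑memory list so the redelivery behavior can be observed without a Canal server. FakeServer and its methods are hypothetical and only borrow the names of the CanalConnector calls; the real server also supports several outstanding batches, which this sketch simplifies to one.

```java
import java.util.ArrayList;
import java.util.List;

public class AckRollbackModel {
    // Hypothetical in-memory stand-in for a Canal server; NOT the CanalConnector API.
    static class FakeServer {
        private final List<String> log;  // pretend binlog events, in order
        private int ackedUpTo = 0;       // index after the last acknowledged event
        private int fetchedUpTo = 0;     // index after the last event handed out
        private long nextBatchId = 1;
        private long outstandingBatch = -1;

        FakeServer(List<String> log) { this.log = log; }

        // Modeled on getWithoutAck: hand out up to n events; -1 means nothing new.
        long getWithoutAck(int n, List<String> out) {
            if (fetchedUpTo >= log.size()) return -1;
            int end = Math.min(log.size(), fetchedUpTo + n);
            out.addAll(log.subList(fetchedUpTo, end));
            fetchedUpTo = end;
            outstandingBatch = nextBatchId++;
            return outstandingBatch;
        }

        // Modeled on ack: commit the position reached by the outstanding batch.
        void ack(long batchId) {
            if (batchId != outstandingBatch) {
                throw new IllegalStateException("unknown batch " + batchId);
            }
            ackedUpTo = fetchedUpTo;
            outstandingBatch = -1;
        }

        // Modeled on rollback: drop unacknowledged progress so those events come back.
        void rollback() {
            fetchedUpTo = ackedUpTo;
            outstandingBatch = -1;
        }
    }

    // Runs the loop; optionally "crashes" once after processing but before the ack.
    static List<String> run(List<String> events, int batchSize, boolean failOnceBeforeAck) {
        FakeServer server = new FakeServer(events);
        List<String> processed = new ArrayList<>();
        boolean fail = failOnceBeforeAck;
        while (true) {
            List<String> batch = new ArrayList<>();
            long batchId = server.getWithoutAck(batchSize, batch);
            if (batchId == -1) break;   // nothing left to read
            processed.addAll(batch);    // "process" the batch
            if (fail) {                 // simulated failure before the ack
                fail = false;
                server.rollback();
                continue;
            }
            server.ack(batchId);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3");
        System.out.println(run(events, 2, false)); // [e1, e2, e3]
        System.out.println(run(events, 2, true));  // [e1, e2, e1, e2, e3]
    }
}
```

The duplicated e1 and e2 in the second run are the cost of at‑least‑once delivery: a consumer that fails between processing and ack sees the batch again, so downstream writes such as cache refreshes or index updates should be idempotent.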
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.