Implementing CDC‑Based Data Warehouse Synchronization with Canal and Camus

This article explains how to replace daily offline MySQL‑to‑Hive sync with a CDC pipeline using Alibaba’s Canal to capture binlog events, Kafka for transport, and LinkedIn’s Camus (via a custom writer) to load data into Hive, detailing configuration and deployment steps.

Big Data Technology & Architecture

For a long time our data warehouse relied on daily offline synchronization: Sqoop or DataX pulled full or incremental data from MySQL tables into Hive. This approach is easy to set up, but the extraction queries are slow, latency grows with data volume, and neither deletes nor updates to tables that lack a modification timestamp can be captured.

To overcome these drawbacks we adopted a Change Data Capture (CDC) strategy in three steps: (1) subscribe to MySQL binlog and store it in a temporary table, (2) take a one‑time snapshot of the required tables and load the existing data into Hive, and (3) continuously merge snapshot data with binlog changes to keep Hive identical to the source.
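Step (3) can be pictured as replaying change events, in binlog order, over the snapshot. The sketch below is a minimal in-memory illustration of that idea, not the Hive merge itself: the `SnapshotMerge` and `ChangeEvent` names are hypothetical, and it assumes each event carries the row's primary key and its post-change image.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: these types do not come from Canal, Camus, or Hive.
class SnapshotMerge {
    // One row-level change; type is INSERT, UPDATE, or DELETE
    static final class ChangeEvent {
        final String type;
        final long primaryKey;
        final Map<String, Object> row; // post-change image, unused for DELETE

        ChangeEvent(String type, long primaryKey, Map<String, Object> row) {
            this.type = type;
            this.primaryKey = primaryKey;
            this.row = row;
        }
    }

    // Replays events in order over a copy of the snapshot, keyed by primary key
    static Map<Long, Map<String, Object>> merge(Map<Long, Map<String, Object>> snapshot,
                                                List<ChangeEvent> events) {
        Map<Long, Map<String, Object>> result = new LinkedHashMap<>(snapshot);
        for (ChangeEvent e : events) {
            switch (e.type) {
                case "INSERT":
                case "UPDATE":
                    result.put(e.primaryKey, e.row); // latest image wins
                    break;
                case "DELETE":
                    result.remove(e.primaryKey);
                    break;
                default:
                    break; // other event types are ignored in this sketch
            }
        }
        return result;
    }
}
```

Because later events overwrite earlier ones for the same key, the result depends on processing events in binlog order, which is why ordering matters throughout the pipeline.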

The first step is implemented with two tools: Alibaba’s open‑source Canal reads the MySQL binlog and forwards events to Kafka, and LinkedIn’s Camus (since superseded by Gobblin) consumes the Kafka topics and writes the data into Hive.

Canal Configuration

We use Canal 1.1.3 (Kafka client 1.0.1). Key settings in canal.properties are shown below.

# High‑availability Zookeeper nodes
canal.zkServers = 10.10.99.130:2181,10.10.99.132:2181,10.10.99.133:2181,10.10.99.124:2181,10.10.99.125:2181
# Binlog format must be ROW
canal.instance.binlog.format = ROW
# Filter DCL/DML/DDL as needed
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
# Enable automatic instance scanning every 60 s
canal.auto.scan = true
canal.auto.scan.interval = 60
# Send data to Kafka
canal.serverMode = kafka
canal.mq.servers = 10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092
canal.mq.retries = 2
canal.mq.batchSize = 32768
canal.mq.maxRequestSize = 2097152
canal.mq.lingerMs = 150
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 200
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
canal.mq.transaction = false

instance.properties

We store each table’s binlog in a separate Kafka topic (single partition per topic) to preserve order.

# Tables to capture (comma‑separated regex)
canal.instance.filter.regex=mall\.address,mall\.base_category,mall\.orders,mall\.order_product,mall\.product,mall\.mall_category,mall\.mall_comment
# No blacklist in this example
canal.instance.filter.black.regex=
# Multi‑topic mapping
canal.mq.dynamicTopic=bl_mall_address:mall\.address,bl_mall_base_category:mall\.base_category,bl_mall_orders:mall\.orders,bl_mall_order_product:mall\.order_product,bl_mall_product:mall\.product,bl_mall_mall_category:mall\.mall_category,bl_mall_mall_comment:mall\.mall_comment
canal.mq.partition=0
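To make the mapping rule concrete, here is a minimal sketch of how a `topic:regex` list like the one in canal.mq.dynamicTopic can be resolved to a topic for a given table. The `TopicRouter` class is hypothetical and written only for illustration; it is not Canal's implementation, and Canal's own matching rules may differ in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical resolver for "topic:regex,topic:regex" rules; not Canal code.
class TopicRouter {
    private final Map<String, Pattern> rules = new LinkedHashMap<>();

    TopicRouter(String dynamicTopic) {
        for (String rule : dynamicTopic.split(",")) {
            int colon = rule.indexOf(':');
            if (colon < 0) {
                continue; // this sketch skips rules without an explicit topic name
            }
            String topic = rule.substring(0, colon).trim();
            String regex = rule.substring(colon + 1).trim();
            rules.put(topic, Pattern.compile(regex));
        }
    }

    // Returns the first topic whose regex matches "schema.table" in full, or null if none does
    String topicFor(String schema, String table) {
        String fullName = schema + "." + table;
        for (Map.Entry<String, Pattern> e : rules.entrySet()) {
            if (e.getValue().matcher(fullName).matches()) {
                return e.getKey();
            }
        }
        return null;
    }
}
```

With the rules `bl_mall_orders:mall\.orders,bl_mall_order_product:mall\.order_product`, the table mall.orders resolves to bl_mall_orders and mall.order_product to bl_mall_order_product, because the sketch requires the whole name to match rather than a prefix.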

Camus Configuration

Camus, which originated at LinkedIn, reads the Kafka topics and writes the data to HDFS under the Hive warehouse directory. Important properties are listed below.

# Kafka brokers
kafka.brokers=10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092
# Job name
camus.job.name=binlog-fetch
# Destination in HDFS (Hive warehouse)
etl.destination.path=/user/hive/warehouse/binlog.db
# Execution metadata paths
etl.execution.base.path=/camus/exec
etl.execution.history.path=/camus/exec/history
# HDFS default FS
fs.default.name=hdfs://mycluster
# Decoder for JSON binlog messages
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# Custom writer to produce tab‑separated text suitable for Hive
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.CanalBinlogRecordWriterProvider
# Timestamp field and format for partitioning
camus.message.timestamp.field=es
camus.message.timestamp.format=unix_milliseconds
# Time‑based partitioner (hourly)
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner
etl.destination.path.topic.sub.dirformat='pt_hour'=YYYYMMddHH
# Mapper count
mapred.map.tasks=20
# Pull limits (-1 means no limit); messages older than 3 days are not pulled
kafka.max.pull.hrs=-1
kafka.max.historical.days=3
kafka.max.pull.minutes.per.task=-1
# Topic whitelist/blacklist
kafka.blacklist.topics=__consumer_offsets,binlog_dym_test,binlog_mall_test,test010802,test_kylin_streaming2,user_persona4scheduler,HbaseRequestsPerSecond
kafka.whitelist.topics=
# Output compression (disabled)
mapred.output.compress=false
# Timezone and partition interval
etl.default.timezone=Asia/Shanghai
etl.output.file.time.partition.mins=60
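The partition name follows from the es timestamp, the time zone, and the directory format above. The sketch below reproduces that calculation with java.time as an illustration; `PartitionName` is a hypothetical helper, not part of Camus. It assumes the Camus pattern follows Joda-Time conventions, where uppercase YYYY is the calendar year; java.time reserves uppercase Y for the week-based year, so the sketch uses lowercase yyyy.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper showing how an "es" value maps to an hourly partition name.
class PartitionName {
    // java.time equivalent of the configured 'pt_hour'=YYYYMMddHH pattern
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("'pt_hour='yyyyMMddHH");

    // esMillis: event time in Unix milliseconds; zone: e.g. "Asia/Shanghai"
    static String of(long esMillis, String zone) {
        return FORMAT.format(Instant.ofEpochMilli(esMillis).atZone(ZoneId.of(zone)));
    }

    public static void main(String[] args) {
        // 2023-11-15 01:00:00 UTC is 09:00 in Asia/Shanghai
        System.out.println(of(1700010000000L, "Asia/Shanghai"));
    }
}
```

The same instant falls into a different partition under a different etl.default.timezone, so the Hive partition values are only meaningful together with the configured zone.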

Custom Binlog Writer (Java)

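package com.linkedin.camus.etl.kafka.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {
    // Assumption: Camus creates providers reflectively with a TaskAttemptContext
    // argument, as its bundled providers do; check this against your Camus version.
    public CanalBinlogRecordWriterProvider(TaskAttemptContext context) {
    }

    static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {
        private final DataOutputStream outputStream;
        private final String fieldDelimiter;
        private final String rowDelimiter;

        public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {
            this.outputStream = outputStream;
            this.fieldDelimiter = fieldDelimiter;
            this.rowDelimiter = rowDelimiter;
        }

        @Override
        public void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {
            if (value == null) return;
            String recordStr = (String) value.getRecord();
            // OrderedField keeps the columns in the order they appear in the message
            JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);
            // Skip DDL events; getBooleanValue returns false when the field is absent
            if (record.getBooleanValue("isDdl")) return;
            JSONArray data = record.getJSONArray("data");
            if (data == null) return;
            // One message may carry several changed rows; write one line per row
            for (int i = 0; i < data.size(); i++) {
                JSONObject obj = data.getJSONObject(i);
                if (obj != null) {
                    StringBuilder fieldsBuilder = new StringBuilder();
                    // Metadata columns first: id, es, ts, type
                    fieldsBuilder.append(record.getLong("id"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("es"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("ts"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getString("type"));
                    // Then the table's column values
                    for (Entry<String, Object> entry : obj.entrySet()) {
                        fieldsBuilder.append(fieldDelimiter);
                        fieldsBuilder.append(entry.getValue());
                    }
                    fieldsBuilder.append(rowDelimiter);
                    // Encode explicitly as UTF-8 instead of the platform default charset
                    outputStream.write(fieldsBuilder.toString().getBytes(StandardCharsets.UTF_8));
                }
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            outputStream.close();
        }
    }

    @Override
    public String getFilenameExtension() { return ""; }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName, CamusWrapper data, FileOutputCommitter committer) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String rowDelimiter = conf.get("etl.output.record.delimiter", "\n");
        Path path = new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream outputStream = fs.create(path, false);
        return new CanalBinlogRecordWriter(outputStream, "\t", rowDelimiter);
    }
}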
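To show the output layout without a Hadoop or fastjson dependency, the following sketch formats one message that carries two changed rows. `BinlogRowFormatter` is a hypothetical stand-in that mirrors the column order used by the writer (id, es, ts, type, then the table columns); the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the writer's row layout; uses plain maps instead of JSON objects.
class BinlogRowFormatter {
    // Builds one line per changed row: id, es, ts, type, then column values in insertion order
    static List<String> toRows(long id, long es, long ts, String type,
                               List<Map<String, Object>> data, String fieldDelimiter) {
        List<String> rows = new ArrayList<>();
        for (Map<String, Object> row : data) {
            StringBuilder sb = new StringBuilder();
            sb.append(id).append(fieldDelimiter)
              .append(es).append(fieldDelimiter)
              .append(ts).append(fieldDelimiter)
              .append(type);
            for (Object columnValue : row.values()) {
                sb.append(fieldDelimiter).append(columnValue);
            }
            rows.add(sb.toString());
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("id", 7);
        first.put("name", "pen");
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("id", 8);
        second.put("name", "ink");
        // One message with two rows yields two tab-separated lines
        for (String line : toRows(42L, 1700010000000L, 1700010000123L, "INSERT",
                Arrays.asList(first, second), "\t")) {
            System.out.println(line);
        }
    }
}
```

Each line repeats the message-level metadata, so the Hive table that reads these files needs the four metadata columns ahead of the source table's own columns.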

Job Execution and Scheduling

The Camus job can be launched with the provided script:

bin/camus-run -P conf/binlog-fetch-camus.properties

We schedule the job every 30 minutes with crontab or Azkaban. Output lands in hourly partition directories on HDFS (e.g., pt_hour=2023111509), and Camus records the Kafka offsets it has consumed under /camus/exec, so each run resumes where the previous one stopped.

Overall, this CDC pipeline replaces daily batch loads with binlog changes that reach Hive within roughly one scheduling interval, reduces query load on the MySQL source, and keeps the implementation simple and maintainable.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
