Big Data 8 min read

Building a Simple Canal-to-Kafka Demo with Maven Dependencies and Java Code

This guide introduces the canal‑kafka integration package, outlines its constraints, and provides a step‑by‑step tutorial with Maven dependencies and Java source code for a SimpleCanalClient, a Kafka producer, and a server class, enabling a functional demo of Canal to Kafka data streaming.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Building a Simple Canal-to-Kafka Demo with Maven Dependencies and Java Code

canal‑kafka is a recently updated Alibaba Cloud package that connects Canal with Kafka for massive message synchronization; this article explains its limitations and demonstrates a flexible custom client implementation.

Build Maven dependencies

<!-- Canal client library; SimpleCanalClient imports its com.alibaba.otter.canal.client classes. -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.25</version>
</dependency> 
<!-- Kafka Java client; CanalKafkaProducer imports its org.apache.kafka.clients.producer classes. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

SimpleCanalClient

package com.unigroup.client.canal;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.core.canal.CanalToKG;

/**
 * Single-instance Canal client.
 *
 * <p>Opens one connection to a Canal server instance, polls it for change batches and hands
 * every non-empty batch to a handler object that is created and invoked reflectively.
 *
 * <p>NOTE(review): {@link #execute} looks up a public method {@code send(Message)} on the
 * handler class. {@code canalToKafkaServer} passes {@code CanalKafkaProducer.class}, whose only
 * {@code send} method takes {@code (Topic, Message)}, so that lookup is expected to fail with
 * {@link NoSuchMethodException} — confirm which handler class is intended.
 *
 * @Title: SimpleCanalClient.java
 * @Package com.unigroup.canal
 * @Description: single-instance Canal client interface
 */
public class SimpleCanalClient {

    /** Filter handed to {@code connector.subscribe}; use {@code ".*\\..*"} to match everything. */
    private static final String SUBSCRIBE_FILTER = "test.test1";

    /** Number of consecutive empty polls after which {@link #execute} gives up. */
    private static final int MAX_EMPTY_POLLS = 120;

    /** Pause after an empty poll, in milliseconds. */
    private static final long EMPTY_POLL_SLEEP_MS = 1000L;

    private final CanalConnector connector;

    /**
     * Creates the connector; no network connection is opened until {@link #execute} runs.
     *
     * @param ip       host name or address of the Canal server
     * @param port     server port as a decimal string
     * @param instance Canal destination (instance) name
     * @throws NumberFormatException if {@code port} is not a parsable integer
     */
    public SimpleCanalClient(String ip,String port,String instance) {
        // Create the connection object (empty user name and password).
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(ip, Integer.parseInt(port)), instance, "", "");
    }

    /**
     * Polls Canal until {@value #MAX_EMPTY_POLLS} consecutive polls return no data, or until
     * the calling thread is interrupted, forwarding every non-empty batch to
     * {@code clazz.send(Message)}.
     *
     * <p>If the handler invocation fails, the exception is printed, the failing batch is not
     * acknowledged, the connector is disconnected and the method returns.
     *
     * @param batchSize maximum number of entries requested per poll
     * @param clazz     handler class; must have an accessible no-arg constructor and a public
     *                  {@code send(Message)} method
     * @return always {@code null}
     * @throws InstantiationException if {@code clazz} cannot be instantiated
     * @throws IllegalAccessException if the constructor of {@code clazz} is not accessible
     * @throws NoSuchMethodException  if {@code clazz} has no public {@code send(Message)}
     * @throws SecurityException      if reflective access is denied
     */
    public List<Entry> execute(int batchSize,Class<?> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {

        Object obj = clazz.newInstance();
        Method method = clazz.getMethod("send",Message.class);
        int emptyCount = 0;
        boolean interrupted = false;
        try {
            connector.connect();
            connector.subscribe(SUBSCRIBE_FILTER);
            // Roll back batches that were fetched but never acknowledged.
            connector.rollback();
            while (!interrupted && emptyCount < MAX_EMPTY_POLLS) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(EMPTY_POLL_SLEEP_MS);
                    } catch (InterruptedException e) {
                        // Stop polling; the interrupt flag is restored in the finally block
                        // once the remaining connector calls have completed.
                        interrupted = true;
                    }
                } else {
                    emptyCount = 0;
                    method.invoke(obj, message);
                }
                connector.ack(batchId); // confirm the batch

                // connector.rollback(batchId) would be the call to use on a processing failure.
            }

            if (interrupted) {
                System.out.println("interrupted while waiting for data, exit");
            } else {
                System.out.println("empty too many times, exit");
            }
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } finally {
            try {
                connector.disconnect();
            } finally {
                if (interrupted) {
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return null;
    }
}

CanalKafkaProducer

package com.unigroup.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;

/**   
 * @Title: CanalKafkaProducer.java 
 * @Package  com.unigroup.kafka.producer 
 * @version V1.0   
 */
public class CanalKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

    private Producer<String, Message> producer;

    public void init(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", "all");
        properties.put("retries", kafkaProperties.getRetries());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", MessageSerializer.class.getName());
        producer = new KafkaProducer<String, Message>(properties);
    }

    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            producer.close();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    public void send(Topic topic, Message message) throws IOException {

        ProducerRecord<String, Message> record;
        if (topic.getPartition() != null) {
            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
        } else {
            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
        }
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} 
 {}", topic.getTopic(), message.toString());
        }
    }
}

canalToKafkaServer

package com.unigroup.kafka.server;

import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;

/**
 * Demo launcher: builds a {@link SimpleCanalClient} from configured values and runs it with
 * {@link CanalKafkaProducer} as the message handler class.
 *
 * @Title: canal.java
 * @Package com.unigroup.kafka.server
 * @version V1.0
 */
public class canalToKafkaServer {

    /**
     * Reads host, port and instance name via {@code GetProperties}, then runs the client with
     * a batch size of 1. Any exception raised by the run is printed and not rethrown;
     * exceptions raised while constructing the client are not caught here.
     */
    public static void execute() {
        final String host = GetProperties.getValue("MYSQL_HOST");
        // NOTE(review): the key is spelled "MTSQL_PORT", not "MYSQL_PORT" — confirm it
        // matches the key used in the properties file.
        final String port = GetProperties.getValue("MTSQL_PORT");
        final String instance = GetProperties.getValue("INSTANCE");

        final SimpleCanalClient client = new SimpleCanalClient(host, port, instance);
        try {
            client.execute(1, CanalKafkaProducer.class);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}

Thus a simple Canal‑to‑Kafka demo is completed; the provided code is for testing purposes and can be extended with additional functionality for production environments.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Java · Big Data · Kafka · Maven · Canal · Data Integration
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.