Building a Simple Canal-to-Kafka Demo with Maven Dependencies and Java Code
This guide introduces the canal‑kafka integration package, outlines its constraints, and provides a step‑by‑step tutorial with Maven dependencies and Java source code for a SimpleCanalClient, a Kafka producer, and a server class, enabling a functional demo of Canal to Kafka data streaming.
canal‑kafka is a recently updated Alibaba Cloud package that connects Canal with Kafka for massive message synchronization; this article explains its limitations and demonstrates a flexible custom client implementation.
Build Maven dependencies
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.25</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>SimpleCanalClient
package com.unigroup.client.canal;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.core.canal.CanalToKG;
/**
* @Title: SimpleCanalClient.java
* @Package com.unigroup.canal
* @Description: canal单实例接口
*/
public class SimpleCanalClient {
private CanalConnector connector=null;
public SimpleCanalClient(String ip,String port,String instance) {
// 创建链接
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, Integer.parseInt(port)),instance, "", "");
}
public List<Entry> execute(int batchSize,Class<?> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {
//int batchSize = 1;
int emptyCount = 0;
Object obj = clazz.newInstance();
Method method = clazz.getMethod("send",Message.class);
try {
connector.connect();
// connector.subscribe(".*\\..*");
connector.subscribe("test.test1");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
method.invoke(obj, message);
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
connector.disconnect();
}
return null;
}
}CanalKafkaProducer
package com.unigroup.kafka.producer;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;
/**
* @Title: CanalKafkaProducer.java
* @Package com.unigroup.kafka.producer
* @version V1.0
*/
public class CanalKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
private Producer<String, Message> producer;
public void init(KafkaProperties kafkaProperties) {
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaProperties.getServers());
properties.put("acks", "all");
properties.put("retries", kafkaProperties.getRetries());
properties.put("batch.size", kafkaProperties.getBatchSize());
properties.put("linger.ms", kafkaProperties.getLingerMs());
properties.put("buffer.memory", kafkaProperties.getBufferMemory());
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", MessageSerializer.class.getName());
producer = new KafkaProducer<String, Message>(properties);
}
public void stop() {
try {
logger.info("## stop the kafka producer");
producer.close();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping kafka producer:", e);
} finally {
logger.info("## kafka producer is down.");
}
}
public void send(Topic topic, Message message) throws IOException {
ProducerRecord<String, Message> record;
if (topic.getPartition() != null) {
record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
} else {
record = new ProducerRecord<String, Message>(topic.getTopic(), message);
}
producer.send(record);
if (logger.isDebugEnabled()) {
logger.debug("send message to kafka topic: {}
{}", topic.getTopic(), message.toString());
}
}
}canalToKafkaServer
package com.unigroup.kafka.server;
import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;
/**
* @Title: canal.java
* @Package com.unigroup.kafka.server
* @version V1.0
*/
public class canalToKafkaServer {
public static void execute() {
SimpleCanalClient simpleCanalClient = new SimpleCanalClient(GetProperties.getValue("MYSQL_HOST"),
GetProperties.getValue("MTSQL_PORT"), GetProperties.getValue("INSTANCE"));
try {
simpleCanalClient.execute(1,CanalKafkaProducer.class);
} catch (Exception e) {
e.printStackTrace();
}
}
}Thus a simple Canal‑to‑Kafka demo is completed; the provided code is for testing purposes and can be extended with additional functionality for production environments.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
