Real-time MySQL Incremental Data Processing with Canal, Spark Streaming, and Kafka
This article explains how to use Alibaba's Canal to capture MySQL binlog changes, forward them to Kafka, and process the incremental data in real time with Spark Streaming, including installation, configuration, client development, Spark code, testing, and troubleshooting dependency conflicts.
Spark Streaming can be used for real‑time projects, and when the data source is incremental changes in MySQL, using Canal avoids heavy JDBC polling that would impact other business workloads.
Canal is an open‑source Java component that simulates a MySQL slave, receives binlog events, and parses them. The tutorial walks through installing Canal 1.1.2 on CentOS 7, setting up MySQL binlog, creating a Canal user, and configuring Zookeeper and Kafka.
Key configuration files are edited as follows:
[mysqld]
# add this line
log-bin=mysql-bin
# row mode
binlog-format=ROW
# unique server_id
server_id=1

mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;

kafka-topics.sh --create --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --partitions 2 --replication-factor 1 --topic example

## instance.properties
canal.instance.mysql.slaveId=3
canal.instance.master.address=node1:3306
canal.instance.tsdb.enable=false
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=canal_test
canal.instance.enableDruid=false
canal.instance.filter.regex=canal_test\\..*
canal.mq.topic=example
canal.mq.partition=1

# canal.properties
canal.port = 11111
canal.serverMode = kafka
canal.mq.servers = node1:9092,node2:9092,node3:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

After starting Zookeeper, Kafka, and Canal, the binlog changes are pushed to the Kafka topic example. The tutorial then shows how to develop a Canal client in Java (or use the built‑in Kafka mode) and provides the Maven dependency:
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.2</version>
</dependency>

The Spark application is written in Scala. It reads JSON messages from Kafka, filters INSERT/UPDATE events where policy_status equals 1, computes the change in mor_rate, assigns a risk rank (G1, R1, R2), and writes the results into a MySQL table real_risk. Important helper classes include PropertiesUtil, JDBCWrapper, and ParamsList. The core Spark code is:
package yore.spark

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object M_PolicyCreditApp {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setMaster(PropertiesUtil.getPropString("spark.master"))
      .setAppName(PropertiesUtil.getPropString("spark.app.name"))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("hadoop.home.dir", "/Users/yoreyuan/soft/hadoop-2.9.2")
    val ssc = new StreamingContext(conf, Seconds(PropertiesUtil.getPropInt("spark.streaming.durations.sec").toLong))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint(PropertiesUtil.getPropString("spark.checkout.dir"))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> PropertiesUtil.getPropString("bootstrap.servers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> PropertiesUtil.getPropString("group.id"),
      "auto.offset.reset" -> PropertiesUtil.getPropString("auto.offset.reset"),
      "enable.auto.commit" -> (PropertiesUtil.getPropBoolean("enable.auto.commit"): java.lang.Boolean)
    )
    val topics = Array(PropertiesUtil.getPropString("kafka.topic.name"))
    val kafkaStreaming = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // processing pipeline omitted for brevity
    ssc.start()
    ssc.awaitTermination()
  }

  def gainRiskRank(rate: Double): String = {
    if (rate >= 0.75 && rate < 0.8) "R1"
    else if (rate >= 0.80 && rate <= 1) "R2"
    else "G1"
  }
}

Testing involves inserting or updating rows in policy_cred and verifying that the transformed records appear in real_risk. The tutorial also shows an example Kafka JSON message, illustrating the structure of the Canal‑produced payload.
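The listing above leaves out the processing pipeline, so the following is only a rough sketch of the filtering and ranking rules described earlier, written in plain Scala without Spark or a JSON library so it can be run on its own. It assumes each Canal flat message has already been decoded into a simplified, hypothetical FlatMessage case class that keeps just a few of the payload's fields (database, table, the event type, the data rows, and the old values of changed columns). The names FlatMessage, RiskRecord, PolicyRiskSketch, and toRiskRecords are illustrative and not from the original article. Two further assumptions are labeled in the comments: the change in mor_rate is taken as new value minus old value (0.0 when no previous value exists, as for an INSERT), and the rank is derived from the new mor_rate.

```scala
import scala.util.Try

// Hypothetical, simplified view of one decoded Canal flat message.
// The real JSON payload carries more fields; only those used here are modeled.
final case class FlatMessage(
  database: String,
  table: String,
  eventType: String,                 // value of the JSON "type" field, e.g. "INSERT", "UPDATE", "DELETE"
  data: List[Map[String, String]],   // row images after the change (column name -> value)
  old: List[Map[String, String]]     // previous values of changed columns; empty for INSERT
)

// Hypothetical output record; the original article writes its results to the real_risk table.
final case class RiskRecord(row: Map[String, String], rateChange: Double, rank: String)

object PolicyRiskSketch {
  // Same thresholds as gainRiskRank in the article's listing.
  def gainRiskRank(rate: Double): String =
    if (rate >= 0.75 && rate < 0.8) "R1"
    else if (rate >= 0.80 && rate <= 1) "R2"
    else "G1"

  // Reads a column as a Double, returning None if it is missing or malformed.
  private def rateOf(row: Map[String, String]): Option[Double] =
    row.get("mor_rate").flatMap(v => Try(v.toDouble).toOption)

  def toRiskRecords(msg: FlatMessage): List[RiskRecord] = {
    val relevant = msg.eventType == "INSERT" || msg.eventType == "UPDATE"
    if (!relevant) Nil
    else {
      // Pair each new row with its old values; pad with empty maps when none exist.
      val olds = msg.old.padTo(msg.data.length, Map.empty[String, String])
      msg.data.zip(olds).flatMap { case (row, before) =>
        if (!row.get("policy_status").contains("1")) None
        else rateOf(row).map { newRate =>
          // Assumption: change = new - old, and 0.0 when mor_rate has no previous value.
          val oldRate = rateOf(before).getOrElse(newRate)
          // Assumption: the rank is computed from the new mor_rate.
          RiskRecord(row, newRate - oldRate, gainRiskRank(newRate))
        }
      }
    }
  }
}
```

In a Spark job, a function like toRiskRecords would typically be applied inside a flatMap over the decoded Kafka record values, with the resulting records written to MySQL per partition. Rows whose mor_rate cannot be parsed are skipped here rather than failing the batch, which is a design choice of this sketch, not something the article specifies.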
The article also addresses a common dependency conflict where Spark’s Netty version clashes with the one pulled in by canal.client. The solution is to exclude netty-all from the Canal client dependency:
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.client.version}</version>
  <exclusions>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Overall, the guide provides a complete end‑to‑end solution for real‑time incremental data processing from MySQL to Spark Streaming via Canal and Kafka, covering environment setup, code examples, testing, and common pitfalls.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
