How to Sync MySQL Data to Solr in Real-Time Using Canal
This guide explains why real‑time data synchronization between MySQL and services like Redis, Memcached, Solr, or Elasticsearch is essential in modern micro‑service architectures, and demonstrates four synchronization methods—inline code, scheduled tasks, message queues, and Canal—complete with setup, configuration, and Java code examples.
Data Synchronization Needs in Internet Applications
In modern distributed micro‑service environments, NoSQL caches such as Redis/Memcached and full‑text search engines like Solr/Elasticsearch are widely used, creating a need to keep these services synchronized with the source MySQL database in real time.
This article uses MySQL‑to‑Solr synchronization as the running example.
Data Synchronization Solutions
1. Synchronize in Business Code
After insert, update or delete operations, invoke Solr indexing logic directly in the service layer.
public ResponseResult updateStatus(Long[] ids, String status) {
    try {
        goodsService.updateStatus(ids, status);
        // Only items whose new status is "status_success" are pushed to Solr.
        if ("status_success".equals(status)) {
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            itemSearchService.importList(itemList);
        }
        // Return on every successful path, not only when the index was updated.
        return new ResponseResult(true, "Status updated successfully");
    } catch (Exception e) {
        return new ResponseResult(false, "Failed to update status");
    }
}
Advantages: Simple to implement.
Disadvantages: Indexing is tightly coupled to business logic, and the synchronous Solr call adds latency to every request.
2. Periodic Task Synchronization
Use a scheduled job (Spring Task, Quartz) to pull changed rows from MySQL and update Solr.
Open‑source mykit‑delay framework: https://github.com/sunshinelyz/mykit-delay
Tip: On the first run, query rows ordered by a timestamp column in descending order and record the maximum timestamp. Subsequent runs only fetch rows with a timestamp greater than the recorded value, avoiding full‑table scans.
Advantages: Decouples business code from Solr updates.
Disadvantages: Data is not truly real‑time.
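The timestamp tip above can be sketched as a small "watermark" poller. This is a minimal illustration, not the mykit-delay API: the ChangedRow and WatermarkPoller names are hypothetical, and an in-memory list stands in for the MySQL table so that the example is self-contained. It sorts ascending and takes the last row, which yields the same maximum timestamp as the descending query described in the tip.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical row shape: a primary key plus a last-modified timestamp. */
final class ChangedRow {
    final long id;
    final Instant updatedAt;

    ChangedRow(long id, Instant updatedAt) {
        this.id = id;
        this.updatedAt = updatedAt;
    }
}

/** Remembers the highest timestamp seen so far and returns only newer rows on each poll. */
final class WatermarkPoller {
    // Start at the epoch so that the first poll picks up every existing row.
    private Instant watermark = Instant.EPOCH;

    /**
     * Stands in for a query such as: SELECT ... WHERE updated_at > ? ORDER BY updated_at.
     * A real job would run that query against MySQL instead of filtering a list.
     */
    List<ChangedRow> poll(List<ChangedRow> table) {
        List<ChangedRow> changed = new ArrayList<>();
        for (ChangedRow row : table) {
            if (row.updatedAt.isAfter(watermark)) {
                changed.add(row);
            }
        }
        changed.sort(Comparator.comparing((ChangedRow r) -> r.updatedAt));
        if (!changed.isEmpty()) {
            // The last row after sorting carries the new maximum timestamp.
            watermark = changed.get(changed.size() - 1).updatedAt;
        }
        return changed;
    }

    Instant watermark() {
        return watermark;
    }
}
```

In a real job the watermark would need to be persisted between runs and advanced only after the Solr update succeeds. Because the comparison is strictly "greater than", rows that share the recorded timestamp but are written later would be missed, so a tie-breaker such as the primary key may be needed.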
3. Synchronization via Message Queue
After database changes, send a message to MQ; a consumer reads the message and updates Solr.
public ResponseResult updateStatus(Long[] ids, String status) {
    try {
        goodsService.updateStatus(ids, status);
        if ("status_success".equals(status)) {
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            // Serialize the changed items and hand them to the queue;
            // a separate consumer updates Solr.
            final String jsonString = JSON.toJSONString(itemList);
            jmsTemplate.send(queueSolr, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(jsonString);
                }
            });
        }
        return new ResponseResult(true, "Status updated successfully");
    } catch (Exception e) {
        return new ResponseResult(false, "Failed to update status");
    }
}
Advantages: Business code is decoupled and near‑real‑time.
Disadvantages: Requires MQ integration, adding coupling to messaging APIs.
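The producer side is shown above; the consumer side might look roughly like the following sketch. It is deliberately library-free: a plain java.util.Queue stands in for the message broker, and the IndexWriter interface and IndexingConsumer class are hypothetical names introduced here, not part of JMS or SolrJ. The point it illustrates is that a message is removed only after it has been indexed successfully.

```java
import java.util.Queue;

/** Hypothetical stand-in for the search index; a real consumer would call SolrJ here. */
interface IndexWriter {
    void index(String jsonPayload) throws Exception;
}

/** Reads queued payloads and forwards each one to the index writer. */
final class IndexingConsumer {
    private final Queue<String> queue;
    private final IndexWriter writer;

    IndexingConsumer(Queue<String> queue, IndexWriter writer) {
        this.queue = queue;
        this.writer = writer;
    }

    /**
     * Handles the messages currently in the queue and returns how many were indexed.
     * A message is removed only after indexing succeeds, so a failure leaves it
     * at the head of the queue for a later retry.
     */
    int drain() {
        int indexed = 0;
        String payload;
        while ((payload = queue.peek()) != null) {
            try {
                writer.index(payload);
            } catch (Exception e) {
                break; // stop this pass; the failed payload stays queued
            }
            queue.remove();
            indexed++;
        }
        return indexed;
    }
}
```

With a real broker, the same idea is usually expressed through the broker's acknowledgement mode rather than peek and remove.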
4. Real‑Time Sync with Canal
Canal parses MySQL binlog events and exposes them to a client program, which applies the changes to Solr without touching business code.
Canal Overview
Canal is an open‑source component from Alibaba that subscribes to MySQL binlog, parses incremental data, and provides a stream of change events.
GitHub: https://github.com/alibaba/canal
Canal Working Principle
MySQL Master‑Slave Replication
Master writes changes to binary log.
Slave copies binary log to relay log.
Slave replays relay log to apply changes.
Canal Internal Mechanism
Simulates a MySQL slave, sends dump request to master.
Master pushes binlog to Canal.
Canal parses binary log bytes.
Canal Architecture
Server: one running Canal process (a single JVM).
Instance: a data queue; one server may host multiple instances.
EventParser: connects to the data source, speaks the slave protocol with the master, and parses the binlog.
EventSink: filters, transforms, and distributes events.
EventStore: stores parsed events.
MetaManager: manages subscription metadata.
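The flow through these components can be pictured with a toy pipeline. The sketch below is only a mental model under loud assumptions: ChangeEvent and ToyInstance are hypothetical names, none of this is Canal's real code, the "parser" splits a text line instead of decoding binlog bytes, and the MetaManager role (tracking what each client has consumed) is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical parsed change event: which table changed and how. */
final class ChangeEvent {
    final String table; // "schema.table"
    final String type;  // "INSERT", "UPDATE" or "DELETE"

    ChangeEvent(String table, String type) {
        this.table = table;
        this.type = type;
    }
}

/** Toy model of one instance: parse, then sink (filter), then store. */
final class ToyInstance {
    private final Predicate<ChangeEvent> sinkFilter;
    private final Deque<ChangeEvent> store = new ArrayDeque<>();

    ToyInstance(Predicate<ChangeEvent> sinkFilter) {
        this.sinkFilter = sinkFilter;
    }

    /** Parser stage: turns a well-formed "table:type" line into an event. */
    static ChangeEvent parse(String raw) {
        String[] parts = raw.split(":", 2);
        return new ChangeEvent(parts[0], parts[1]);
    }

    /** Sink stage: drops events the filter rejects and stores the rest. */
    void accept(String raw) {
        ChangeEvent event = parse(raw);
        if (sinkFilter.test(event)) {
            store.addLast(event);
        }
    }

    /** Client side: returns the oldest stored event, or null when none is pending. */
    ChangeEvent next() {
        return store.pollFirst();
    }
}
```

For example, an instance built with the filter `e -> e.table.startsWith("canaldb.")` keeps an event parsed from "canaldb.book:INSERT" and discards one parsed from "otherdb.t:DELETE".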
Canal Environment Preparation
Enable MySQL Remote Access
-- MySQL 5.x syntax; MySQL 8.0 no longer accepts IDENTIFIED BY inside GRANT.
grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;
MySQL Configuration
Enable binlog with ROW format (required by Canal).
log-bin=mysql-bin
binlog_format=ROW
server_id=1
Restart MySQL and verify:
SHOW VARIABLES LIKE 'binlog_format';
Create Canal User
CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;
Canal Deployment
Download Canal
Download version 1.1.1 from the official release page.
Extract
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/
Directory layout:
bin – executable scripts.
conf – configuration files.
lib – third‑party libraries.
logs – log files.
Configure canal.properties
canal.destinations=example
Edit instance.properties (conf/example/instance.properties)
# canal slaveId, must not duplicate MySQL server_id
canal.instance.mysql.slaveId = 1234
# Master address
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = canaldb
canal.instance.filter.regex = canaldb\\..*
Start Canal
./startup.sh
Testing Canal
Import and Modify Source
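Import the example project into your IDE and modify the IP address and credentials in SimpleCanalClientTest.
// Must match canal.destinations in canal.properties.
String destination = "example";
// Host running the Canal server; 11111 is Canal's default port.
String ip = "192.168.175.100";
CanalConnector connector = CanalConnectors.newSingleConnector(
        new InetSocketAddress(ip, 11111),
        destination,
        "",   // username (empty in this setup)
        ""    // password (empty in this setup)
);
Test Data Changes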
Create database canaldb and perform insert, update, delete operations. Observe log output showing binlog events.
Data Synchronization Implementation
Requirement
Synchronize MySQL data changes to a Solr index in real time via Canal.
Project Setup
Create a Maven project mykit-canal-demo and add dependencies for Canal client, SolrJ, Jackson, JUnit, etc.
Log4j Configuration
log4j.rootCategory=debug, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m%n
Entity Class
Define Book with Solr @Field annotations.
import java.io.Serializable;
import java.util.Date;
import org.apache.solr.client.solrj.beans.Field;

public class Book implements Serializable {
    // Each @Field maps the property to the Solr field of that name.
    @Field("id") private Integer id;
    @Field("book_name") private String name;
    @Field("book_author") private String author;
    @Field("book_publishtime") private Date publishtime;
    @Field("book_price") private Double price;
    @Field("book_publishgroup") private String publishgroup;
    // getters, setters, toString omitted for brevity
}
Utility Classes
Implement BinlogValue, CanalDataParser, DateUtils, InnerBinlogEntry, and JacksonUtil to parse binlog entries and convert them to Java objects.
Synchronization Program
In SyncDataBootStart, connect to Canal, pull messages, convert them to InnerBinlogEntry, and update Solr accordingly.
// Reuse one Solr client instead of creating a new one for every batch.
private static final SolrServer SOLR_SERVER = new HttpSolrServer("http://192.168.175.101:8080/solr");

public static void main(String[] args) throws Exception {
    // "example" is the destination configured in canal.properties.
    CanalConnector canalConnector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("192.168.175.100", 11111),
            "example", "", "");
    canalConnector.connect();
    canalConnector.subscribe();
    try {
        while (true) {
            // Fetch up to 5 * 1024 entries without acknowledging them yet.
            Message message = canalConnector.getWithoutAck(5 * 1024);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size != 0) {
                List<InnerBinlogEntry> entries = CanalDataParser.convertToInnerBinlogEntry(message);
                syncDataToSolr(entries);
            } else {
                // No new events; wait before polling again.
                Thread.sleep(1000);
            }
            // Acknowledge only after the batch has been applied to Solr.
            canalConnector.ack(batchId);
        }
    } finally {
        canalConnector.disconnect();
    }
}

private static void syncDataToSolr(List<InnerBinlogEntry> entries) throws Exception {
    for (InnerBinlogEntry entry : entries) {
        if (entry.getEventType() == CanalEntry.EventType.INSERT ||
                entry.getEventType() == CanalEntry.EventType.UPDATE) {
            // Inserts and updates both become an add; Solr overwrites by id.
            for (Map<String, BinlogValue> row : entry.getRows()) {
                Book book = new Book();
                book.setId(Integer.parseInt(row.get("id").getValue()));
                book.setName(row.get("name").getValue());
                book.setAuthor(row.get("author").getValue());
                book.setPrice(Double.parseDouble(row.get("price").getValue()));
                book.setPublishgroup(row.get("publishgroup").getValue());
                book.setPublishtime(DateUtils.parseDate(row.get("publishtime").getValue()));
                SOLR_SERVER.addBean(book);
            }
        } else if (entry.getEventType() == CanalEntry.EventType.DELETE) {
            for (Map<String, BinlogValue> row : entry.getRows()) {
                SOLR_SERVER.deleteById(row.get("id").getValue());
            }
        }
    }
    // Commit once per batch rather than once per row.
    SOLR_SERVER.commit();
}
While running, the program continuously listens to Canal, parses MySQL binlog events, and keeps the Solr index in sync with insert, update, and delete operations.
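The reason the loop fetches with getWithoutAck and acknowledges afterwards is that an unacknowledged batch can be delivered again if applying it fails. The sketch below models that idea with a hypothetical in-memory ToyConnector; it is not the Canal client, whose ack and rollback methods take a batch id, and SyncLoop and its Sink interface are likewise names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a Canal connection; models only get, ack and rollback. */
final class ToyConnector {
    private final List<String> log; // pending change events, oldest first
    private int acked = 0;          // events before this index are confirmed
    private int delivered = 0;      // events before this index were handed out

    ToyConnector(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    /** Hands out up to batchSize events that have not been delivered yet, without confirming them. */
    List<String> getWithoutAck(int batchSize) {
        int end = Math.min(log.size(), delivered + batchSize);
        List<String> batch = new ArrayList<>(log.subList(delivered, end));
        delivered = end;
        return batch;
    }

    /** Confirms everything handed out so far. */
    void ack() {
        acked = delivered;
    }

    /** Forgets unconfirmed deliveries so that the next fetch returns them again. */
    void rollback() {
        delivered = acked;
    }
}

final class SyncLoop {
    /** Hypothetical target of the sync, for example a Solr update. */
    interface Sink {
        void apply(List<String> batch) throws Exception;
    }

    /** Processes one batch and returns true if it was applied and confirmed. */
    static boolean step(ToyConnector connector, Sink sink) {
        List<String> batch = connector.getWithoutAck(100);
        try {
            sink.apply(batch);
            connector.ack();
            return true;
        } catch (Exception e) {
            connector.rollback();
            return false;
        }
    }
}
```

Because a batch may be applied more than once after a rollback, the sink should be idempotent. Adding a document by id and deleting by id in Solr both have that property.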
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
