How to Sync MySQL Data to Solr in Real-Time Using Canal

This guide explains why real‑time data synchronization between MySQL and services like Redis, Memcached, Solr, or Elasticsearch is essential in modern micro‑service architectures, and demonstrates four synchronization methods—inline code, scheduled tasks, message queues, and Canal—complete with setup, configuration, and Java code examples.

Su San Talks Tech

Data Synchronization Needs in Internet Context

In modern distributed micro‑service environments, NoSQL caches such as Redis/Memcached and full‑text search engines like Solr/Elasticsearch are widely used, creating a need to keep these services synchronized with the source MySQL database in real time.

This article uses MySQL-to-Solr synchronization as the running example.

Data Synchronization Solutions

1. Synchronize in Business Code

After insert, update or delete operations, invoke Solr indexing logic directly in the service layer.

public ResponseResult updateStatus(Long[] ids, String status) {
    try {
        goodsService.updateStatus(ids, status);
        // Re-index in Solr only when the new status is "status_success".
        if ("status_success".equals(status)) {
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            itemSearchService.importList(itemList);
        }
        // Return on every successful path, not only when Solr was updated.
        return new ResponseResult(true, "Status updated successfully");
    } catch (Exception e) {
        return new ResponseResult(false, "Failed to update status");
    }
}

Advantages: Simple to implement.

Disadvantages: Indexing logic is tightly coupled to business code, and the synchronous Solr call adds latency to every write request.

2. Periodic Task Synchronization

Use a scheduled job (Spring Task, Quartz) to pull changed rows from MySQL and update Solr.

The open-source mykit-delay framework is another option for scheduling such jobs: https://github.com/sunshinelyz/mykit-delay

Tip: On the first run, query rows ordered by a timestamp column in descending order and record the maximum timestamp. Subsequent runs only fetch rows with a timestamp greater than the recorded value, avoiding full‑table scans.
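
The tip above can be sketched as a small high-water-mark poller. This is a minimal illustration, not the article's implementation: the Row class, the updatedAt field, and the in-memory list standing in for the MySQL table are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of incremental polling with a timestamp high-water mark. */
class WatermarkPoller {
    /** Hypothetical row shape: an id plus a last-modified time in epoch millis. */
    static final class Row {
        final long id;
        final long updatedAt;

        Row(long id, long updatedAt) {
            this.id = id;
            this.updatedAt = updatedAt;
        }
    }

    // Highest updatedAt seen so far; Long.MIN_VALUE means nothing has been synced yet.
    private long watermark = Long.MIN_VALUE;

    /**
     * Returns the rows changed since the previous call and advances the watermark.
     * Against MySQL this would be a query filtered on the timestamp column
     * (for example WHERE updated_at > ?, with the column name assumed here).
     */
    List<Row> poll(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        long max = watermark;
        for (Row row : table) {
            if (row.updatedAt > watermark) {
                changed.add(row);
                max = Math.max(max, row.updatedAt);
            }
        }
        watermark = max;
        return changed;
    }

    long watermark() {
        return watermark;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, 100));
        table.add(new Row(2, 200));

        WatermarkPoller poller = new WatermarkPoller();
        System.out.println("first run: " + poller.poll(table).size() + " rows");

        table.add(new Row(3, 300));
        System.out.println("second run: " + poller.poll(table).size() + " rows");
    }
}
```

A scheduled job would call poll once per interval and send the returned rows to Solr. One caveat of the strict greater-than comparison: a row committed later with a timestamp equal to the current watermark would be skipped, so a real job may need to overlap the window slightly and rely on idempotent updates.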

Advantages: Decouples business code from Solr updates.

Disadvantages: Not real-time. Changes reach Solr only on the next scheduled run.

3. Synchronization via Message Queue

After database changes, send a message to MQ; a consumer reads the message and updates Solr.

public ResponseResult updateStatus(Long[] ids, String status) {
    try {
        goodsService.updateStatus(ids, status);
        if ("status_success".equals(status)) {
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            final String jsonString = JSON.toJSONString(itemList);
            jmsTemplate.send(queueSolr, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(jsonString);
                }
            });
        }
        return new ResponseResult(true, "Status updated successfully");
    } catch (Exception e) {
        return new ResponseResult(false, "Failed to update status");
    }
}

Advantages: Business code no longer calls Solr directly, and synchronization is near real-time.

Disadvantages: Requires MQ integration, adding coupling to messaging APIs.
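
The producer/consumer split can be illustrated without a broker. In the sketch below a java.util.concurrent.BlockingQueue stands in for the message queue and a plain list stands in for the Solr index. The class and method names are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy producer/consumer: a BlockingQueue stands in for the message broker. */
class QueueSyncSketch {
    // Stand-in for the MQ destination (queueSolr in the article's code).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stand-in for the Solr index: the payloads that have been "indexed".
    private final List<String> indexed = new ArrayList<>();

    /** Producer side: called by business code after the database write succeeds. */
    void publish(String jsonPayload) {
        queue.add(jsonPayload);
    }

    /** Consumer side: drains all pending messages and applies them to the index. */
    int consumePending() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        indexed.addAll(batch);
        return batch.size();
    }

    List<String> indexed() {
        return indexed;
    }

    public static void main(String[] args) {
        QueueSyncSketch sync = new QueueSyncSketch();
        sync.publish("[{\"id\":1}]");
        sync.publish("[{\"id\":2}]");
        System.out.println("consumed " + sync.consumePending() + " messages");
        System.out.println("indexed: " + sync.indexed());
    }
}
```

In a real deployment the consumer would run in its own thread or process and block on the queue, so a slow or unavailable Solr node delays indexing but not the business request.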

4. Real‑Time Sync with Canal

Canal parses MySQL binlog events and exposes them to a client program, which applies the changes to Solr without touching business code.

Canal Overview

Canal is an open‑source component from Alibaba that subscribes to MySQL binlog, parses incremental data, and provides a stream of change events.

GitHub: https://github.com/alibaba/canal

Canal Working Principle

MySQL Master‑Slave Replication

Master writes changes to binary log.

Slave copies binary log to relay log.

Slave replays relay log to apply changes.

Canal Internal Mechanism

Canal simulates a MySQL slave and sends a dump request to the master.

Master pushes binlog to Canal.

Canal parses binary log bytes.

Canal Architecture

Server: one running Canal process, corresponding to one JVM.

Instance: a data queue; one server may host multiple instances.

EventParser: connects to the data source, simulates the slave protocol to interact with the master, and parses the protocol.

EventSink: filters, transforms, and distributes events.

EventStore: stores parsed events.

MetaManager: manages subscription metadata.

Canal Environment Preparation

Enable MySQL Remote Access

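-- Test environments only. This GRANT ... IDENTIFIED BY form works on MySQL 5.x;
-- MySQL 8.0 removed it and requires a separate CREATE USER statement.
grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;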

MySQL Configuration

Enable the binlog in ROW format (required by Canal) by adding the following to the [mysqld] section of my.cnf:

log-bin=mysql-bin
binlog_format=ROW
server_id=1

Restart MySQL and verify:

SHOW VARIABLES LIKE 'binlog_format';

Create Canal User

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;

Canal Deployment

Download Canal

Download version 1.1.1 from the official release page.

Extract

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/

Directory layout:

bin – executable scripts.

conf – configuration files.

lib – third‑party libraries.

logs – log files.

Configure canal.properties

canal.destinations=example

Edit conf/example/instance.properties

# canal slaveId, must not duplicate MySQL server_id
canal.instance.mysql.slaveId = 1234

# Master address
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = canaldb
canal.instance.filter.regex = canaldb\\..*
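
To see what the filter expression above selects, the following sketch approximates the matching with java.util.regex. It assumes the filter is a comma-separated list of regular expressions matched against names of the form schema.table. Canal uses its own filter implementation, so treat this as an illustration of the pattern only. TableFilterSketch is a hypothetical class.

```java
import java.util.regex.Pattern;

/** Approximates how a filter such as canaldb\..* selects schema.table names. */
class TableFilterSketch {
    private final Pattern[] patterns;

    /** @param filter comma-separated regular expressions, e.g. "canaldb\\..*" in Java source */
    TableFilterSketch(String filter) {
        String[] parts = filter.split(",");
        patterns = new Pattern[parts.length];
        for (int i = 0; i < parts.length; i++) {
            patterns[i] = Pattern.compile(parts[i].trim());
        }
    }

    /** True when "schema.table" fully matches at least one expression. */
    boolean accepts(String schema, String table) {
        String name = schema + "." + table;
        for (Pattern pattern : patterns) {
            if (pattern.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The Java literal "canaldb\\..*" is the regex canaldb\..* :
        // the text "canaldb", a literal dot, then any table name.
        TableFilterSketch filter = new TableFilterSketch("canaldb\\..*");
        System.out.println(filter.accepts("canaldb", "book"));
        System.out.println(filter.accepts("otherdb", "book"));
        // The escaped dot keeps a similarly named schema from matching.
        System.out.println(filter.accepts("canaldbx", "book"));
    }
}
```

The doubled backslash in the properties file serves the same purpose as in the Java string literal: one level of escaping is consumed when the file is read, leaving a single backslash that makes the dot literal.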

Start Canal

cd /usr/local/canal/bin
./startup.sh

Testing Canal

Import and Modify Source

Import the example project into your IDE and modify the IP address and credentials in SimpleCanalClientTest.

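// Must match the value of canal.destinations in canal.properties.
String destination = "example";
// Address of the machine running the Canal server; 11111 is the port used in this setup.
String ip = "192.168.175.100";
CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "",
    ""
);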

Test Data Changes

Create database canaldb and perform insert, update, delete operations. Observe log output showing binlog events.

Data Synchronization Implementation

Requirement

Synchronize MySQL data changes to a Solr index in real time via Canal.

Project Setup

Create a Maven project mykit-canal-demo and add dependencies for Canal client, SolrJ, Jackson, JUnit, etc.

Log4j Configuration

log4j.rootCategory=debug, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m%n

Entity Class

Define Book with Solr @Field annotations.

import java.io.Serializable;
import java.util.Date;
import org.apache.solr.client.solrj.beans.Field;

// Each @Field value is the name of the corresponding field in the Solr schema.
public class Book implements Serializable {
    @Field("id") private Integer id;
    @Field("book_name") private String name;
    @Field("book_author") private String author;
    @Field("book_publishtime") private Date publishtime;
    @Field("book_price") private Double price;
    @Field("book_publishgroup") private String publishgroup;
    // getters, setters, toString omitted for brevity
}

Utility Classes

Implement BinlogValue, CanalDataParser, DateUtils, InnerBinlogEntry, and JacksonUtil to parse binlog entries and convert them to Java objects.
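
The article does not list these utility classes. As a rough idea of the conversion step only, the sketch below maps string column values to typed fields. Everything in it is hypothetical: the RowMapperSketch and BookRow names, the plain Map used in place of BinlogValue, and the assumed yyyy-MM-dd HH:mm:ss text format for the date column.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mapper from string column values to typed fields. */
class RowMapperSketch {
    /** Plain stand-in for the article's Book entity (no Solr annotations). */
    static final class BookRow {
        Integer id;
        String name;
        Double price;
        Date publishtime;
    }

    // Assumed text format of the date column as it arrives from the binlog.
    private static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts one row (column name to string value) into a typed object. */
    static BookRow toBook(Map<String, String> columns) {
        BookRow book = new BookRow();
        book.id = Integer.valueOf(columns.get("id")); // id is required
        book.name = columns.get("name");
        String price = columns.get("price");
        book.price = isBlank(price) ? null : Double.valueOf(price);
        String time = columns.get("publishtime");
        book.publishtime = isBlank(time) ? null : toDate(time);
        return book;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    private static Date toDate(String text) {
        LocalDateTime local = LocalDateTime.parse(text, DATETIME);
        // Interprets the value in the JVM's default time zone.
        return Date.from(local.atZone(ZoneId.systemDefault()).toInstant());
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("id", "7");
        row.put("name", "Sample Book");
        row.put("price", "39.5");
        row.put("publishtime", "2020-01-02 03:04:05");
        BookRow book = toBook(row);
        System.out.println(book.id + " " + book.name + " " + book.price + " " + book.publishtime);
    }
}
```

Treating blank values as null keeps a missing optional column from aborting the whole batch, while a missing id still fails fast because a document cannot be indexed without its key.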

Synchronization Program

In SyncDataBootStart, connect to Canal, pull messages, convert them to InnerBinlogEntry, and update Solr accordingly.

// Reused across batches. SolrServer and HttpSolrServer are the SolrJ 4.x client classes.
private static final SolrServer SOLR_SERVER = new HttpSolrServer("http://192.168.175.101:8080/solr");

public static void main(String[] args) throws Exception {
    // "example" is the destination configured in canal.destinations.
    CanalConnector canalConnector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("192.168.175.100", 11111),
        "example", "", ""
    );
    canalConnector.connect();
    canalConnector.subscribe();
    try {
        while (true) {
            // Fetch a batch of up to 5 * 1024 entries without acknowledging it yet.
            Message message = canalConnector.getWithoutAck(5 * 1024);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                // No new binlog events: wait, confirm the empty batch, and poll again.
                Thread.sleep(1000);
                canalConnector.ack(batchId);
                continue;
            }
            try {
                List<InnerBinlogEntry> entries = CanalDataParser.convertToInnerBinlogEntry(message);
                syncDataToSolr(entries);
                // Acknowledge only after Solr has accepted the whole batch.
                canalConnector.ack(batchId);
            } catch (Exception e) {
                // Roll back so the batch is not marked as consumed, then stop.
                canalConnector.rollback(batchId);
                throw e;
            }
        }
    } finally {
        canalConnector.disconnect();
    }
}

private static void syncDataToSolr(List<InnerBinlogEntry> entries) throws Exception {
    for (InnerBinlogEntry entry : entries) {
        if (entry.getEventType() == CanalEntry.EventType.INSERT ||
            entry.getEventType() == CanalEntry.EventType.UPDATE) {
            for (Map<String, BinlogValue> row : entry.getRows()) {
                Book book = new Book();
                book.setId(Integer.parseInt(row.get("id").getValue()));
                book.setName(row.get("name").getValue());
                book.setAuthor(row.get("author").getValue());
                book.setPrice(Double.parseDouble(row.get("price").getValue()));
                book.setPublishgroup(row.get("publishgroup").getValue());
                book.setPublishtime(DateUtils.parseDate(row.get("publishtime").getValue()));
                // Assuming id is the schema's unique key, re-adding an id replaces the old document.
                SOLR_SERVER.addBean(book);
            }
        } else if (entry.getEventType() == CanalEntry.EventType.DELETE) {
            for (Map<String, BinlogValue> row : entry.getRows()) {
                SOLR_SERVER.deleteById(row.get("id").getValue());
            }
        }
    }
    // Commit once per batch instead of once per row.
    SOLR_SERVER.commit();
}

Once started, the program polls Canal continuously, parses each batch of MySQL binlog events, and applies the corresponding inserts, updates, and deletes to the Solr index.
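
The loop above fetches events with getWithoutAck and confirms them separately with ack. A toy model of why the two steps are separate is sketched below. It is a deliberate simplification: the real Canal server tracks batches by id, whereas this hypothetical AckCursorSketch class keeps only a read position and an acknowledged position over an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy event store with a read cursor and an acknowledged cursor. */
class AckCursorSketch {
    private final List<String> events = new ArrayList<>();
    private int ackPos = 0;   // events before this index are confirmed as processed
    private int readPos = 0;  // events before this index have been handed out

    void append(String event) {
        events.add(event);
    }

    /** Hands out up to max unread events without confirming them. */
    List<String> getWithoutAck(int max) {
        int end = Math.min(events.size(), readPos + max);
        List<String> batch = new ArrayList<>(events.subList(readPos, end));
        readPos = end;
        return batch;
    }

    /** Confirms everything handed out so far. */
    void ack() {
        ackPos = readPos;
    }

    /** Forgets unconfirmed reads so the same events are handed out again. */
    void rollback() {
        readPos = ackPos;
    }

    public static void main(String[] args) {
        AckCursorSketch store = new AckCursorSketch();
        store.append("INSERT book 1");
        store.append("UPDATE book 1");

        List<String> first = store.getWithoutAck(10);
        store.rollback(); // pretend the Solr update failed
        List<String> retry = store.getWithoutAck(10);
        System.out.println("same batch redelivered: " + first.equals(retry));

        store.ack(); // the retry succeeded
        System.out.println("nothing left: " + store.getWithoutAck(10).isEmpty());
    }
}
```

Because a batch can be delivered more than once after a rollback, the Solr side should be idempotent. Adding a document keyed by id and deleting by id both satisfy this, provided id is the index's unique key.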

[Figure: Data flow diagram]
Written by

Su San Talks Tech

Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
