
Migrating MySQL Data to Elasticsearch: Full Sync, Incremental Sync, Consistency Issues, and Using Canal

This article explains how to migrate MySQL table data to Elasticsearch with a one‑time full sync and a scheduled incremental sync, examines the resulting consistency challenges, and introduces Alibaba's Canal framework for reliable binlog‑based replication.

Java Captain

In daily project development, Elasticsearch (ES) is often used for keyword search, while persistent business data remains in MySQL; therefore, the key question is how to populate ES with data from MySQL.

1. One‑time full sync. This approach reads all rows from a MySQL table into memory, converts them to ES entities, and writes them to ES in batches. It can exhaust memory and CPU on large tables: in one test, 100,000 rows consumed 90% of RAM and pushed CPU usage to 100%.

@Component
@Slf4j
public class FullSyncArticleToES implements CommandLineRunner {
    @Resource
    private ArticleMapper articleMapper;
    @Resource
    private ArticleRepository articleRepository;

    /**
     * Execute once for full migration
     */
    public void fullSyncArticleToES() {
        LambdaQueryWrapper<Article> wrapper = new LambdaQueryWrapper<>();
        List<Article> articleList = articleMapper.selectList(wrapper);
        if (CollectionUtils.isNotEmpty(articleList)) {
            List<ESArticle> esArticleList = articleList.stream().map(ESArticle::dbToEs).collect(Collectors.toList());
            final int pageSize = 500;
            final int total = esArticleList.size();
            log.info("------------FullSyncArticleToES start!-----------, total {}", total);
            for (int i = 0; i < total; i += pageSize) {
                int end = Math.min(i + pageSize, total);
                log.info("------sync from {} to {}------", i, end);
                articleRepository.saveAll(esArticleList.subList(i, end));
            }
            log.info("------------FullSyncArticleToES end!------------, total {}", total);
        } else {
            log.info("------------DB no Data!------------");
        }
    }

    @Override
    public void run(String... args) {
        // Call fullSyncArticleToES() here for the one-time migration, then disable it.
    }
}
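The memory spike comes from `selectList` loading the entire table at once. A common mitigation is to read the table page by page and push each page to ES before fetching the next, so only one page is resident at a time. The sketch below isolates just that paging loop in plain Java; `fetchPage` is a hypothetical stand‑in for a paged mapper query (e.g., `LIMIT offset, limit`), and the sink list stands in for `articleRepository.saveAll`:

```java
import java.util.ArrayList;
import java.util.List;

public class PagedFullSync {
    // Hypothetical data source standing in for a paged mapper call; a real
    // implementation would issue one "LIMIT offset, limit" query per page.
    static List<Integer> fetchPage(List<Integer> table, int offset, int limit) {
        if (offset >= table.size()) {
            return List.of();
        }
        int end = Math.min(offset + limit, table.size());
        return table.subList(offset, end);
    }

    // Reads the table page by page so at most `pageSize` rows are held in memory.
    static int pagedSync(List<Integer> table, int pageSize, List<Integer> sink) {
        int offset = 0;
        while (true) {
            List<Integer> page = fetchPage(table, offset, pageSize);
            if (page.isEmpty()) {
                break;                  // no more rows: migration complete
            }
            sink.addAll(page);          // stands in for articleRepository.saveAll(...)
            offset += page.size();
        }
        return offset;                  // total rows migrated
    }

    public static void main(String[] args) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < 1234; i++) table.add(i);
        List<Integer> sink = new ArrayList<>();
        System.out.println(pagedSync(table, 500, sink)); // 1234
        System.out.println(sink.size());                 // 1234
    }
}
```

The trade-off is more round trips to MySQL in exchange for a bounded heap; with MyBatis‑Plus the same shape is usually expressed via `selectPage`.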

2. Incremental sync via scheduled task. A scheduled job runs at a fixed rate, reads rows modified within a recent window (e.g., the last five minutes), converts them in the same batched fashion, and writes them to ES. This is gentler on resources than a full sync, but polling the database every minute still adds load.

@Component
@Slf4j
public class IncSyncArticleToES {
    @Resource
    private ArticleMapper articleMapper;
    @Resource
    private ArticleRepository articleRepository;

    /**
     * Execute every minute
     */
    @Scheduled(fixedRate = 60 * 1000)
    public void run() {
        Date fiveMinutesAgoDate = new Date(new Date().getTime() - 5 * 60 * 1000L);
        List<Article> articleList = articleMapper.listArticleWithData(fiveMinutesAgoDate);
        if (CollectionUtils.isNotEmpty(articleList)) {
            List<ESArticle> esArticleList = articleList.stream().map(ESArticle::dbToEs).collect(Collectors.toList());
            final int pageSize = 500;
            int total = esArticleList.size();
            log.info("------------IncSyncArticleToES start!-----------, total {}", total);
            for (int i = 0; i < total; i += pageSize) {
                int end = Math.min(i + pageSize, total);
                log.info("sync from {} to {}", i, end);
                articleRepository.saveAll(esArticleList.subList(i, end));
            }
            log.info("------------IncSyncArticleToES end!------------, total {}", total);
        } else {
            log.info("------------DB no Data!------------");
        }
    }
}
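The mapper method `listArticleWithData` is assumed to filter on an update-time column, roughly `WHERE update_time >= #{cutoff}` in the mapper XML. The window predicate itself can be sketched in plain Java (names here are illustrative, not from the article's mapper):

```java
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

public class IncWindow {
    // Keeps only records touched inside the sync window, mirroring the
    // assumed SQL predicate "WHERE update_time >= #{cutoff}".
    static List<Date> recentSince(List<Date> updateTimes, Date cutoff) {
        return updateTimes.stream()
                .filter(t -> !t.before(cutoff))   // t >= cutoff
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Date cutoff = new Date(now - 5 * 60 * 1000L);            // five minutes ago
        List<Date> rows = List.of(new Date(now - 60_000L),        // 1 min old -> kept
                                  new Date(now - 10 * 60_000L));  // 10 min old -> dropped
        System.out.println(recentSince(rows, cutoff).size());     // 1
    }
}
```

Note that a five-minute window polled every minute re-syncs each row up to five times; this is harmless here because ES upserts by document ID, and the overlap guards against a missed or slow run.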

3. Strong consistency problem. When MySQL and ES are written concurrently, high‑throughput inserts can leave ES lagging behind MySQL, so the two stores may return different results for the same query. One mitigation is to use ES only for search: take the document IDs it returns, verify each against MySQL, serve the MySQL rows as the source of truth, and delete any ES document whose row no longer exists.

@Override
public PageInfo<Article> testSearchFromES(ArticleSearchDTO articleSearchDTO) {
    SearchHits<ESArticle> searchHits = elasticTemplate.search(searchQuery, ESArticle.class);
    List<Article> resultList = new ArrayList<>();
    if (searchHits.hasSearchHits()) {
        List<Long> articleIdList = searchHits.getSearchHits().stream()
            .map(val -> val.getContent().getId())
            .collect(Collectors.toList());
        List<Article> articleList = baseMapper.selectBatchIds(articleIdList);
        if (CollectionUtils.isNotEmpty(articleList)) {
            Map<Long, List<Article>> idArticleMap = articleList.stream()
                .collect(Collectors.groupingBy(Article::getId));
            articleIdList.forEach(articleId -> {
                if (idArticleMap.containsKey(articleId)) {
                    resultList.add(idArticleMap.get(articleId).get(NumberUtils.INTEGER_ZERO));
                } else {
                    // The document exists in ES but not in MySQL: remove the stale copy.
                    String delete = elasticTemplate.delete(String.valueOf(articleId), ESArticle.class);
                    log.info("delete stale article {}", delete);
                }
            });
        }
    }
    PageInfo<Article> pageInfo = new PageInfo<>();
    pageInfo.setList(resultList);
    pageInfo.setTotal(searchHits.getTotalHits());
    return pageInfo;
}
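The reconciliation step above can be isolated from the framework: given the ID order returned by ES and the rows actually found in MySQL, keep the MySQL rows in ES's relevance order and collect the IDs that ES still holds but MySQL no longer does (candidates for deletion). A minimal, framework‑free sketch of that logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class EsDbReconcile {
    // kept: MySQL rows in ES relevance order; stale: ES ids with no MySQL row.
    record Result(List<String> kept, List<Long> stale) {}

    static Result reconcile(List<Long> esIdOrder, Map<Long, String> dbRows) {
        List<String> kept = new ArrayList<>();
        List<Long> stale = new ArrayList<>();
        for (Long id : esIdOrder) {
            String row = dbRows.get(id);
            if (row != null) {
                kept.add(row);    // MySQL is the source of truth for the payload
            } else {
                stale.add(id);    // ES doc with no backing row: schedule deletion
            }
        }
        return new Result(kept, stale);
    }

    public static void main(String[] args) {
        Result r = reconcile(List.of(3L, 1L, 7L), Map.of(1L, "a", 3L, "c"));
        System.out.println(r.kept());   // [c, a]  (ES relevance order preserved)
        System.out.println(r.stale());  // [7]
    }
}
```

Separating the decision ("keep or delete") from the side effects (ES deletes, page assembly) also makes this branch easy to unit-test, which the inline version above is not.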

Although the code above demonstrates workable approaches, the article's main focus is Canal, an Alibaba open‑source tool that captures MySQL binlog events for incremental data synchronization.

4. Canal framework

Canal simulates a MySQL slave: it connects to the master via the dump protocol and streams binlog events. For high availability, only one server instance is active at a time (typically coordinated via ZooKeeper), and a single client consumes it through get/ack/rollback operations, which preserves event order.

4.1 Basic principle – Canal parses binlog and forwards changes to downstream systems such as Elasticsearch.

4.2 Installation & usage (key points) – Recommended environment: CentOS 7, JDK 11+, MySQL 5.7.x, Elasticsearch 7.16.x, canal‑server 1.1.5, canal‑adapter 1.1.5. Docker installation is suggested.
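Canal can only read what MySQL writes to its binlog, so the source instance must run with row‑based binary logging and expose a replication account. A typical preparation looks like the following (user name, password, and server id are illustrative placeholders):

```shell
# my.cnf on the source MySQL instance (restart MySQL afterwards):
# [mysqld]
# log-bin=mysql-bin       # enable binary logging
# binlog-format=ROW       # Canal requires row-level events
# server_id=1             # must differ from the id canal-server uses

# Replication account for canal-server to connect with:
mysql -uroot -p <<'SQL'
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
SQL
```

The canal-server instance then points at this MySQL address and account in its `instance.properties`.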

4.3 Dependency (test)

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

4.4 Code example (test) – A simple client connects to a running canal‑server, continuously fetches binlog entries, prints the database, table, and event type, and displays column changes.

public class CanalClientUtils {
    public static void main(String[] args) throws InterruptedException {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("your_public_ip", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");   // subscribe to every schema and table
            connector.rollback();             // re-deliver anything not yet acked
            int totalEmptyCount = 1000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    Thread.sleep(1000);
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s]%n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);       // confirm the batch so it is not re-sent
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Transaction boundary markers carry no row data; skip them.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of error-event has an error , data:" + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.printf("-----------binlog[%s:%s], name[%s,%s], eventType: %s-----------%n",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("---------before data----------");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("---------after data-----------");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + ", update status: " + column.getUpdated());
        }
    }
}

After deploying Canal, the MySQL binlog events are captured and synchronized to ES automatically, making the process transparent to developers.

Commercial alternatives such as Alibaba Cloud DTS also provide managed MySQL‑to‑ES migration services.

5. Conclusion – The article presented the full thinking process for migrating MySQL data to Elasticsearch, discussed full and incremental sync strategies, consistency handling, and finally recommended Canal as a robust solution for real‑time binlog‑driven synchronization.

Tags: backend, data migration, Elasticsearch, MySQL, Canal, sync
Written by Java Captain

Focused on Java technologies: SSM, the Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading; occasionally covers DevOps tools like Jenkins, Nexus, Docker, ELK; shares practical tech insights and is dedicated to full‑stack Java development.
