
Data Synchronization Architecture and Refactoring for Large-Scale Travel Data at Qunar

This article describes the challenges of handling billions of travel records in Qunar's MySQL databases, compares open‑source data sync solutions like Databus and Canal, outlines the legacy system’s issues, and presents a refactored architecture that introduces Otter, ES gateway, and improved aggregation to achieve low‑latency, reliable, and scalable data synchronization.

Qunar Tech Salon

Background

Over the years, Qunar has accumulated massive amounts of travel data (flights, hotels, attraction tickets, etc.) stored in MySQL. Once a single table grows beyond 1 billion rows, queries slow down and can even cause service outages. The common remedy is to synchronize the data to a secondary database or a heterogeneous data store that serves these queries more efficiently.

Open‑Source Data Synchronization Solutions

1. Databus

Databus is a low‑latency, reliable, transaction‑consistent data sync system that extracts changes from database logs and provides them to customized clients.

Source‑independent: supports Oracle and MySQL.

Scalable and highly available: supports thousands of consumers.

Preserves transaction order.

Millisecond‑level latency with flexible subscription mechanisms.

Unlimited look-back via a bootstrap service, so consumers can obtain a full copy of the data.
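To make the ordering and look-back properties concrete, here is a hypothetical in-memory sketch of a Databus-style change stream. It is not the Databus client API: ChangeLogSketch, ChangeEvent and Consumer are illustrative names, and the unbounded list stands in for Databus's relay and bootstrap storage. Each change receives an increasing sequence number (Databus calls this an SCN), and each consumer keeps its own checkpoint, so one consumer can replay from the beginning while another keeps streaming.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a Databus-style change stream (not the Databus API).
 * Sequence numbers preserve commit order; consumers track their own checkpoints.
 */
public class ChangeLogSketch {

    /** One captured change; seq reflects commit order. */
    public record ChangeEvent(long seq, String table, String op, String key) {}

    private final List<ChangeEvent> log = new ArrayList<>();
    private long nextSeq = 1;

    /** Appends a change in commit order and returns its sequence number. */
    public long append(String table, String op, String key) {
        ChangeEvent e = new ChangeEvent(nextSeq++, table, op, key);
        log.add(e);
        return e.seq();
    }

    /** Returns up to maxEvents events with seq > sinceSeq, in commit order (0 = from the start). */
    public List<ChangeEvent> readSince(long sinceSeq, int maxEvents) {
        List<ChangeEvent> out = new ArrayList<>();
        for (ChangeEvent e : log) {
            if (out.size() >= maxEvents) {
                break;
            }
            if (e.seq() > sinceSeq) {
                out.add(e);
            }
        }
        return out;
    }

    /** A consumer with its own checkpoint, independent of every other consumer. */
    public static class Consumer {
        private long checkpoint;
        private final List<ChangeEvent> applied = new ArrayList<>();

        public Consumer(long startSeq) {
            this.checkpoint = startSeq;
        }

        /** Pulls the next batch, applies it in order and advances the checkpoint. */
        public int poll(ChangeLogSketch source, int maxEvents) {
            List<ChangeEvent> batch = source.readSince(checkpoint, maxEvents);
            for (ChangeEvent e : batch) {
                applied.add(e);
                checkpoint = e.seq();
            }
            return batch.size();
        }

        public long checkpoint() { return checkpoint; }

        public List<ChangeEvent> applied() { return applied; }
    }
}
```

For example, a consumer created with checkpoint 0 after three appends receives all three events in commit order, regardless of how far other consumers have progressed.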

2. Canal

Canal, an Alibaba open‑source project written in Java, simulates a MySQL slave to capture binlog changes, supporting row‑mode binlog to obtain before/after data for high‑performance change capture.

Supports only MySQL.

Extensible: can deliver change data to Kafka or other message queues (MQ).

Low latency (ms level).

Transactional support via GTID.

Back‑tracking based on binlog retention.
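Row-mode binlog is what makes before/after capture possible: an UPDATE event carries the full row image both before and after the change. The following is a hypothetical plain-Java model of such an event (not Canal's protobuf Entry/RowChange classes) showing how a consumer could derive the changed columns without querying the source database. RowChangeSketch, RowChange and changedColumns are illustrative names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical model of a row-mode binlog change (not Canal's classes).
 * before is empty for INSERT; after is empty for DELETE.
 */
public class RowChangeSketch {

    public enum EventType { INSERT, UPDATE, DELETE }

    public record RowChange(String table, EventType type,
                            Map<String, String> before, Map<String, String> after) {}

    /** Returns column -> new value for each column that differs between the two images. */
    public static Map<String, String> changedColumns(RowChange change) {
        Map<String, String> changed = new LinkedHashMap<>();
        switch (change.type()) {
            // INSERT: every column of the after image is new
            case INSERT -> changed.putAll(change.after());
            // DELETE: every column of the before image is removed (mapped to null)
            case DELETE -> change.before().keySet().forEach(col -> changed.put(col, null));
            // UPDATE: keep only columns whose value actually changed
            case UPDATE -> change.after().forEach((col, newValue) -> {
                if (!Objects.equals(change.before().get(col), newValue)) {
                    changed.put(col, newValue);
                }
            });
        }
        return changed;
    }
}
```

With a statement-mode binlog only the SQL text would be available, so this kind of column-level diff would require re-reading the row from the database.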

In short, Canal supports only MySQL, while Databus supports both MySQL and Oracle. Both offer low latency and transactional support, but Databus is more complex.

Legacy System Issues

The old system suffered from several problems:

The Databus producer was a single point of failure.

The Canal version in use was outdated, which made operations difficult.

Configuration was stored in ZooKeeper, so the system became fragile whenever ZooKeeper was unavailable.

Full synchronization required hard-coded implementations.

The query service (Qgalaxy) tightly coupled business logic with direct ES access.

Core and non-core ES indices shared the same deployment, which caused performance degradation.

Refactored Architecture

The new architecture replaces Canal with Otter, adds a unified ES gateway, and uses the DataX framework for bulk imports. Key components:

Otter replaces Canal, providing high availability and centralized configuration.

ES gateway abstracts ES version differences and provides a unified DSL interface.

DataX handles full data imports.

Core and non‑core ES clusters are separated for stability.

Otter Modifications

Otter's codebase is split into three modules: Node (data synchronization), Manager (node management), and Share (common utilities). The following code shows DataMediaPair with an added partitionKey field that determines Kafka partitioning:

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// DataMedia, ExtensionData, ColumnPairMode, ColumnPair and ColumnGroup are Otter model classes.
public class DataMediaPair {
    private Long id;
    private Long pipelineId;             // sync task (pipeline) id
    private DataMedia source;            // source data media
    private DataMedia target;            // target data media
    private Long pullWeight;             // weight for pulling from the source
    private Long pushWeight;             // weight for pushing to the target
    private ExtensionData resolverData;  // resolver extension
    private ExtensionData filterData;    // filter extension
    private ColumnPairMode columnPairMode = ColumnPairMode.INCLUDE;
    private List<ColumnPair> columnPairs = new ArrayList<>();
    private List<ColumnGroup> columnGroups = new ArrayList<>();
    private Date gmtCreate;
    private Date gmtModified;
    private String subEventType;         // event type
    private Long kafkaDataMediaId;       // id of the Kafka data media used as an extra sync target
    private boolean primaryTable;        // whether this is the primary table
    private String groovy;               // Groovy script
    private String esIndex;              // target ES index
    private String partitionKey;         // partition key for Kafka
}
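As a rough illustration of what a partitionKey setting like the one on DataMediaPair enables, the hypothetical sketch below maps each row change to a Kafka partition from the value of the configured key column. Rows sharing a key value always land in the same partition, so their relative order is preserved for consumers. PartitionKeySketch and choosePartition are illustrative names; String.hashCode is used for simplicity, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2.

```java
import java.util.Map;

/**
 * Hypothetical sketch: choose a Kafka partition from a row's partition-key column.
 * Not Otter's or Kafka's code; the hash function here is String.hashCode.
 */
public class PartitionKeySketch {

    /**
     * @param row           column name -> value for the changed row
     * @param partitionKey  column configured as the partition key (e.g. "order_id", hypothetical)
     * @param numPartitions number of partitions of the target topic
     */
    public static int choosePartition(Map<String, String> row, String partitionKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        String keyValue = row.get(partitionKey);
        if (keyValue == null) {
            return 0; // no key value: fall back to a fixed partition (one possible policy)
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(keyValue.hashCode(), numPartitions);
    }
}
```

The trade-off of keying by a business column is skew: a very hot key value concentrates its traffic on one partition, which is the price paid for per-key ordering.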

DbLoadAction handles the Load phase. In the modified version it sends messages to Kafka instead of writing to another database:

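public class DbLoadAction implements InitializingBean, DisposableBean {
    /**
     * Loads one RowBatch and returns the context holding the successfully processed records.
     * Excerpt: fields (logger, ...), buildContext, buildWeightBuckets, doBefore, doLoad and the
     * InitializingBean/DisposableBean callbacks are omitted.
     */
    public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
        DbLoadContext context = buildContext(rowBatch.getIdentity());
        List<EventData> datas = rowBatch.getDatas();
        try {
            // Group events into weight buckets; start() is called even when there are no weights
            WeightBuckets<EventData> buckets = buildWeightBuckets(context, datas);
            List<Long> weights = buckets.weights();
            controller.start(weights);
            if (CollectionUtils.isEmpty(datas)) {
                logger.info("##no eventdata for load");
            }
            for (int i = 0; i < weights.size(); i++) {
                Long weight = weights.get(i);
                // Wait until all lower weights have finished loading
                controller.await(weight.intValue());
                List<EventData> items = buckets.getItems(weight);
                logger.debug("##start load for weight:" + weight);
                // Merge multiple I/U/D operations on the same primary key
                items = DbLoadMerger.merge(items);
                DbLoadData loadData = new DbLoadData();
                doBefore(items, context, loadData);
                // In the modified version, doLoad sends messages to Kafka instead of writing to a DB
                doLoad(context, loadData);
                // Mark this weight as done so higher weights can proceed
                controller.single(weight.intValue());
                logger.debug("##end load for weight:" + weight);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return context;
    }
}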
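The weight loop above can be reduced to a simple rule: every event of a lower weight is loaded before any event of a higher weight, and events within one weight keep their original order. The single-threaded sketch below shows only that rule; WeightBucketSketch and Event are hypothetical names, not Otter's WeightBuckets or WeightController, and it omits the cross-thread coordination that await() and single() provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified sketch of weight-ordered loading (not Otter's classes).
 * Each event carries the weight configured for its table.
 */
public class WeightBucketSketch {

    public record Event(String table, long weight, String pk) {}

    /** Groups events by weight; TreeMap iterates the weights in ascending order. */
    public static TreeMap<Long, List<Event>> buildBuckets(List<Event> events) {
        TreeMap<Long, List<Event>> buckets = new TreeMap<>();
        for (Event e : events) {
            buckets.computeIfAbsent(e.weight(), w -> new ArrayList<>()).add(e);
        }
        return buckets;
    }

    /** Returns the order in which a loader following the weight rule would process the events. */
    public static List<Event> loadOrder(List<Event> events) {
        List<Event> order = new ArrayList<>();
        for (Map.Entry<Long, List<Event>> bucket : buildBuckets(events).entrySet()) {
            // In DbLoadAction each bucket is loaded between await() and single()
            order.addAll(bucket.getValue());
        }
        return order;
    }
}
```

Giving a parent table a lower weight than its child tables, for example, ensures parent rows are loaded before the child rows that refer to them within a batch.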

Note that a RowBatch holds the binlog events of one database instance, potentially spanning many tables, rather than the events of a single table. This matters for ordering in sharded environments, where the rows of one logical table are spread across several databases or instances.
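Because one batch can interleave events from several tables (and, in a sharded setup, possibly from several shard databases on the same instance), any per-table processing has to regroup the batch first. The hypothetical sketch below groups events by schema and table while keeping binlog order within each group; RowBatchGrouping and BinlogEvent are illustrative names, not Otter types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: split one instance-level batch into per-table event lists,
 * preserving the original binlog order inside each list.
 */
public class RowBatchGrouping {

    public record BinlogEvent(String schema, String table, String op, String pk) {}

    /** Groups events by "schema.table"; LinkedHashMap keeps first-seen table order. */
    public static Map<String, List<BinlogEvent>> groupByTable(List<BinlogEvent> batch) {
        Map<String, List<BinlogEvent>> groups = new LinkedHashMap<>();
        for (BinlogEvent e : batch) {
            groups.computeIfAbsent(e.schema() + "." + e.table(), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }
}
```

Ordering is preserved only within a group here; nothing in this sketch orders events of the same logical table that arrive in batches from different instances.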

Table Relationship Abstraction

To model parent‑child table relationships, an interface TableDataRow and its implementations RootTableDataRow and ChildTableDataRow are defined:

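import java.util.List;

// RowType, RowKey and ChildTableDataRow are project types.
public interface TableDataRow {
    String getTableName();
    RowType getType();
    String getSql();                       // SQL associated with this table
    List<ChildTableDataRow> getChildren(); // direct child tables
    RowKey getRowKey();                    // how this table's row key is built
}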

Root table implementation:

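import java.util.List;
import java.util.Set;
public class RootTableDataRow implements TableDataRow {
    private String tableName;
    private RowType type;
    private String sql;
    private List<ChildTableDataRow> children;
    private RowKey rowKey;
    private Set<String> dbNames; // database names associated with this root table
    public String getTableName() { return tableName; }
    public RowType getType() { return type; }
    public String getSql() { return sql; }
    public List<ChildTableDataRow> getChildren() { return children; }
    public RowKey getRowKey() { return rowKey; }
}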

Child table implementation includes a reference to its parent and its own row key:

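import java.util.List;

public class ChildTableDataRow implements TableDataRow {
    private String tableName;
    private RowType type;
    private String sql;
    private WhereEntry whereEntry;
    private List<ChildTableDataRow> children;
    private TableDataRow parent;   // parent table: the root or another child
    private RowKey rowKey;

    public String getTableName() { return tableName; }
    public RowType getType() { return type; }
    public String getSql() { return sql; }
    public List<ChildTableDataRow> getChildren() { return children; }
    public RowKey getRowKey() { return rowKey; }
    public TableDataRow getParent() { return parent; }
}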

The core algorithm, getRootRowKeyValue, walks up the parent chain from any child row until it reaches the root table, so that every row in a parent-child hierarchy resolves to the same root row key:

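/**
 * Resolves the root table's row key for a row in any table of the hierarchy,
 * so that related parent and child rows all map to the same root key.
 */
public String getRootRowKeyValue(TableDataRow tableDataRow, String thisTableRowKeyValue, String schemaName, CollectConfig collectConfig) {
    // This table's row key does not depend on a parent: use it as is
    if (!tableDataRow.getRowKey().isDepend()) {
        return thisTableRowKeyValue;
    }
    // Reached the root of the hierarchy
    if (tableDataRow instanceof RootTableDataRow) {
        return thisTableRowKeyValue;
    }
    ChildTableDataRow child = (ChildTableDataRow) tableDataRow;
    // Look up the parent row's key for this child row
    String parentRowKeyValue = getParentRowKeyValue(schemaName, thisTableRowKeyValue, child, collectConfig);
    TableDataRow parent = child.getParent();
    // Recurse upward until the root table is reached
    return getRootRowKeyValue(parent, parentRowKeyValue, schemaName, collectConfig);
}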
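A self-contained way to see the recursion in getRootRowKeyValue is to replace the database lookup with an in-memory map. In the sketch below, the hypothetical Table class stands in for TableDataRow, dependsOnParent mirrors RowKey.isDepend(), and the parentKeyOf map (child row key to parent row key) stands in for getParentRowKeyValue, which is assumed to query the source data in the real system. The table and key names in the example are made up.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of root row-key resolution over a parent-child table hierarchy.
 * Not the production code: parent keys come from an in-memory map instead of a query.
 */
public class RootKeySketch {

    public static class Table {
        public final String name;
        public final Table parent;            // null for the root table
        public final boolean dependsOnParent; // mirrors RowKey.isDepend()
        public final Map<String, String> parentKeyOf = new HashMap<>(); // row key -> parent row key

        public Table(String name, Table parent, boolean dependsOnParent) {
            this.name = name;
            this.parent = parent;
            this.dependsOnParent = dependsOnParent;
        }
    }

    /** Climbs the parent chain until a root (or non-dependent) table is reached. */
    public static String rootKey(Table table, String rowKey) {
        if (!table.dependsOnParent || table.parent == null) {
            return rowKey; // root table, or a key not derived from a parent
        }
        String parentKey = table.parentKeyOf.get(rowKey);
        if (parentKey == null) {
            throw new IllegalStateException("no parent row for " + table.name + " key " + rowKey);
        }
        return rootKey(table.parent, parentKey); // move one level up and repeat
    }
}
```

For a three-level chain orders, order_item, item_detail, a change to an item_detail row resolves to the key of its orders row, so changes at any level can be aggregated under that root key.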

General ES Gateway Architecture

The ES gateway encapsulates read/write APIs, providing authentication, traffic control, and rate limiting. It consists of:

Ticket (access credential) application and authentication module.

Configuration module for cluster metadata, index mappings, and permissions.

Write module supporting synchronous Dubbo writes and asynchronous Kafka writes.

Read module offering count, normal, and deep‑pagination (scroll) queries.

Traffic switching module for primary/secondary cluster upgrades.

Rate‑limiting module using Hystrix to limit QPS and request size.
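The admission path these modules describe can be sketched in a few lines: authenticate the caller's ticket, reject oversized requests, enforce a per-ticket QPS limit, and route to the primary or secondary cluster. This is a hypothetical sketch with illustrative names. The article's gateway uses Hystrix for rate limiting; a simple fixed one-second window counter is swapped in here so the example needs only the JDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical ES gateway admission sketch: ticket check, size limit,
 * per-ticket fixed-window QPS limit, and primary/secondary switching.
 */
public class GatewaySketch {

    public enum Decision { ACCEPTED, UNAUTHORIZED, TOO_LARGE, RATE_LIMITED }

    private final Set<String> validTickets;
    private final int maxQps;
    private final int maxRequestBytes;
    private final Map<String, long[]> windows = new HashMap<>(); // ticket -> {windowSecond, count}
    private volatile boolean useSecondary = false;              // flipped during cluster upgrades

    public GatewaySketch(Set<String> validTickets, int maxQps, int maxRequestBytes) {
        this.validTickets = validTickets;
        this.maxQps = maxQps;
        this.maxRequestBytes = maxRequestBytes;
    }

    /** nowMillis is passed in so the window logic is deterministic and testable. */
    public synchronized Decision admit(String ticket, int requestBytes, long nowMillis) {
        if (!validTickets.contains(ticket)) {
            return Decision.UNAUTHORIZED;
        }
        if (requestBytes > maxRequestBytes) {
            return Decision.TOO_LARGE;
        }
        long second = nowMillis / 1000;
        long[] w = windows.computeIfAbsent(ticket, t -> new long[] {second, 0});
        if (w[0] != second) { // a new one-second window: reset the counter
            w[0] = second;
            w[1] = 0;
        }
        if (w[1] >= maxQps) {
            return Decision.RATE_LIMITED;
        }
        w[1]++;
        return Decision.ACCEPTED;
    }

    /** Switches traffic between the primary and the secondary cluster. */
    public void switchToSecondary(boolean secondary) {
        this.useSecondary = secondary;
    }

    public String targetCluster() {
        return useSecondary ? "secondary" : "primary";
    }
}
```

A fixed window is the simplest choice but allows up to twice the limit across a window boundary; a token bucket or Hystrix's semaphore/thread-pool isolation smooths this at the cost of more state.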

Conclusion

The overall data synchronization system is divided into three major parts: Otter for binlog-to-Kafka synchronization, DTS for aggregating business data across parent-child tables, and the ES gateway for unified query and write interfaces. By introducing Otter, improving configuration management, separating core ES clusters, and adding a generic ES gateway, the system delivers low-latency, highly available, and scalable data synchronization.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Qunar Tech Salon

Qunar Tech Salon is a learning and exchange platform for Qunar engineers and industry peers. We share cutting-edge technology trends and topics, providing a free platform for mid-to-senior technical professionals to exchange and learn.
