Backend Development 21 min read

Design and Implementation of an Elasticsearch Data Synchronization Service (ECP) for Large‑Scale Order Data

This article describes the challenges and technical solutions for synchronizing billions of order records from a relational database to Elasticsearch, including multi‑source data reading, dynamic rate limiting, retry strategies, SPI‑based service integration, environment isolation, health‑checking, smooth migration, and structured logging, all implemented in a backend service called ECP.

Zhuanzhuan Tech
Zhuanzhuan Tech
Zhuanzhuan Tech
Design and Implementation of an Elasticsearch Data Synchronization Service (ECP) for Large‑Scale Order Data

1 Background

The team needed a feature that allows arbitrary combination of multiple conditions to query order data, which amounts to over a hundred million records, making the requirement non‑trivial.

The solution is to synchronize heterogeneous order data to Elasticsearch (ES) and leverage ES's inverted index and caching capabilities for complex multi‑condition queries. Existing ES clusters are available, but some data does not exist in current ES indices, requiring a full re‑index of the order database, which could take a week.

2 Complexity of Order Data Synchronization to ES

2.1 Data Synchronization ES Index Process

The process involves reading all order data from the order database, fetching related user and product information from respective services using user ID and product ID, assembling the enriched data, and finally writing it to the ES cluster.

2.2 Pain Points

Extracting billions of rows without impacting the production database; a read‑only replica and multiple data sources are used, with the data sharded across 16 databases and 16 tables each (256 tables total).

Memory constraints prevent loading all data at once; pagination with a simple LIMIT is inefficient for large offsets, so a unique, incremental cursor field is used.

Fetching data from multiple upstream services must not overload those services; therefore, flow control and isolation are required.

Progress monitoring and estimation of remaining time are needed.

Failure handling: retry strategies, alerting, and the ability to resume from the point of interruption.

Support for pausing the sync, manual ID‑based sync, and scheduling syncs during off‑peak hours.

Extending the solution to synchronize other domain data (e.g., product data) without rewriting the whole logic.

3 The Magical Service – ECP

ECP automates and visualizes the entire data‑heterogenization workflow, reducing the cost of syncing data to ES.

3.1 Simple ECP Execution Flow

ECP reads data from a source and pushes it to an ES write service. The write service is implemented by each downstream consumer, allowing a plug‑in architecture.

3.2 Multi‑Data‑Source Reading

3.2.1 ID Source

A text box for entering IDs, suitable for small‑scale syncs or fixing inconsistencies.

3.2.2 File Source

Data is read from a text file stored in object storage. Large files are streamed line‑by‑line to avoid OOM.

try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
    if (!response.isSuccessful()) {
        throw new IOException("Unexpected code " + response);
    }
    // Stream file data
    InputStream inputStream = response.body().byteStream();
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
}

3.2.3 Script Source

For massive data volumes, a script combines a SQL statement with a data source configuration. ECP executes the SQL, streams the result set, and pushes it to the ES write service.

3.2.4 Large‑Scale Script Reading Implementation

Instead of offset‑based pagination, a cursor based on an indexed, auto‑increment column (e.g., order_id) is used:

select * from t_order where order_id > xxx order by order_id desc limit 10

ECP prefers JDBC cursor queries with a configurable fetch size to avoid OOM:

// Establish connection
    conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
    // Create forward‑only, read‑only statement
    stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    stmt.setFetchSize(param.getFetchSize());

3.3 SQL Parsing and Validation

ECP uses Druid's SQL parser to convert SQL into an AST for reliable validation and manipulation, avoiding fragile regex‑based approaches.

3.4 Dynamic Rate Limiting

Guava's RateLimiter provides per‑instance QPS throttling. The QPS value is stored in the database and a scheduled task syncs changes to the RateLimiter at runtime.

3.5 Retry Strategy and Fault Awareness

Failures are categorized (network jitter, business exceptions, upstream service overload). A Fibonacci back‑off retry (implemented with Guava Retry) is applied, with alerting after a configurable number of attempts.

// Retry component configuration
private final Retryer
RETRYER = RetryerBuilder.
newBuilder()
        .retryIfException(input -> !isPauseException(input))
        .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
        .withRetryListener(new RetryListener() {
            @Override
            public
void onRetry(Attempt
attempt) {
                if (attempt.hasException()) {
                    log.error("act=【DataFlushRpcCallRetry】desc=【重试】重试次数=【{}】重试异常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
                    alarmIfExceedThreshold(attempt);
                }
            }
        })
        .build();

public void execute(Runnable runnable) {
    innerExecute(runnable, RETRYER);
}

3.6 SPI Mechanism for Data Push

ECP defines a generic interface for ES indexing; each consumer implements the interface. New consumers are added by providing an implementation without modifying ECP. Reference reference = new Reference<>(); reference.setServiceName(serviceName); reference.setInterfaceClass(IEsIndexFlushAPI.class); reference.setApplicationConfig(applicationConfig); IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer()); EMApiResult result = iEsIndexFlushAPI.flush(request);

3.7 Environment Isolation

ECP integrates tag‑based routing so that synchronization tasks can be directed to isolated service groups (e.g., a "FLUSH" tag) without affecting the production environment.

3.8 Health‑Check and Fault Recovery

A keep‑alive component periodically renews task leases; a recovery component scans stale tasks and restarts them after acquiring a lock.

@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal(){
    futureMap.forEach((taskId,future)->{
        if (!future.isDone()){
            log.info("act=【renewal】desc=【任务续期】taskId=【{}】续期时间=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
            contextService.renewal(taskId);
        } else {
            log.info("act=【renewal】desc=【任务结束】taskId=【{}】",taskId);
            futureMap.remove(taskId);
        }
    });
}
@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask(){
    List
contextPOS = contextService.queryRunningTask();
    for (TaskFlushExecuteContextPO contextPO : contextPOS) {
        Integer durationMin = calculateTimeSinceLastAlive();
        if (durationMin >= MAX_DURATION_MIN) {
            log.info("act=【restartTask】desc=【任务重新拉起】taskId=【{}】",contextPO.getTaskId());
            int i = contextExtMapper.casUpdateAliveTime();
            if (i > 0) {
                restart0(contextPO, aliveTime);
            }
        }
    }
}

3.9 Smooth Migration

ECP supports creating a new index, writing to both old and new indices, and switching an alias to achieve zero‑downtime migration.

3.10 Elegant Logging

Logging is decoupled from business logic using annotation‑driven flow recording. Dynamic values (e.g., task ID) are injected via Spring EL expressions.

@Flow(subjectIdEp = "#taskPO.id", subjectType = SubjectTypeEnum.TASK, operateFlowType = OperateFlowTypeEnum.CREATE_TASK, content = "'创建任务,任务ID:' + #taskPO.id")
public void saveTaskWithUser(TaskPO taskPO) {
    String name = LoginUserContext.get().getName();
    taskPO.setCreator(name);
    taskPO.setModifier(name);
    taskMapper.insertSelective(taskPO);
}

4 Summary

ECP demonstrates a comprehensive backend solution for large‑scale data synchronization to Elasticsearch, covering data source abstraction, cursor‑based pagination, dynamic rate limiting, retry mechanisms, SPI‑based extensibility, environment isolation, health‑checking, smooth migration, and structured logging.

JavaElasticsearchdata synchronizationRate LimitingSPIretry strategyBackend Servicelarge-scale data
Zhuanzhuan Tech
Written by

Zhuanzhuan Tech

A platform for Zhuanzhuan R&D and industry peers to learn and exchange technology, regularly sharing frontline experience and cutting‑edge topics. We welcome practical discussions and sharing; contact waterystone with any questions.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.