
Design and Implementation of an Elasticsearch Data Synchronization Service (ECP)

This article describes the challenges of synchronizing billions of order records to Elasticsearch and presents the design, architecture, and key technical details of a generic data‑sync service (ECP) that supports multiple data sources, dynamic rate limiting, retry strategies, SPI‑based extensibility, environment isolation, health checks, fault recovery, smooth migration, and elegant logging.


1 Background

Faced with a requirement to provide flexible multi‑condition queries over more than a hundred million order records, the team concluded that the right approach was to synchronize the heterogeneous order data into Elasticsearch (ES) and leverage its inverted index and caching capabilities for complex queries.

However, some data does not exist in the current ES index, so the entire order dataset must be re‑indexed, which would take about a week if done naïvely.

2 Complexity of Order Data Synchronization to ES

2.1 Data Sync Process

The process involves reading all orders from the order database, fetching related user and product information from upstream services, assembling the enriched records, and writing them to the ES cluster.

2.2 Pain Points

Extracting billions of rows without impacting the primary database; a read‑only replica and multiple data sources are required.

Paging large datasets efficiently; using a unique, incremental cursor field instead of simple LIMIT‑OFFSET.

Calling multiple upstream services without overloading them; rate‑limiting and isolation are necessary.

Monitoring sync progress and providing real‑time status to product owners.

Handling failures with retry, alerting, and the ability to resume from the last successful point.

Supporting manual pause/resume and partial sync of inconsistent orders.

Allowing ad‑hoc sync of specific order IDs.

Collecting execution statistics such as start/end time, duration, and operator.

Scheduling syncs during off‑peak hours.

Extending the solution to synchronize other domains such as product data.
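The cursor‑based paging called out above can be sketched in plain Java. The in‑memory `KeysetPager` below is an illustrative stand‑in (not ECP code) for the real query `SELECT ... WHERE id > ? ORDER BY id LIMIT ?`: each page starts strictly after the last id of the previous page, so fetching page N costs the same regardless of N, unlike LIMIT‑OFFSET, which must skip all earlier rows.

```java
import java.util.ArrayList;
import java.util.List;

public class KeysetPager {

    /**
     * Returns up to pageSize ids strictly greater than the cursor,
     * mimicking "SELECT id FROM orders WHERE id > ? ORDER BY id LIMIT ?".
     */
    public static List<Long> fetchPage(List<Long> sortedIds, long cursor, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (long id : sortedIds) {
            if (id > cursor) {
                page.add(id);
                if (page.size() == pageSize) {
                    break;
                }
            }
        }
        return page;
    }

    /** Full scan: advance the cursor to the last id of each page until a page comes back empty. */
    public static List<List<Long>> scanAll(List<Long> sortedIds, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        long cursor = Long.MIN_VALUE;
        while (true) {
            List<Long> page = fetchPage(sortedIds, cursor, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            cursor = page.get(page.size() - 1);
        }
        return pages;
    }
}
```

The only requirement on the cursor column is that it be unique and monotonically orderable (an auto‑increment id fits), which is exactly why the pain point above calls for such a field.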

3 The "Magical" Service – ECP

ECP (ElasticSearch Control Platform) automates and visualizes the entire sync workflow, reducing the cost of data heterogeneity.

3.1 Simple Run Flow

ECP reads data from a source and pushes it to an ES write service. The write service is implemented by each consumer according to its business logic.

3.2 Multi‑Data‑Source Reading

ECP currently supports three source types:

ID Source: a text box for entering a single ID, suitable for small‑scale syncs.

File Source: uploads a text file to object storage; ECP streams the file line by line to avoid OOM.

Script Source: executes a user‑provided SQL query against a configured database, enabling large‑scale reads.

File streaming example:

try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
    if (!response.isSuccessful()) {
        throw new IOException("Unexpected code " + response);
    }
    // Stream the file body line by line instead of loading it into memory
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(response.body().byteStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            handleLine(line); // hand each record off to the sync pipeline
        }
    }
}

For massive data, ECP uses JDBC cursor queries instead of LIMIT‑OFFSET:

// Establish a connection to the read-only source database
conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
// Create a forward-only, read-only statement so the driver can stream rows
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// For MySQL, a fetch size of Integer.MIN_VALUE enables true row-by-row streaming
stmt.setFetchSize(param.getFetchSize());
ResultSet rs = stmt.executeQuery(param.getSql());
while (rs.next()) {
    // Rows arrive in fetch-size batches, never the whole result set at once
    process(rs);
}

Cursor fetching avoids OOM by reading a configurable batch size.

3.3 SQL Parsing and Validation

ECP uses Druid to parse SQL into an AST, enabling safe validation and automatic addition of a LIMIT 1 clause for preview queries.
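A minimal sketch of that preview guard, assuming the Alibaba Druid SQL parser is on the classpath and using its `SQLUtils.parseStatements` and `PagerUtils.limit` helpers; the `SqlPreviewGuard` class and its single‑SELECT rule are illustrative, not ECP's actual validation logic:

```java
import com.alibaba.druid.sql.PagerUtils;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import java.util.List;

public class SqlPreviewGuard {

    /** Validates that the script is a single SELECT, then caps it at one row for preview. */
    public static String toPreviewSql(String sql) {
        // Parse into an AST; anything Druid cannot parse throws here
        List<SQLStatement> statements = SQLUtils.parseStatements(sql, "mysql");
        if (statements.size() != 1 || !(statements.get(0) instanceof SQLSelectStatement)) {
            throw new IllegalArgumentException("Only a single SELECT statement is allowed");
        }
        // Rewrite the query with a bounding LIMIT (offset 0, count 1)
        return PagerUtils.limit(sql, "mysql", 0, 1);
    }
}
```

Working on the AST rather than on the raw string means comments, casing tricks, and stacked statements cannot slip past the validation.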

3.4 Dynamic Rate Limiting

Single‑node rate limiting is implemented with Guava's RateLimiter. The QPS value can be modified on the UI and persisted to the database, and a scheduled task syncs the stored value to the limiter.
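The refresh pattern is easy to see with a small token bucket. `SimpleRateLimiter` below is a hypothetical, stdlib‑only stand‑in for what Guava's `RateLimiter` provides (non‑blocking acquire plus a mutable rate); ECP itself uses the Guava class, whose `setRate` is what the scheduled task would call after reading the QPS from the database:

```java
public class SimpleRateLimiter {

    private volatile double permitsPerSecond;
    private double storedPermits;
    private long lastRefillNanos;

    public SimpleRateLimiter(double qps) {
        this.permitsPerSecond = qps;
        this.storedPermits = qps; // start with a full bucket
        this.lastRefillNanos = System.nanoTime();
    }

    /** Non-blocking acquire: refill by elapsed time, then spend one permit if available. */
    public synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        double refill = (now - lastRefillNanos) / 1_000_000_000.0 * permitsPerSecond;
        storedPermits = Math.min(permitsPerSecond, storedPermits + refill);
        lastRefillNanos = now;
        if (storedPermits >= 1.0) {
            storedPermits -= 1.0;
            return true;
        }
        return false;
    }

    /** Called by the scheduled task whenever the QPS stored in the database changes. */
    public void setRate(double qps) {
        this.permitsPerSecond = qps;
    }

    public double getRate() {
        return permitsPerSecond;
    }
}
```

Because the rate is a volatile field consulted on every refill, a change made on the UI takes effect on the very next acquire, with no need to rebuild the limiter.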

3.5 Retry Strategy and Fault Awareness

Failures are categorized (network jitter, business logic errors, upstream service overload). A Fibonacci back‑off retry strategy is applied using Guava Retry:

// Retry component configuration
private final Retryer<Void> RETRYER = RetryerBuilder.<Void>newBuilder()
        .retryIfException(input -> !isPauseException(input))
        .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
        .withRetryListener(new RetryListener() {
            @Override
            public <V> void onRetry(Attempt<V> attempt) {
                if (attempt.hasException()) {
                    log.error("act=【DataFlushRpcCallRetry】desc=【retry】retryCount=【{}】exception=【{}】",
                              attempt.getAttemptNumber(), attempt.getExceptionCause());
                    alarmIfExceedThreshold(attempt);
                }
            }
        })
        .build();

public void execute(Runnable runnable) {
    innerExecute(runnable, RETRYER);
}

If retries exceed the threshold, an alert is sent to a designated group.

3.6 SPI Mechanism for Target Service Selection

ECP defines a generic interface for ES indexing. Consumers implement the interface, and ECP dynamically references the appropriate service via an SPI‑style lookup:

Reference<IEsIndexFlushAPI> reference = new Reference<>();
reference.setServiceName(serviceName);
reference.setInterfaceClass(IEsIndexFlushAPI.class);
reference.setApplicationConfig(applicationConfig);
// Cache the referred service stub per service name
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName,
        s -> reference.refer());
log.info("act=【EsIndexFlushApiInvoker】desc=【request】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】",
        serviceName, request.getDataList().size(), request.getIndexNameList(), request.getTag());
EMApiResult result = iEsIndexFlushAPI.flush(request);

3.7 Environment Isolation

ECP integrates tag‑based routing so that sync tasks can be isolated from production services. Tasks tagged with FLUSH are routed to services belonging to the same tag group.
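Conceptually, the tag routing step is just a filter over the available service instances. The `TagRouter` sketch below is illustrative only (in practice the routing lives in the RPC framework, not in ECP code):

```java
import java.util.ArrayList;
import java.util.List;

public class TagRouter {

    public static class ServiceInstance {
        final String address;
        final String tag;

        public ServiceInstance(String address, String tag) {
            this.address = address;
            this.tag = tag;
        }
    }

    /** Keep only the instances belonging to the caller's tag group. */
    public static List<ServiceInstance> route(List<ServiceInstance> all, String tag) {
        List<ServiceInstance> matched = new ArrayList<>();
        for (ServiceInstance si : all) {
            if (tag.equals(si.tag)) {
                matched.add(si);
            }
        }
        return matched;
    }
}
```

With this scheme, a heavy backfill tagged FLUSH can only ever land on FLUSH‑tagged write services, so production traffic on untagged or PROD instances is untouched.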

3.8 Health Check and Fault Recovery

A keep‑alive component periodically renews task timestamps; a recovery component scans unfinished tasks and restarts those whose last heartbeat exceeds a threshold.

Keep‑alive pseudo‑code:

@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal() {
    futureMap.forEach((taskId, future) -> {
        if (!future.isDone()) {
            log.info("act=【renewal】desc=【task renewed】taskId=【{}】renewedAt=【{}】", taskId, DateUtils.dateToString(new Date(), DateUtils.PATTERN));
            contextService.renewal(taskId);
        } else {
            log.info("act=【renewal】desc=【task finished】taskId=【{}】", taskId);
            futureMap.remove(taskId);
        }
    });
}

Recovery pseudo‑code:

@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask() {
    List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();
    for (TaskFlushExecuteContextPO contextPO : contextPOS) {
        Integer durationMin = calculateTimeSinceLastAlive(contextPO);
        if (durationMin >= MAX_DURATION_MIN) {
            log.info("act=【restartTask】desc=【restarting task】taskId=【{}】", contextPO.getTaskId());
            // CAS on the alive timestamp so only one scheduler instance restarts the task
            Date aliveTime = new Date();
            int i = contextExtMapper.casUpdateAliveTime(contextPO.getTaskId(), aliveTime);
            if (i > 0) {
                restart0(contextPO, aliveTime);
            }
        }
    }
}

3.9 Smooth Migration

ECP supports both direct index writes and blue‑green migration using a new index, dual‑write, and alias switching to achieve zero‑downtime updates.
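The final cut‑over of such a blue‑green migration is a single atomic Elasticsearch `_aliases` call; the index names `orders_v1`/`orders_v2` and the alias `orders` here are illustrative:

```json
POST /_aliases
{
  "actions": [
    { "remove": { "index": "orders_v1", "alias": "orders" } },
    { "add":    { "index": "orders_v2", "alias": "orders" } }
  ]
}
```

Because both actions are applied atomically, readers of the `orders` alias never observe a moment with zero or two backing indices, which is what makes the switch zero‑downtime.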

3.10 Elegant Logging

Logging is decoupled from business logic using annotation‑driven flow recording. Dynamic values (e.g., task ID) are injected via Spring EL expressions:

@Flow(subjectIdEp = "#taskPO.id", subjectType = SubjectTypeEnum.TASK,
      operateFlowType = OperateFlowTypeEnum.CREATE_TASK,
      content = "'Create task, task ID: ' + #taskPO.id")
public void saveTaskWithUser(TaskPO taskPO) {
    String name = LoginUserContext.get().getName();
    taskPO.setCreator(name);
    taskPO.setModifier(name);
    taskMapper.insertSelective(taskPO);
}

4 Summary

The ECP implementation demonstrates a comprehensive backend solution for large‑scale data synchronization to Elasticsearch, covering data source heterogeneity, pagination, rate limiting, retry, SPI extensibility, environment isolation, health monitoring, smooth migration, and clean logging.

Tags: Java, microservices, backend development, Elasticsearch, data synchronization, retry strategy, dynamic rate limiting
Written by

Architect

Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.
