Building a Scalable Order‑to‑Elasticsearch Sync Service (ECP)

This article explains the challenges of synchronizing billions of order records to Elasticsearch, outlines the design of the ECP service that automates heterogeneous data integration, and details its implementation including multi‑source reading, SQL parsing, dynamic rate limiting, retry mechanisms, SPI‑based extensibility, environment isolation, health‑check, smooth migration, and logging.

dbaplus Community

Background

Faced with a requirement to support arbitrary multi‑condition queries on over a billion orders, the team needed to sync order data to Elasticsearch (ES) to leverage its inverted index and caching capabilities. Existing ES clusters were available, but some data was missing from current ES indexes, necessitating a full re‑sync that could take a week.

Complexity of Order Data Sync to ES

The sync process involves several challenges:

Extracting billions of order rows without impacting the production database, which means reading from read-only replicas and walking data that is sharded across 16 databases and 256 tables.

Paging through massive data sets efficiently, using a unique, monotonically increasing column as a cursor instead of LIMIT with an offset, whose cost grows as the offset gets larger.

Fetching related user and product information from upstream services, which introduces cross‑service load and the need for isolation and rate limiting.

Providing progress visibility, handling partial failures, and supporting pause/resume capabilities.

Allowing selective sync of specific orders or IDs.

Recording execution metrics such as start/end times, duration, and operator.

Scheduling syncs during low‑traffic windows.

Extending the sync to additional data domains like product data.
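
The cursor-based paging mentioned in the list above can be sketched as keyset pagination: each page asks for rows whose ID is greater than the last ID already seen. This is only a sketch under assumptions. The `orders` table, the `id` column, and the `PageSource` and `KeysetPager` names are hypothetical and do not come from ECP.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical page loader: returns up to `limit` ids strictly greater than `afterId`, in ascending order. */
interface PageSource {
    List<Long> fetchAfter(long afterId, int limit);
}

final class KeysetPager {
    /** SQL that a JDBC-backed PageSource might run; table and column names are assumptions. */
    static final String PAGE_SQL =
        "SELECT id FROM orders WHERE id > ? ORDER BY id ASC LIMIT ?";

    /**
     * Walks the data page by page, resuming each time from the last id seen.
     * Returns the final cursor, which can be stored to support pause and resume.
     */
    static long scan(PageSource source, long startAfter, int pageSize,
                     Consumer<List<Long>> handler) {
        long cursor = startAfter;
        while (true) {
            List<Long> page = source.fetchAfter(cursor, pageSize);
            if (page.isEmpty()) {
                return cursor;                    // nothing left to read
            }
            handler.accept(page);
            cursor = page.get(page.size() - 1);   // advance to the last id of this page
            if (page.size() < pageSize) {
                return cursor;                    // short page: end of data reached
            }
        }
    }
}
```

Because every query seeks directly to `id > cursor` through the index, the cost per page stays roughly constant no matter how deep into the table the scan is.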

The ECP Service

ECP (Elastic‑Cache‑Process) automates and visualizes the entire sync workflow, reducing the cost of heterogeneous data integration into ES.

Simple Run Flow

ECP reads data from various sources and pushes it to an ES write service, which is implemented by each consumer.

Multi‑Data‑Source Reading

ECP supports three source types:

ID Source: A text box for entering IDs, suitable for small-scale syncs.

File Source: Reads data from uploaded text files in object storage using streaming to avoid OOM.

Script Source: Executes configured SQL against a database, enabling billion-row reads via JDBC cursor queries.

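try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
    if (!response.isSuccessful()) {
        throw new IOException("Unexpected code " + response);
    }
    // Stream the body line by line instead of loading the whole file into memory
    InputStream inputStream = response.body().byteStream();
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { // UTF-8 is assumed
        String line;
        while ((line = reader.readLine()) != null) {
            // hand each line to the sync pipeline here
        }
    }
}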
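
To keep memory flat while still pushing data in chunks, the streamed lines can be grouped into fixed-size batches. The following is a minimal sketch using only the JDK. The `StreamingIdReader` name, the one-ID-per-line file layout, and the UTF-8 encoding are assumptions rather than details from the article.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class StreamingIdReader {
    /**
     * Reads one ID per line (assumed layout) and hands the IDs to `sink` in batches,
     * so at most `batchSize` IDs are held in memory at a time.
     * Returns the number of non-blank lines read.
     */
    static long readInBatches(InputStream in, int batchSize, Consumer<List<String>> sink) {
        long total = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String id = line.trim();
                if (id.isEmpty()) {
                    continue;                          // skip blank lines
                }
                batch.add(id);
                total++;
                if (batch.size() == batchSize) {
                    sink.accept(batch);                // flush a full batch
                    batch = new ArrayList<>(batchSize);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);                        // flush the final partial batch
        }
        return total;
    }
}
```

With OkHttp, the stream passed in would be `response.body().byteStream()`, and the sink would forward each batch to the ES write service.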

SQL Parsing and Validation

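ECP uses Druid's SQL parser to turn the configured SQL into an AST, so the statement can be inspected and rewritten safely, for example to test-run it automatically with LIMIT 1 before the full job starts.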

Dynamic Rate Limiting

Rate limiting uses Guava's RateLimiter and is applied per node. QPS values are stored in a database and refreshed by a scheduled task, so the limit can be adjusted while a job is running.

RateLimiter limiter = RateLimiter.create(qps);
// Update limiter when QPS changes (newQps is the value read by the scheduled task)
limiter.setRate(newQps);
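
The refresh step might look like the sketch below. It is written against JDK functional interfaces so that it runs without Guava. The `QpsRefresher` class, the QPS supplier, and the rule of ignoring non-positive values are all assumptions and not ECP's actual code.

```java
import java.util.function.DoubleConsumer;
import java.util.function.DoubleSupplier;

/** Polls a QPS source and pushes changes into a limiter; both ends are injected. */
final class QpsRefresher {
    private final DoubleSupplier qpsSource;   // e.g. a database lookup (hypothetical)
    private final DoubleConsumer rateSetter;  // e.g. limiter::setRate for a Guava RateLimiter
    private double current;

    QpsRefresher(double initialQps, DoubleSupplier qpsSource, DoubleConsumer rateSetter) {
        this.current = initialQps;
        this.qpsSource = qpsSource;
        this.rateSetter = rateSetter;
    }

    /** Meant to be called by a scheduled task; returns true if the rate was changed. */
    synchronized boolean refresh() {
        double latest = qpsSource.getAsDouble();
        if (latest <= 0 || latest == current) {
            return false;                     // ignore invalid or unchanged values
        }
        rateSetter.accept(latest);
        current = latest;
        return true;
    }
}
```

With Guava on the classpath, the setter would be `limiter::setRate`, and `refresh()` could be scheduled with a `ScheduledExecutorService` or a Spring `@Scheduled` method.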

Retry Strategy and Fault Awareness

Failures are categorized (network jitter, business logic errors, upstream service issues) and retried with Fibonacci back-off using the guava-retrying library. An exception that signals a pause is not retried.

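private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
    // Retry on any exception except the one that signals a user-requested pause
    .retryIfException(input -> !isPauseException(input))
    // Fibonacci waits with a 1000 ms multiplier, capped at 30 seconds
    .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
    // Give up after MAX_RETRY_TIMES attempts
    .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
    .withRetryListener(new RetryListener() { ... })
    .build();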
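
To make the wait schedule concrete, the sketch below computes Fibonacci delays in plain Java. It assumes the delay after a failed attempt is the multiplier times the Fibonacci number of that attempt, capped at the maximum. That matches how `fibonacciWait(1000, 30, TimeUnit.SECONDS)` appears to be intended, but the `FibonacciBackoff` class itself is hypothetical.

```java
final class FibonacciBackoff {
    /**
     * Delay in milliseconds after failed attempt number `attempt` (1-based),
     * computed as multiplierMillis * fib(attempt) and capped at maxMillis.
     * Assumes multiplierMillis >= 1.
     */
    static long delayMillis(int attempt, long multiplierMillis, long maxMillis) {
        long prev = 0;   // fib(0)
        long curr = 1;   // fib(1)
        for (int i = 1; i < attempt; i++) {
            long next = prev + curr;
            prev = curr;
            curr = next;
            if (curr > maxMillis) {
                return maxMillis;   // already past the cap; also avoids overflow for large attempts
            }
        }
        long delay = multiplierMillis * curr;
        return (delay > maxMillis || delay < 0) ? maxMillis : delay;
    }
}
```

Under these assumptions, a 1000 ms multiplier and a 30 s cap give waits of 1 s, 1 s, 2 s, 3 s, 5 s, 8 s, 13 s, and 21 s, then 30 s for every later attempt. Short glitches are retried quickly, while a struggling upstream service gets progressively more room to recover.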

SPI Mechanism for Extensible Push Services

ECP defines a generic interface for ES write services; each downstream system implements the interface, allowing new consumers to be added without modifying ECP.

// Create the RPC reference only on first use of a service name, then reuse the cached proxy
IEsIndexFlushAPI api = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, name -> {
    Reference<IEsIndexFlushAPI> reference = new Reference<>();
    reference.setServiceName(name);
    reference.setInterfaceClass(IEsIndexFlushAPI.class);
    return reference.refer();
});
api.flush(request);
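
The lookup-and-cache pattern can be isolated from the RPC framework as in the sketch below. `EsIndexFlushApi` and `FlushApiRegistry` are hypothetical stand-ins, and the proxy factory is injected so the example runs on the JDK alone. The real `IEsIndexFlushAPI` signature is not shown in the article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for the write-service contract that each consumer implements. */
interface EsIndexFlushApi {
    boolean flush(String requestJson);
}

/** Caches one client proxy per service name so proxies are created lazily and only once. */
final class FlushApiRegistry {
    private final Map<String, EsIndexFlushApi> cache = new ConcurrentHashMap<>();
    private final Function<String, EsIndexFlushApi> proxyFactory;

    FlushApiRegistry(Function<String, EsIndexFlushApi> proxyFactory) {
        this.proxyFactory = proxyFactory;   // e.g. wraps the RPC framework's reference.refer()
    }

    EsIndexFlushApi get(String serviceName) {
        // computeIfAbsent on ConcurrentHashMap runs the factory at most once per key
        return cache.computeIfAbsent(serviceName, proxyFactory);
    }
}
```

Onboarding a new consumer then only requires that the consumer expose an implementation under its own service name. The registry and the sync loop stay unchanged.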

Environment Isolation

Tasks are tagged (e.g., FLUSH) so that sync jobs run against isolated service groups, preventing impact on production workloads.

Health Check and Task Recovery

ECP includes a keep-alive component that periodically renews the heartbeat timestamp of each running task, and a restart component that picks up and restarts tasks whose last heartbeat is older than a threshold.

@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal() { ... }

@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask() { ... }
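
The decision the restart component has to make can be sketched as a pure function over heartbeat timestamps. The `StaleTaskDetector` name and the in-memory map are assumptions. In ECP the timestamps presumably live in a database, which the article does not detail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class StaleTaskDetector {
    /**
     * Returns the ids (sorted ascending) of tasks whose last heartbeat is more than
     * timeoutMillis older than nowMillis. These are candidates for a restart.
     */
    static List<Long> findStale(Map<Long, Long> lastHeartbeatByTask,
                                long nowMillis, long timeoutMillis) {
        List<Long> stale = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : lastHeartbeatByTask.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                stale.add(entry.getKey());
            }
        }
        Collections.sort(stale);
        return stale;
    }
}
```

The timeout should be comfortably larger than the keep-alive interval. Otherwise a healthy task that is merely late with one renewal could be restarted while it is still running.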

Smooth Migration

Two migration strategies are supported: writing directly into the existing index, or building a new index and then switching an alias to it for zero-downtime updates.
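
For the alias-switching strategy, Elasticsearch's `_aliases` endpoint accepts a list of actions that are applied atomically, so readers never see a moment without a backing index. The sketch below only builds the request body. The `AliasSwitch` class and the index and alias names in the usage note are hypothetical.

```java
final class AliasSwitch {
    /**
     * Builds the JSON body for POST /_aliases that moves `alias` from oldIndex to newIndex
     * in one atomic step. Names are assumed to be plain index names, so no JSON escaping is done.
     */
    static String body(String alias, String oldIndex, String newIndex) {
        return "{\"actions\":["
            + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
            + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
            + "]}";
    }
}
```

For example, `AliasSwitch.body("orders", "orders_v1", "orders_v2")` yields a body that, once the new index is fully populated, repoints queries against `orders` from `orders_v1` to `orders_v2`.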

Elegant Logging

Annotation-based flow logging captures dynamic context (task ID, data source) using Spring Expression Language (SpEL) expressions that are evaluated against the method arguments.

@Flow(subjectIdEp = "#taskPO.id", subjectType = SubjectTypeEnum.TASK,
      operateFlowType = OperateFlowTypeEnum.CREATE_TASK,
      content = "'Create task, task ID: ' + #taskPO.id")
public void saveTaskWithUser(TaskPO taskPO) { ... }

Conclusion

ECP demonstrates a moderately complex backend solution that addresses data heterogeneity, large‑scale paging, rate limiting, retry, extensibility via SPI, environment isolation, fault recovery, and operational logging, highlighting the importance of meticulous detail handling in large‑scale data sync projects.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.