Building a Scalable Order‑to‑Elasticsearch Sync Service (ECP)
This article explains the challenges of synchronizing billions of order records to Elasticsearch, outlines the design of the ECP service that automates heterogeneous data integration, and details its implementation including multi‑source reading, SQL parsing, dynamic rate limiting, retry mechanisms, SPI‑based extensibility, environment isolation, health‑check, smooth migration, and logging.
Background
Faced with a requirement to support arbitrary multi‑condition queries on over a billion orders, the team needed to sync order data to Elasticsearch (ES) to leverage its inverted index and caching capabilities. Existing ES clusters were available, but some data was missing from current ES indexes, necessitating a full re‑sync that could take a week.
Complexity of Order Data Sync to ES
The sync process involves several challenges:
Extracting billions of order rows without impacting the production database, which means reading from read‑only replicas and walking data that is sharded across 16 databases and 256 tables.
Paging through massive data sets efficiently, using a unique, monotonically increasing cursor column (for example the primary key) instead of LIMIT with an offset, whose cost grows as the offset grows.
Fetching related user and product information from upstream services, which introduces cross‑service load and the need for isolation and rate limiting.
Providing progress visibility, handling partial failures, and supporting pause/resume capabilities.
Allowing selective sync of specific orders or IDs.
Recording execution metrics such as start/end times, duration, and operator.
Scheduling syncs during low‑traffic windows.
Extending the sync to additional data domains like product data.
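The cursor‑based paging mentioned in the list above is commonly implemented as keyset pagination: each page asks only for rows whose ID is greater than the last ID already seen, so the database can seek on the index instead of scanning and discarding skipped rows. The following is a minimal sketch, assuming a unique, increasing numeric id column. KeysetPager, pageSql, and the fetchAfter callback are hypothetical names for illustration and are not taken from ECP.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical helper illustrating keyset (cursor) pagination.
final class KeysetPager {

    // SQL for one page: seek past the last ID seen instead of using an offset.
    // The two '?' placeholders are bound to lastId and pageSize by the caller.
    static String pageSql(String table) {
        return "SELECT * FROM " + table + " WHERE id > ? ORDER BY id ASC LIMIT ?";
    }

    // Walks every page. fetchAfter.apply(lastId, pageSize) must return rows
    // with id > lastId, ordered by id ascending.
    // Returns the total number of rows handed to the sink.
    static <T> long forEachPage(long startAfterId, int pageSize,
                                BiFunction<Long, Integer, List<T>> fetchAfter,
                                ToLongFunction<T> idOf,
                                Consumer<List<T>> sink) {
        long lastId = startAfterId;
        long total = 0;
        while (true) {
            List<T> page = fetchAfter.apply(lastId, pageSize);
            if (page.isEmpty()) {
                return total;
            }
            sink.accept(page);
            total += page.size();
            // The cursor is simply the ID of the last row in the page.
            lastId = idOf.applyAsLong(page.get(page.size() - 1));
            if (page.size() < pageSize) {
                return total; // a short page means there is nothing left
            }
        }
    }
}
```

Because lastId is the only state the loop carries, persisting it after each page would also give a natural checkpoint for the pause and resume requirement listed above.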
The ECP Service
ECP (Elastic‑Cache‑Process) automates and visualizes the entire sync workflow, reducing the cost of heterogeneous data integration into ES.
Simple Run Flow
ECP reads data from various sources and pushes it to an ES write service, which is implemented by each consumer.
Multi‑Data‑Source Reading
ECP supports three source types:
ID Source: A text box for entering IDs, suitable for small‑scale syncs.
File Source: Reads data from uploaded text files in object storage using streaming to avoid OOM.
Script Source: Executes configured SQL against a database, enabling billion‑row reads via JDBC cursor queries.
try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
    if (!response.isSuccessful()) {
        throw new IOException("Unexpected code " + response);
    }
    // Stream the file line by line instead of loading it into memory
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(response.body().byteStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // process one record here (assumes one record per line)
        }
    }
}
SQL Parsing and Validation
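ECP uses the SQL parser from Alibaba's Druid library to parse the configured SQL into an AST. Working on the AST lets ECP validate and rewrite the statement safely, for example by adding LIMIT 1 to test‑run it before the full sync starts.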
Dynamic Rate Limiting
Rate limiting is implemented per node with Guava's RateLimiter. QPS values are stored in a database, and a scheduled task refreshes each limiter when its value changes.
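The refresh idea can be sketched with the JDK alone. The class below is a deliberately simplified stand‑in for a limiter whose rate can change at runtime. It is not Guava's algorithm (it has no burst or warm‑up handling), and AdjustableLimiter, refreshFrom, and the injected clock are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.function.LongSupplier;

// Simplified, JDK-only stand-in for a rate limiter with an adjustable QPS.
final class AdjustableLimiter {
    private final LongSupplier nanoClock; // injectable so the logic is testable without sleeping
    private double qps;
    private long nextFreeNanos;           // earliest time the next permit may be granted

    AdjustableLimiter(double qps, LongSupplier nanoClock) {
        if (qps <= 0) throw new IllegalArgumentException("qps must be positive");
        this.qps = qps;
        this.nanoClock = nanoClock;
        this.nextFreeNanos = nanoClock.getAsLong();
    }

    // One refresh step, intended to be called by a scheduled task.
    // Returns true only when the stored value differs and was applied.
    boolean refreshFrom(DoubleSupplier storedQps) {
        double latest = storedQps.getAsDouble(); // e.g. a database read, done outside the lock
        synchronized (this) {
            if (latest <= 0 || latest == qps) return false;
            qps = latest;
            return true;
        }
    }

    // Reserves one permit and returns how many nanoseconds the caller must wait.
    synchronized long reserve() {
        long now = nanoClock.getAsLong();
        long grantedAt = Math.max(now, nextFreeNanos);
        nextFreeNanos = grantedAt + (long) (1_000_000_000L / qps);
        return grantedAt - now;
    }

    // Blocking convenience wrapper around reserve().
    void acquire() throws InterruptedException {
        long waitNanos = reserve();
        if (waitNanos > 0) TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
}
```

A scheduled task could call refreshFrom with a supplier that reads the configured QPS. With Guava itself, the equivalent in‑place update is RateLimiter.setRate, which changes the rate of an existing limiter without recreating it.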
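RateLimiter limiter = RateLimiter.create(qps);
// When the stored QPS changes, update the existing limiter in place
// (newQps stands for the value just read by the scheduled task)
limiter.setRate(newQps);
Retry Strategy and Fault Awareness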
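Failures are categorized (network jitter, business logic errors, upstream service issues) and retried with Fibonacci back‑off using the guava-retrying library. A pause request is not treated as a failure, so it stops the task instead of triggering a retry.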
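To make the back‑off schedule concrete, here is a JDK‑only sketch of a Fibonacci wait and a small retry loop around it. It assumes the wait after the n‑th failed attempt is a multiplier times the n‑th Fibonacci number, capped at a maximum, which is how the wait strategy configured in ECP is understood to behave. FibonacciRetry, waitMillis, and the injected sleeper are hypothetical names, and the 1000 ms multiplier and 30 s cap simply mirror the values used in the article's configuration.

```java
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// JDK-only illustration of retrying with Fibonacci back-off.
final class FibonacciRetry {

    // Wait after failed attempt n (1-based): multiplier * fib(n), capped at maxMillis.
    static long waitMillis(int failedAttempt, long multiplierMillis, long maxMillis) {
        long prev = 0, curr = 1; // fib(1) = 1, fib(2) = 1, fib(3) = 2, ...
        // Stop growing once the cap is certainly exceeded, which also avoids overflow.
        for (int i = 1; i < failedAttempt && curr <= maxMillis; i++) {
            long next = prev + curr;
            prev = curr;
            curr = next;
        }
        long wait = multiplierMillis * curr;
        return (wait < 0 || wait > maxMillis) ? maxMillis : wait;
    }

    // Runs the task up to maxAttempts times. Exceptions rejected by 'retryable'
    // (for example a pause signal) are rethrown immediately.
    static <T> T call(Supplier<T> task, int maxAttempts,
                      Predicate<RuntimeException> retryable, LongConsumer sleeper) {
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                sleeper.accept(waitMillis(attempt, 1000, 30_000));
            }
        }
    }
}
```

Under these assumptions the waits after successive failures are 1 s, 1 s, 2 s, 3 s, 5 s, 8 s, 13 s, 21 s, and then 30 s once the cap applies. The schedule backs off more gently than exponential doubling, which suits short network jitter.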
private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
        // Retry on any exception except a pause signal
        .retryIfException(input -> !isPauseException(input))
        // Fibonacci back-off: multiplier of 1000 ms, capped at 30 seconds
        .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
        .withRetryListener(new RetryListener() { ... })
        .build();
SPI Mechanism for Extensible Push Services
ECP defines a generic interface for ES write services; each downstream system implements the interface, allowing new consumers to be added without modifying ECP.
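The shape of this extension point can be sketched with plain Java. The names below (EsIndexFlusher, FlusherRegistry, and the resolver function) are hypothetical and only illustrate the pattern: ECP depends on one interface, and the implementation for each consumer is resolved by service name on first use and then cached. In ECP the resolver would be the RPC reference lookup, whereas this sketch accepts any function.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical contract that each downstream system would implement.
interface EsIndexFlusher {
    // Writes the given records to the consumer's index and returns how many were written.
    int flush(List<String> ids);
}

// Resolves one implementation per service name and caches it.
final class FlusherRegistry {
    private final Map<String, EsIndexFlusher> cache = new ConcurrentHashMap<>();
    private final Function<String, EsIndexFlusher> resolver;

    FlusherRegistry(Function<String, EsIndexFlusher> resolver) {
        this.resolver = resolver;
    }

    // The resolver runs at most once per service name, and later calls reuse the cached client.
    EsIndexFlusher forService(String serviceName) {
        return cache.computeIfAbsent(serviceName, resolver);
    }
}
```

Adding a new consumer then means deploying another implementation of the interface under a new service name, with no change to the registry or to the sync loop that calls it.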
// Resolve the consumer's implementation by service name; build the reference only on a cache miss
IEsIndexFlushAPI api = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, name -> {
    Reference<IEsIndexFlushAPI> reference = new Reference<>();
    reference.setServiceName(name);
    reference.setInterfaceClass(IEsIndexFlushAPI.class);
    return reference.refer();
});
api.flush(request);
Environment Isolation
Tasks are tagged (e.g., FLUSH) so that sync jobs run against isolated service groups, preventing impact on production workloads.
Health Check and Task Recovery
ECP includes a keep‑alive component that renews task timestamps and a restart component that rescues tasks whose last heartbeat exceeds a threshold.
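The two components cooperate through a single piece of state, the last heartbeat time of each task. A minimal sketch of that logic follows, assuming an in‑memory map for illustration (ECP presumably stores the timestamp with the task record). TaskHeartbeats, renew, and findStale are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative heartbeat bookkeeping for running tasks.
final class TaskHeartbeats {
    private final Map<Long, Long> lastBeatMillis = new ConcurrentHashMap<>();

    // Keep-alive: called periodically by the node that is executing the task.
    void renew(long taskId, long nowMillis) {
        lastBeatMillis.put(taskId, nowMillis);
    }

    // Restart scan: a task whose last heartbeat is older than the threshold
    // is presumed to have lost its node and becomes a candidate for restart.
    List<Long> findStale(long nowMillis, long thresholdMillis) {
        List<Long> stale = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : lastBeatMillis.entrySet()) {
            if (nowMillis - entry.getValue() > thresholdMillis) {
                stale.add(entry.getKey());
            }
        }
        Collections.sort(stale); // stable order for callers and logs
        return stale;
    }
}
```

The threshold should be several times the keep‑alive interval, so that one delayed renewal is not mistaken for a dead node.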
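// Keep-alive: periodically renew the heartbeat timestamp of running tasks
@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal() { ... }
// Recovery: restart tasks whose last heartbeat is older than the threshold
@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask() { ... }
Smooth Migration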
Two migration strategies are supported: writing directly to the existing index, or building a new index and then switching the alias to it for a zero‑downtime cutover.
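For the alias strategy, Elasticsearch can move an alias from one index to another in a single request to its _aliases endpoint, so readers never see a moment without a backing index. The sketch below only builds that request body. AliasSwap is a hypothetical helper, and the index and alias names used with it are placeholders rather than ECP's real names.

```java
// Builds the JSON body for POST /_aliases that moves an alias to a new index.
final class AliasSwap {

    // Both actions are applied together by Elasticsearch, so queries against
    // the alias switch from oldIndex to newIndex in one step.
    // Assumes plain index and alias names that need no JSON escaping.
    static String body(String alias, String oldIndex, String newIndex) {
        return "{\"actions\":["
                + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
                + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
                + "]}";
    }
}
```

Queries keep targeting the alias throughout, and the old index can be kept for a while as a rollback target before it is deleted.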
Elegant Logging
Annotation‑based flow logging captures dynamic context (task ID, data source) using Spring EL expressions.
@Flow(subjectIdEp = "#taskPO.id", subjectType = SubjectTypeEnum.TASK,
        operateFlowType = OperateFlowTypeEnum.CREATE_TASK,
        content = "'Task created, task ID: ' + #taskPO.id")
public void saveTaskWithUser(TaskPO taskPO) { ... }
Conclusion
ECP demonstrates a moderately complex backend solution that addresses data heterogeneity, large‑scale paging, rate limiting, retry, extensibility via SPI, environment isolation, fault recovery, and operational logging, highlighting the importance of meticulous detail handling in large‑scale data sync projects.
dbaplus Community
Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.