Design and Implementation of an Elasticsearch Data Synchronization Service (ECP)
This article describes the challenges of synchronizing billions of order records to Elasticsearch and presents the design, architecture, and key technical details of a generic data‑sync service (ECP) that supports multiple data sources, dynamic rate limiting, retry strategies, SPI‑based extensibility, environment isolation, health‑check, fault recovery, smooth migration, and elegant logging.
1 Background
Faced with a requirement to support flexible multi‑condition queries over more than a hundred million order records, the team decided to synchronize the heterogeneous order data into Elasticsearch (ES) and leverage its inverted index and caching capabilities for complex queries.
However, some of the required data is missing from the current ES index, so the entire order dataset must be re‑indexed, which would take about a week if done naïvely.
2 Complexity of Order Data Synchronization to ES
2.1 Data Sync Process
The process involves reading all orders from the order database, fetching related user and product information from upstream services, assembling the enriched records, and writing them to the ES cluster.
2.2 Pain Points
Extracting billions of rows without impacting the primary database; a read‑only replica and multiple data sources are required.
Paging large datasets efficiently; using a unique, incremental cursor field instead of simple LIMIT‑OFFSET.
Calling multiple upstream services without overloading them; rate‑limiting and isolation are necessary.
Monitoring sync progress and providing real‑time status to product owners.
Handling failures with retry, alerting, and the ability to resume from the last successful point.
Supporting manual pause/resume and partial sync of inconsistent orders.
Allowing ad‑hoc sync of specific order IDs.
Collecting execution statistics such as start/end time, duration, and operator.
Scheduling syncs during off‑peak hours.
Extending the solution to synchronize other domains such as product data.
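The cursor‑based paging pain point above can be sketched in plain Java. The snippet below simulates keyset (seek) pagination over an in‑memory sorted ID list; in ECP the equivalent `WHERE id > lastId ORDER BY id LIMIT n` pattern would run against the read‑only replica. Class and method names here are illustrative, not ECP's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class KeysetPaging {
    // Simulates "SELECT id FROM orders WHERE id > ? ORDER BY id LIMIT ?"
    // over an in-memory sorted list; a real implementation would issue SQL.
    static List<Long> nextPage(List<Long> sortedIds, long afterId, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (Long id : sortedIds) {
            if (id > afterId) {
                page.add(id);
                if (page.size() == pageSize) break;
            }
        }
        return page;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 3L, 5L, 7L, 9L, 11L);
        long cursor = 0L;
        List<Long> all = new ArrayList<>();
        List<Long> page;
        while (!(page = nextPage(ids, cursor, 2)).isEmpty()) {
            all.addAll(page);
            // Advance the cursor to the last seen ID; unlike OFFSET,
            // cost per page stays constant no matter how deep we are.
            cursor = page.get(page.size() - 1);
        }
        System.out.println(all);
    }
}
```

The key property is that each page is located by an indexed `id > cursor` predicate, so the database never re-scans rows that earlier pages already consumed.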
3 The "Magical" Service – ECP
ECP (ElasticSearch Control Platform) automates and visualizes the entire sync workflow, reducing the cost of data heterogeneity.
3.1 Simple Run Flow
ECP reads data from a source and pushes it to an ES write service. The write service is implemented by each consumer according to its business logic.
3.2 Multi‑Data‑Source Reading
ECP currently supports three source types:
ID Source: a text box for entering a single ID, suitable for small‑scale syncs.
File Source: a text file uploaded to object storage; ECP streams the file line by line to avoid OOM.
Script Source: a user‑provided SQL statement executed against a configured database, enabling large‑scale reads.
File streaming example:
try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
    if (!response.isSuccessful()) {
        throw new IOException("Unexpected code " + response);
    }
    // Stream the file body instead of buffering it, to avoid OOM on large files
    InputStream inputStream = response.body().byteStream();
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
}

For massive datasets, ECP uses JDBC cursor queries instead of LIMIT‑OFFSET:
// Establish the connection to the read-only replica
conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
// Create a forward-only, read-only statement so the driver can stream results
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// Fetch rows in configurable batches; note that MySQL Connector/J only streams
// when fetchSize is Integer.MIN_VALUE, or with useCursorFetch=true on the JDBC URL
stmt.setFetchSize(param.getFetchSize());

Cursor‑based fetching avoids OOM by reading rows in configurable batches instead of materializing the full result set in memory.
3.3 SQL Parsing and Validation
ECP uses Druid to parse SQL into an AST, enabling safe validation and automatic addition of a LIMIT 1 clause for preview queries.
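As a simplified stand‑in for the Druid AST validation (this is not Druid's API, just string‑level logic illustrating the same two checks), the sketch below rejects non‑SELECT statements and forces a LIMIT 1 onto preview queries:

```java
public class SqlPreviewGuard {
    // Simplified stand-in for AST-based validation: a real implementation
    // would parse with Druid into an AST and rewrite the limit node there.
    static String toPreviewSql(String sql) {
        String trimmed = sql.trim().replaceAll(";+$", "");
        if (!trimmed.toLowerCase().startsWith("select")) {
            throw new IllegalArgumentException("Only SELECT statements are allowed");
        }
        // Replace any existing trailing LIMIT so a preview never scans the table
        if (trimmed.toLowerCase().contains(" limit ")) {
            trimmed = trimmed.replaceAll("(?i)\\s+limit\\s+\\d+(\\s*,\\s*\\d+)?$", "");
        }
        return trimmed + " LIMIT 1";
    }

    public static void main(String[] args) {
        System.out.println(toPreviewSql("SELECT id FROM orders WHERE status = 1;"));
        System.out.println(toPreviewSql("select * from t limit 100"));
    }
}
```

Parsing into an AST, as ECP does with Druid, additionally protects against injection and malformed statements that string matching cannot catch.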
3.4 Dynamic Rate Limiting
Single‑node rate limiting is implemented with Guava's RateLimiter. The QPS value can be modified in the UI and persisted to the database, and a scheduled task periodically syncs the stored value to the limiter.
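The refresh loop described above can be sketched without Guava. The minimal token bucket below is a stand‑in for RateLimiter (in the real service the scheduled task would simply call `setRate(qpsFromDb)` on Guava's RateLimiter); all names here are illustrative:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AdjustableLimiter {
    // Minimal token-bucket stand-in for Guava's RateLimiter: tokens are
    // refilled once per tick by a scheduled task; the rate is adjustable.
    private volatile int permitsPerSecond;
    private final AtomicInteger tokens = new AtomicInteger();

    AdjustableLimiter(int permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
        this.tokens.set(permitsPerSecond);
    }

    // Called by the scheduled task after re-reading QPS from the database
    void setRate(int newPermitsPerSecond) { this.permitsPerSecond = newPermitsPerSecond; }

    // Called once per second (e.g. by a ScheduledExecutorService) to refill
    void refill() { tokens.set(permitsPerSecond); }

    boolean tryAcquire() {
        while (true) {
            int available = tokens.get();
            if (available <= 0) return false;
            if (tokens.compareAndSet(available, available - 1)) return true;
        }
    }

    public static void main(String[] args) {
        AdjustableLimiter limiter = new AdjustableLimiter(2);
        System.out.println(limiter.tryAcquire()); // permit granted
        System.out.println(limiter.tryAcquire()); // permit granted
        System.out.println(limiter.tryAcquire()); // bucket drained: rejected
        limiter.setRate(5);  // operator changed QPS in the UI / database
        limiter.refill();    // next scheduled tick picks up the new rate
        System.out.println(limiter.tryAcquire()); // granted again
    }
}
```

Because the rate field is volatile and only the refill task writes tokens in bulk, no lock is needed on the hot acquire path.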
3.5 Retry Strategy and Fault Awareness
Failures are categorized (network jitter, business logic errors, upstream service overload). A Fibonacci back‑off retry strategy is applied using Guava Retry:
// Retry component configuration
private final Retryer<Void> RETRYER = RetryerBuilder.<Void>newBuilder()
        .retryIfException(input -> !isPauseException(input))
        .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
        .withRetryListener(new RetryListener() {
            @Override
            public <V> void onRetry(Attempt<V> attempt) {
                if (attempt.hasException()) {
                    log.error("act=[DataFlushRpcCallRetry] desc=[retry] attemptNumber=[{}] cause=[{}]",
                            attempt.getAttemptNumber(), attempt.getExceptionCause());
                    alarmIfExceedThreshold(attempt);
                }
            }
        })
        .build();

public void execute(Runnable runnable) {
    innerExecute(runnable, RETRYER);
}

If the retry count exceeds the threshold, an alert is sent to a designated on‑call group.
3.6 SPI Mechanism for Target Service Selection
ECP defines a generic interface for ES indexing. Consumers implement the interface, and ECP dynamically references the appropriate service via an SPI‑style lookup:
Reference<IEsIndexFlushAPI> reference = new Reference<>();
reference.setServiceName(serviceName);
reference.setInterfaceClass(IEsIndexFlushAPI.class);
reference.setApplicationConfig(applicationConfig);
// Cache the referred service proxy per service name
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName,
        s -> reference.refer());
log.info("act=[EsIndexFlushApiInvoker] desc=[request] serviceName=[{}] dataListSize=[{}] indexNameList=[{}] tag=[{}]",
        serviceName, request.getDataList().size(), request.getIndexNameList(), request.getTag());
EMApiResult result = iEsIndexFlushAPI.flush(request);

3.7 Environment Isolation
ECP integrates tag‑based routing so that sync tasks can be isolated from production services. Tasks tagged with FLUSH are routed to services belonging to the same tag group.
3.8 Health Check and Fault Recovery
A keep‑alive component periodically renews task timestamps; a recovery component scans unfinished tasks and restarts those whose last heartbeat exceeds a threshold.
Keep‑alive pseudo‑code:
@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal() {
    futureMap.forEach((taskId, future) -> {
        if (!future.isDone()) {
            log.info("act=[renewal] desc=[task renewed] taskId=[{}] renewalTime=[{}]",
                    taskId, DateUtils.dateToString(new Date(), DateUtils.PATTERN));
            contextService.renewal(taskId);
        } else {
            log.info("act=[renewal] desc=[task finished] taskId=[{}]", taskId);
            futureMap.remove(taskId);
        }
    });
}

Recovery pseudo‑code:
@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask() {
    List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();
    for (TaskFlushExecuteContextPO contextPO : contextPOS) {
        Integer durationMin = calculateTimeSinceLastAlive(contextPO);
        if (durationMin >= MAX_DURATION_MIN) {
            log.info("act=[restartTask] desc=[restarting task] taskId=[{}]", contextPO.getTaskId());
            // CAS update so that only one recovery node restarts a given task
            int i = contextExtMapper.casUpdateAliveTime(contextPO);
            if (i > 0) {
                restart0(contextPO);
            }
        }
    }
}

3.9 Smooth Migration
ECP supports both direct index writes and blue‑green migration using a new index, dual‑write, and alias switching to achieve zero‑downtime updates.
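The alias switch that makes the blue‑green migration zero‑downtime is a single atomic call to Elasticsearch's `_aliases` endpoint; the index names below are illustrative:

```json
POST /_aliases
{
  "actions": [
    { "remove": { "index": "orders_v1", "alias": "orders" } },
    { "add":    { "index": "orders_v2", "alias": "orders" } }
  ]
}
```

Because both actions execute atomically on the cluster, readers of the `orders` alias never observe a window in which no index is attached.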
3.10 Elegant Logging
Logging is decoupled from business logic using annotation‑driven flow recording. Dynamic values (e.g., task ID) are injected via Spring EL expressions:
@Flow(subjectIdEp = "#taskPO.id", subjectType = SubjectTypeEnum.TASK,
      operateFlowType = OperateFlowTypeEnum.CREATE_TASK,
      content = "'Created task, task ID: ' + #taskPO.id")
public void saveTaskWithUser(TaskPO taskPO) {
    String name = LoginUserContext.get().getName();
    taskPO.setCreator(name);
    taskPO.setModifier(name);
    taskMapper.insertSelective(taskPO);
}

4 Summary
The ECP implementation demonstrates a comprehensive backend solution for large‑scale data synchronization to Elasticsearch, covering data source heterogeneity, pagination, rate limiting, retry, SPI extensibility, environment isolation, health monitoring, smooth migration, and clean logging.