How We Built a Scalable Data Migration Framework for Billions of Transactions
This article details the design and implementation of a custom, high‑throughput data migration framework that handles petabyte‑scale transaction data, supports heterogeneous source/target schemas, ensures zero‑downtime operation, and provides robust scheduling, checkpointing, and fault‑tolerance mechanisms.
Background
Over the past year the transaction system's architecture was upgraded, including the order, shipment, and billing models. Data migration was a critical part of that work and faced three challenges: massive data volume (hundreds of billions of records), heterogeneous source and target schemas, and the need to run stably while the business keeps operating in parallel.
Migration Tool Selection
Common Tools
Sqoop : Apache open‑source tool for transferring data between relational databases and Hadoop, mainly for RDBMS‑Hadoop migrations.
yugong : Alibaba's tool for Oracle‑to‑MySQL migration, suitable for single‑table moves with limited extensibility.
DataX : Alibaba open‑source offline sync tool supporting many heterogeneous sources (MySQL, Oracle, PostgreSQL, HDFS, Hive, HBase) for batch synchronization.
Current Situation
Table sharding : transaction tables are horizontally partitioned, so migration must handle many physical shards.
Custom model conversion : complex data model transformations require custom code.
Distributed task scheduling : large data volume requires splitting jobs across many machines.
Flexible flow control : long migration cycles need extensible, interruptible, and resumable processes.
Task orchestration : tasks must be ordered when migrating multiple time windows.
Stability guarantees : throttling, degradation, circuit breaking, retry, and alerting mechanisms are required.
Tool Evaluation Result
Existing tools cannot fully satisfy the transaction migration requirements, so a custom framework was built.
Custom Migration Framework
Architecture Diagram
Roles
Supervisor : a JVM instance responsible for job generation, splitting, dispatching, orchestration and management. Identified by ip:port.
Worker : a JVM instance that executes tasks and reports back to the Supervisor. Identified by group:uuid:ip:port.
Registry Center
Supervisor and Worker discover each other via a Redis‑based registry using ZSET for registration/discovery and Pub/Sub for online/offline notifications.
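register : each role sends a periodic heartbeat that sets its ZSET score to the current timestamp.
discovery : expired entries are periodically removed with zremrangebyscore, so the remaining members are the live roles.
pub/sub : peers are notified when a role comes online or goes offline.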
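The heartbeat-and-expiry idea can be illustrated without a Redis connection. The following is a minimal sketch, not the framework's code: an in-memory map stands in for the ZSET, and the class name, method names, and TTL parameter are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the Redis ZSET registry: member -> last heartbeat (the "score"). */
final class RegistrySketch {
    private final Map<String, Long> heartbeats = new ConcurrentHashMap<>();
    private final long ttlMillis; // assumed expiry window, not given in the article

    RegistrySketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** register: plays the role of ZADD key <now> <member>. */
    void heartbeat(String member, long nowMillis) {
        heartbeats.put(member, nowMillis);
    }

    /** discovery: drop members whose score is <= now - ttl (the ZREMRANGEBYSCORE step), return the rest. */
    List<String> discover(long nowMillis) {
        long cutoff = nowMillis - ttlMillis;
        heartbeats.values().removeIf(lastSeen -> lastSeen <= cutoff);
        List<String> alive = new ArrayList<>(heartbeats.keySet());
        Collections.sort(alive);
        return alive;
    }
}
```

With a 30-second TTL, a Worker whose last heartbeat is 40 seconds old disappears from the next discovery pass, while one that reported 20 seconds ago stays listed.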
Core Workflow
3.1 Create Job
Jobs define task configuration and can be created via OpenAPI. Example configuration:
{
"jobGroup": "test",
"jobName": "XXX Data Migration [20210101~20210331]",
"jobHandler": "xxx.xxxxxx.xxx.xxx.xxx.DataMigrateJobHandler",
"jobParam": "{\"batch_size\":200,\"parallelism\":50,\"sharding_number\":1024,\"from_create_time\":\"2021-01-01\",\"to_create_time\":\"2021-03-31\"}",
"triggerType": 2,
"triggerConf": "2022-08-21 21:40:00",
"routeStrategy": 1
}
jobHandler : user-provided task processor.
triggerType : 1‑Cron, 2‑specific time, 3‑dependency.
routeStrategy : 1‑Round‑Robin, 2‑Random, 3‑Consistent‑Hash, 4‑Local‑First.
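To make two of the routing strategies concrete, here is a small sketch of Round-Robin and Consistent-Hash selection. It is an illustration under assumptions, not the framework's router: the class name, the CRC32 hash, and the 100 virtual nodes per Worker are all choices made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;

final class RouteSketch {
    private final AtomicLong counter = new AtomicLong();

    /** Round-Robin: cycle through the workers in list order. */
    String roundRobin(List<String> workers) {
        int index = (int) (counter.getAndIncrement() % workers.size());
        return workers.get(index);
    }

    /** Consistent-Hash: the same task key maps to the same worker while the worker set is unchanged. */
    String consistentHash(List<String> workers, String taskKey) {
        // The ring is rebuilt on every call to keep the sketch short; a real router would cache it.
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String worker : workers) {
            for (int replica = 0; replica < 100; replica++) { // virtual nodes smooth the distribution
                ring.put(hash(worker + "#" + replica), worker);
            }
        }
        // Walk clockwise to the first node at or after the key's hash, wrapping to the start.
        Map.Entry<Long, String> node = ring.ceilingEntry(hash(taskKey));
        return (node != null ? node : ring.firstEntry()).getValue();
    }

    private static long hash(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

Consistent hashing is useful when a task benefits from landing on the same Worker across retries, since adding or removing one Worker only remaps a fraction of the keys.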
3.2 Generate Tasks
The Supervisor scans the job table every minute, creates a job instance for each job that is due, and splits each instance into multiple tasks.
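The article does not say how an instance is split, so the following is only one plausible sketch: it assumes the shards counted by sharding_number are divided into contiguous, near-equal ranges, one range per task. The class name, the ShardRange type, and the taskCount parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskSplitSketch {
    /** Hypothetical task payload covering shards [fromShard, toShard). */
    static final class ShardRange {
        final int fromShard;
        final int toShard;

        ShardRange(int fromShard, int toShard) {
            this.fromShard = fromShard;
            this.toShard = toShard;
        }
    }

    /** Splits shard indexes 0..shardingNumber-1 into at most taskCount contiguous ranges. */
    static List<ShardRange> split(int shardingNumber, int taskCount) {
        if (shardingNumber <= 0 || taskCount <= 0) {
            throw new IllegalArgumentException("shardingNumber and taskCount must be positive");
        }
        List<ShardRange> tasks = new ArrayList<>();
        int base = shardingNumber / taskCount;      // minimum shards per task
        int remainder = shardingNumber % taskCount; // the first `remainder` tasks take one extra shard
        int start = 0;
        for (int i = 0; i < taskCount && start < shardingNumber; i++) {
            int size = base + (i < remainder ? 1 : 0);
            tasks.add(new ShardRange(start, start + size));
            start += size;
        }
        return tasks;
    }
}
```

For 1024 shards and 50 tasks this yields 24 tasks of 21 shards followed by 26 tasks of 20 shards.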
3.3 Dispatch Tasks
Tasks are dispatched to Workers according to the routing strategy using a Redis List. If a Worker crashes, the Supervisor re‑dispatches pending tasks.
worker_key : tasks.dispatch.group:uuid:ip:port
dispatch :
redis.rPush({worker_key}, {task_data})
3.4 Receive Tasks
Workers poll the Redis list, place tasks into a timing wheel, and when the trigger time arrives, submit them to a custom thread pool WorkerThreadPool for execution by WorkerThread.
local ret = redis.call('lrange', {worker_key}, 0, {batch_size}-1);
redis.call('ltrim', {worker_key}, {batch_size}, -1);
return ret;
3.5 Execute Tasks
WorkerThread creates the user-defined JobHandler and calls its execute method, which contains the business logic.
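A rough sketch of this step is shown below. The article only states that a JobHandler has an execute method, so the interface signature, the EchoHandler example, and the single-thread executor standing in for WorkerThreadPool are all assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecuteSketch {
    /** Hypothetical handler contract; the parameter and return types are assumed. */
    interface JobHandler {
        String execute(String jobParam) throws Exception;
    }

    /** Example user handler standing in for a real migration handler. */
    static final class EchoHandler implements JobHandler {
        public EchoHandler() {
        }

        @Override
        public String execute(String jobParam) {
            return "migrated:" + jobParam;
        }
    }

    /** Creates the handler named in the job configuration and runs it on a pool thread. */
    static String run(String handlerClassName, String jobParam) {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // stand-in for WorkerThreadPool
        try {
            JobHandler handler = (JobHandler) Class.forName(handlerClassName)
                    .getDeclaredConstructor()
                    .newInstance();
            Callable<String> work = () -> handler.execute(jobParam);
            return pool.submit(work).get(); // block until the task finishes
        } catch (Exception e) {
            throw new IllegalStateException("task failed: " + handlerClassName, e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Loading the handler by class name is what lets the jobHandler field of a job configuration select user code without the Worker knowing about it at compile time.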
3.6 Checkpoint
During execution, checkpoints can be saved to allow pause, restart, or recovery after failures.
3.7 Pause/Resume
Clients can pause running tasks via OpenAPI, optionally saving a checkpoint, and later resume from the saved state.
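The checkpoint and pause/resume behavior from sections 3.6 and 3.7 might look roughly like the sketch below. It assumes the checkpoint is the last processed record id and that a pause request is checked between batches; the class, the in-memory checkpoint field, and the pause predicate are hypothetical, and a real implementation would persist the checkpoint.

```java
import java.util.function.LongPredicate;

final class CheckpointSketch {
    private long savedCheckpoint = 0; // last id fully processed; a real store would be durable
    private long processed = 0;       // counts migrated records, to show no rework after resume

    long checkpoint() {
        return savedCheckpoint;
    }

    long processedCount() {
        return processed;
    }

    /**
     * Processes ids in (checkpoint, maxId] batch by batch.
     * Returns true when finished, false when stopped by a pause request.
     */
    boolean run(long maxId, int batchSize, LongPredicate pauseRequested) {
        long cursor = savedCheckpoint;            // resume from the saved position
        while (cursor < maxId) {
            if (pauseRequested.test(cursor)) {
                return false;                     // checkpoint already reflects the last finished batch
            }
            long end = Math.min(cursor + batchSize, maxId);
            processed += end - cursor;            // stand-in for scan, convert, and write
            cursor = end;
            savedCheckpoint = cursor;             // save after each completed batch
        }
        return true;
    }
}
```

Saving the checkpoint only after a batch completes means a crash can at worst repeat one batch, which is safe as long as the writes are idempotent.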
3.8 Rate Limiting
Rate limiting is applied based on DB resources, business peak/off‑peak, exceptions, and manual testing. Configuration example:
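{
"flow_limit":{
"07": 100,
"12": 50,
"20": 200,
"22": 500,
"00": 700
}
}
Each key is an hour of the day and each value is the limit that applies from that hour; for example, from 12:00 each pod is limited to 50 orders per second. The limiter is implemented with Guava's token-bucket algorithm, with a fallback to a leaky-bucket approach.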
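One way to resolve the active limit from such a configuration is sketched below. It assumes each key is the hour from which its limit applies until the next key, and that hours before the earliest key carry over the last limit of the previous day; the class and method names are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

final class FlowLimitSketch {
    private final TreeMap<Integer, Integer> limitsByStartHour = new TreeMap<>();

    FlowLimitSketch(Map<String, Integer> flowLimit) {
        if (flowLimit.isEmpty()) {
            throw new IllegalArgumentException("flow_limit must not be empty");
        }
        for (Map.Entry<String, Integer> entry : flowLimit.entrySet()) {
            limitsByStartHour.put(Integer.parseInt(entry.getKey()), entry.getValue());
        }
    }

    /** Returns the per-pod limit (orders per second) in force at the given hour, 0 to 23. */
    int limitAt(int hourOfDay) {
        Map.Entry<Integer, Integer> entry = limitsByStartHour.floorEntry(hourOfDay);
        if (entry == null) {
            // Earlier than the first configured hour: assume yesterday's last limit still applies.
            entry = limitsByStartHour.lastEntry();
        }
        return entry.getValue();
    }
}
```

The returned value could then be applied to a Guava RateLimiter through its setRate method whenever the hour changes.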
3.9 Error‑Rate Circuit Breaker
A BitSet records the success/failure of recent N records; if the error rate exceeds a threshold, the task is automatically paused.
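A sliding window over a BitSet can be sketched as a ring buffer in which each new outcome overwrites the oldest one. The class name and the rule that the breaker only trips once the window is full are assumptions of this example, not details from the framework.

```java
import java.util.BitSet;

final class ErrorRateBreakerSketch {
    private final BitSet failures;  // bit i set means slot i of the window holds a failure
    private final int windowSize;   // N: how many recent records are tracked
    private final double threshold; // pause when the error rate exceeds this value
    private long recorded = 0;

    ErrorRateBreakerSketch(int windowSize, double threshold) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.failures = new BitSet(windowSize);
        this.windowSize = windowSize;
        this.threshold = threshold;
    }

    /** Records one outcome and returns true if the task should be paused. */
    synchronized boolean record(boolean success) {
        int slot = (int) (recorded % windowSize); // reuse the slot of the oldest outcome
        failures.set(slot, !success);
        recorded++;
        return recorded >= windowSize && errorRate() > threshold;
    }

    /** Fraction of failures among the outcomes currently in the window. */
    synchronized double errorRate() {
        long seen = Math.min(recorded, windowSize);
        return seen == 0 ? 0.0 : (double) failures.cardinality() / seen;
    }
}
```

A BitSet keeps the window at one bit per record, so even a window of several thousand records costs only a few hundred bytes per task.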
Migration Practice
Migration Process
Source data scan : scan physical shards based on time range or business type.
Data model conversion : transform source structures to target structures.
Target persistence : write transformed data to the destination database.
Result verification : perform data quality checks after persistence.
Migration Assurance
Failure Retry
When a task throws an exception it is retried; if the retries keep failing, the failure is logged and an alert is raised. Writes use the duplicate-key insert syntax so that a retried task can re-apply the same rows idempotently:
INSERT INTO target_table (...) VALUES (...) ON DUPLICATE KEY UPDATE ...
Data Verification
Pre‑migration offline check using dual‑write consistency.
Real‑time check during migration by comparing source and target via business APIs.
Post‑migration offline check using data warehouse.
Post‑migration traffic replay to validate query results.
Migration Control
Task interruption and recovery.
Dynamic rate adjustment based on performance metrics.
Error‑rate threshold circuit‑break.
Migration Strategy
Test data: migrate all data of a test region.
Early data: migrate the first 10% of the oldest six months as a gray-release (canary) verification.
Recent data: migrate the first 10% of the newest six months.
Full data: migrate all data in chronological batches.
Degradation Plan
Mark-based downgrade: set the dual-write flag to false for the affected orders.
Delete-based downgrade: delete the affected orders' data from the new database via API.
Conclusion and Outlook
Through this large‑scale migration project we have built a generic, efficient, and reliable migration toolset and methodology. Future work will focus on platform‑izing the migration framework so that different teams can configure and execute migrations without custom code.
