How to Seamlessly Migrate MongoDB to an Isolated Instance with Real-Time Sync
This guide details a step‑by‑step process for migrating a high‑traffic MongoDB app to a dedicated instance, covering full backup, real‑time incremental sync via Change Streams, compensating for missed changes, handling operation ordering, and ensuring zero downtime throughout.
Background
The company's line-of-business app has a growing number of daily active users; its core logic is complex, and at peak times it consumes so much database capacity that other services are affected. We therefore decided to migrate it to a dedicated MongoDB instance.
Goal
Data integrity: both original and incremental data must be migrated.
Migration continuity: migration must not affect business services.
Order consistency: incremental operations (insert/update/delete) must be executed in oplog order.
Core Process
Data synchronization mechanism: MongoDB Change Streams (built on the replica‑set oplog). Overall flow: full backup → real‑time incremental sync → compensating incremental sync.
┌─────────────────────────────────────┐
│ 1. Record full migration start time │  backupStartTime (T1)
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│ 2. Execute mongodump                │  export full data
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│ 3. Execute mongorestore             │  import into the target DB
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│ 4. Real-time incremental sync       │  continuously apply the latest changes
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│ 5. Compensating incremental sync    │  change stream + time range (backup start to restore complete) + ordered replay
└─────────────────────────────────────┘
Full Migration
Record start time T1 (backupStartTime).
Run mongodump with appropriate options:

```shell
mongodump \
  --host localhost \
  --port 27017 \
  --db dbName \
  --archive=/mnt/backup/dbName.gz \
  --gzip
```

Copy the backup file to the target host with scp.
Run mongorestore on the target:

```shell
mongorestore \
  --host localhost \
  --port 27017 \
  --archive=path/to/dbName.gz \
  --gzip
```

Record end time T2.
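Before trusting the restore, it helps to sanity-check per-collection document counts on both sides. A minimal sketch: the `diffCounts` helper and the sample counts below are illustrative (not part of the original scripts), and the count maps would in practice be gathered with `countDocuments()` against each instance.

```javascript
// Compare per-collection document counts taken from source and target.
// Each map would come from e.g. `db.collection(name).countDocuments()`.
function diffCounts(sourceCounts, targetCounts) {
  const mismatched = [];
  for (const [coll, count] of Object.entries(sourceCounts)) {
    if (targetCounts[coll] !== count) mismatched.push(coll);
  }
  return mismatched;
}

// Hypothetical counts for three collections
const source = { users: 1200, orders: 560, logs: 9800 };
const target = { users: 1200, orders: 560, logs: 9780 }; // logs lagging
console.log(diffCounts(source, target)); // → [ 'logs' ]
```

A count match is only a coarse signal (it misses in-place updates), but it catches truncated restores quickly.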
Real‑time Incremental Sync
After the backup is restored, a Change Stream is opened to continuously listen for new insert, update, and delete operations and apply them to the target database.
Operation types:
insert: updateOne + $setOnInsert (idempotent write)
update/replace: replaceOne + upsert
delete: deleteOne

```javascript
const { MongoClient } = require('mongodb');

// source replica set
const sourceUri = 'mongodb://host:27017/?replicaSet=rs0';
const sourceDbName = 'dbName';
// target instance
const targetUri = 'mongodb://targetHost:27017/?replicaSet=rs0';
const targetDbName = 'dbName';

const sourceClient = new MongoClient(sourceUri);
const targetClient = new MongoClient(targetUri);

async function main() {
  let insertCount = 0;
  let updateCount = 0;
  let deleteCount = 0;
  try {
    await sourceClient.connect();
    await targetClient.connect();
    const sourceDb = sourceClient.db(sourceDbName);
    const targetDb = targetClient.db(targetDbName);
    console.error(`✅ Connected source DB: ${sourceDbName}`);
    console.error(`✅ Connected target DB: ${targetDbName}`);
    console.error('📅 Starting real-time change listening');

    const changeStream = sourceDb.watch([], { fullDocument: 'updateLookup' });
    changeStream.on('change', async (change) => {
      const { operationType, ns, fullDocument, documentKey } = change;
      const targetCollection = targetDb.collection(ns.coll);
      try {
        switch (operationType) {
          case 'insert':
            // $setOnInsert + upsert makes replayed inserts idempotent
            await targetCollection.updateOne(
              { _id: fullDocument._id },
              { $setOnInsert: fullDocument },
              { upsert: true }
            );
            insertCount++;
            console.error('>>>', operationType, ns.coll, documentKey, `; total inserted: ${insertCount}`);
            break;
          case 'update':
          case 'replace':
            await targetCollection.replaceOne({ _id: documentKey._id }, fullDocument, { upsert: true });
            updateCount++;
            console.error('>>>', operationType, ns.coll, documentKey, `; total updated: ${updateCount}`);
            break;
          case 'delete':
            await targetCollection.deleteOne({ _id: documentKey._id });
            deleteCount++;
            console.error('>>>', operationType, ns.coll, documentKey, `; total deleted: ${deleteCount}`);
            break;
        }
      } catch (err) {
        console.error(`❌ Sync ${operationType} failed: ${err.message}`);
      }
    });
    changeStream.on('error', (err) => {
      console.error('🚨 ChangeStream error:', err);
    });
    console.error('🚀 Change Stream started, listening...');
    await new Promise(() => {}); // keep the process alive
  } catch (err) {
    console.error(`💥 Initialization failed: ${err.message}`);
  }
}

main();
```

Compensating Incremental Sync
To cover incremental data missed during the full backup/restore, read all change‑stream events within a specified time range (the T1 and T2 timestamps recorded during the full migration), cache them, sort them by clusterTime in ascending order, and then apply them.
```javascript
/**
 * Change stream compensating mode
 * Replays change events within a specified time range, in order
 */
const { MongoClient, Timestamp } = require('mongodb');
const { argv } = require('node:process');

// source replica set
const sourceUri = 'mongodb://host:27017/?replicaSet=rs0';
const sourceDbName = 'dbName';
// target instance
const targetUri = 'mongodb://targetHost:27017/?replicaSet=rs0';
const targetDbName = 'dbName';

const sourceClient = new MongoClient(sourceUri);
const targetClient = new MongoClient(targetUri);

async function main() {
  const startAtTime = Number(argv[2]); // T1, Unix seconds
  const endTime = Number(argv[3]);     // T2, Unix seconds
  if (!startAtTime || !endTime) {
    console.error('❌ Compensating mode requires startAtTime and endTime');
    process.exit(1);
  }
  console.error(`🟠 Compensating mode: sync time range [${new Date(startAtTime * 1000).toISOString()}, ${new Date(endTime * 1000).toISOString()}]`);
  try {
    await sourceClient.connect();
    await targetClient.connect();
    const sourceDb = sourceClient.db(sourceDbName);
    const targetDb = targetClient.db(targetDbName);
    console.error(`✅ Connected source DB: ${sourceDbName}`);
    console.error(`✅ Connected target DB: ${targetDbName}`);

    const changeStream = sourceDb.watch([], {
      fullDocument: 'updateLookup',
      startAtOperationTime: new Timestamp({ t: startAtTime, i: 0 })
    });

    const changes = [];
    changeStream.on('change', (change) => {
      const tsSec = change.clusterTime.getHighBits(); // seconds part of the BSON Timestamp
      if (tsSec <= endTime) {
        changes.push(change);
      } else {
        // First event past the range: stop collecting so replay can start
        changeStream.close();
      }
    });
    await new Promise((resolve) => {
      changeStream.on('close', resolve);
    });

    console.error(`🟢 Collected ${changes.length} events, starting sort`);
    // Sort by the full Timestamp: seconds first, then the increment,
    // so events within the same second keep their oplog order
    changes.sort((a, b) =>
      a.clusterTime.getHighBits() - b.clusterTime.getHighBits() ||
      a.clusterTime.getLowBits() - b.clusterTime.getLowBits()
    );

    let insertCount = 0, updateCount = 0, deleteCount = 0;
    for (const change of changes) {
      const { operationType, ns, fullDocument, documentKey } = change;
      const targetCollection = targetDb.collection(ns.coll);
      try {
        switch (operationType) {
          case 'insert':
            await targetCollection.updateOne({ _id: fullDocument._id }, { $setOnInsert: fullDocument }, { upsert: true });
            insertCount++;
            break;
          case 'update':
          case 'replace':
            await targetCollection.replaceOne({ _id: documentKey._id }, fullDocument, { upsert: true });
            updateCount++;
            break;
          case 'delete':
            await targetCollection.deleteOne({ _id: documentKey._id });
            deleteCount++;
            break;
        }
      } catch (err) {
        console.error(`❌ Sync ${operationType} failed: ${err.message}`);
      }
    }
    console.error(`✅ Sync completed: insert=${insertCount}, update=${updateCount}, delete=${deleteCount}`);
    await sourceClient.close();
    await targetClient.close();
  } catch (err) {
    console.error(`💥 Initialization failed: ${err.message}`);
  }
}

main();
```

Run the script with node sync.js T1 T2, where T1 and T2 are the start and end timestamps.
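T1 and T2 must be Unix timestamps in seconds, since the seconds value is what goes into the BSON Timestamp passed to startAtOperationTime. A small helper for converting recorded wall-clock times, assuming they were noted as ISO 8601 strings (the dates below are examples, not the actual migration times):

```javascript
// Convert an ISO 8601 date string to Unix seconds, the unit the
// compensating script expects for its startAtTime/endTime arguments.
function toUnixSeconds(iso) {
  return Math.floor(Date.parse(iso) / 1000);
}

const t1 = toUnixSeconds('2024-05-01T02:00:00Z'); // hypothetical backup start
const t2 = toUnixSeconds('2024-05-01T02:45:30Z'); // hypothetical restore complete
console.log(t1, t2);
// then: node sync.js <t1> <t2>
```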
Switch DB Connection
After verifying that each collection's data changes are synchronized correctly, switch the application's DB connection URI and deploy; after going live, listen to the source DB's change stream once more so that any writes landing on the source during the rolling update are still captured.
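One way to keep that post-cutover pass cheap is to persist the change stream's last resume token before switching, then reopen the stream with the driver's resumeAfter option so no events are replayed twice. A sketch of the option-building logic only; the token persistence and the sample `_data` value are illustrative, while `resumeAfter` and `fullDocument` are standard options of the Node.js driver's `watch()`:

```javascript
// Build options for db.watch() depending on whether a resume token was
// saved by the pre-cutover listener. With a token, the stream resumes
// exactly after the last processed event; without one, it tails new events.
function buildWatchOptions(resumeToken) {
  const options = { fullDocument: 'updateLookup' };
  if (resumeToken) options.resumeAfter = resumeToken;
  return options;
}

// e.g. const changeStream = sourceDb.watch([], buildWatchOptions(savedToken));
console.log(buildWatchOptions(null));
console.log(buildWatchOptions({ _data: '8263AB...' })); // hypothetical token
```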
About Change Stream Disorder
Phenomenon
On the source DB, an insert is followed by a delete, but the Change Stream listener processes the delete before the insert.
Reason
Oplog propagation delay: primary writes to oplog first, then replicates to secondaries; listeners on different nodes may receive events at slightly different times, causing out‑of‑order delivery.
Change Stream batch returns: events are returned in batches; order within a batch is usually correct, but across batches the order can interleave when the event rate is high.
Timestamps and processing latency: clusterTime records the true event order, but network transmission, queueing, and the async 'change' handler (which does not finish one event before the next callback fires) can cause the apply order to differ from clusterTime order. The real‑time script applies change events directly in the callback without caching and sorting them, which can lead to out‑of‑order writes.
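The mitigation is the same buffer-then-sort idea used for compensating sync: collect events first, order them by the full clusterTime (seconds, then the increment), and only then apply. A sketch in isolation; the mock `ts` objects below stand in for real BSON Timestamps, which expose getHighBits()/getLowBits() the same way:

```javascript
// Sort buffered change events by clusterTime: seconds first, then the
// increment that distinguishes events within the same second.
function sortByClusterTime(events) {
  return [...events].sort((a, b) =>
    a.clusterTime.getHighBits() - b.clusterTime.getHighBits() ||
    a.clusterTime.getLowBits() - b.clusterTime.getLowBits()
  );
}

// Mock Timestamp: t = seconds, i = increment within that second
const ts = (t, i) => ({ getHighBits: () => t, getLowBits: () => i });

// Delivered out of order: the delete arrives before the insert it follows
const received = [
  { operationType: 'delete', clusterTime: ts(100, 2) },
  { operationType: 'insert', clusterTime: ts(100, 1) },
];
console.log(sortByClusterTime(received).map((e) => e.operationType));
// → [ 'insert', 'delete' ]
```

The trade-off is latency: events can only be applied once you are sure nothing earlier is still in flight, which is why this fits a bounded time window better than open-ended tailing.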
Java Captain
Focused on Java technologies: SSM, the Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading; occasionally covers DevOps tools like Jenkins, Nexus, Docker, ELK; shares practical tech insights and is dedicated to full‑stack Java development.