
How to Seamlessly Migrate MongoDB to an Isolated Instance with Real-Time Sync

This guide details a step‑by‑step process for migrating a high‑traffic MongoDB app to a dedicated instance, covering full backup, real‑time incremental sync via Change Streams, compensating for missed changes, handling operation ordering, and ensuring zero downtime throughout.


Background

Our company's business-line app has a growing number of daily active users, its core logic is complex, and during peak periods it consumes so much database capacity that other services are affected. We therefore decided to migrate it to a dedicated MongoDB instance.

Goal

Data integrity: both original and incremental data must be migrated.

Migration continuity: migration must not affect business services.

Order consistency: incremental operations (insert/update/delete) must be executed in oplog order.

Core Process

Data synchronization mechanism: MongoDB Change Stream (based on replica set oplog) → full backup → real‑time incremental sync → incremental compensating sync.

┌──────────────────────────────────────┐
│ 1. Record migration start time       │  backupStartTime (T1)
└──────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 2. mongodump: export full data       │
└──────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 3. mongorestore: import to target DB │
└──────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 4. Real-time incremental sync        │  keep listening for new changes after restore
└──────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 5. Compensating incremental sync     │  change stream + time range (T1–T2) + ordered replay
└──────────────────────────────────────┘

Full Migration

Record start time T1.

Run mongodump with appropriate options:

mongodump \
--host localhost \
--port 27017 \
--db dbName \
--archive=/mnt/backup/dbName.gz \
--gzip

Copy the backup file to the target host with scp.
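For example (the user and paths are placeholders):

scp /mnt/backup/dbName.gz user@targetHost:/mnt/backup/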

Run mongorestore on the target:

mongorestore \
--host localhost \
--port 27017 \
--archive=path/to/dbName.gz \
--gzip

Record end time T2.
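The compensating sync script below expects T1 and T2 as Unix timestamps in seconds, so one convenient way to record them (a sketch, assuming a Linux shell) is:

T1=$(date +%s)   # immediately before mongodump
# ... dump, copy, restore ...
T2=$(date +%s)   # immediately after mongorestore completes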

Real‑time Incremental Sync

After the backup is restored, a Change Stream is opened to continuously listen for new insert, update, and delete operations and apply them to the target database.

Operation types:

insert: updateOne + $setOnInsert (idempotent write)

update / replace: replaceOne + upsert

delete: deleteOne
const { MongoClient } = require('mongodb');

// source replica set
const sourceUri = 'mongodb://host:27017/?replicaSet=rs0';
const sourceDbName = 'dbName';

// target instance
const targetUri = 'mongodb://targetHost:27017/?replicaSet=rs0';
const targetDbName = 'dbName';

const sourceClient = new MongoClient(sourceUri);
const targetClient = new MongoClient(targetUri);

async function main() {
  let insertCount = 0;
  let updateCount = 0;
  let deleteCount = 0;

  try {
    await sourceClient.connect();
    await targetClient.connect();

    const sourceDb = sourceClient.db(sourceDbName);
    const targetDb = targetClient.db(targetDbName);

    console.error(`✅ Connected source DB: ${sourceDbName}`);
    console.error(`✅ Connected target DB: ${targetDbName}`);
    console.error('📅 Starting real‑time change listening');

    const changeStream = sourceDb.watch([], { fullDocument: 'updateLookup' });

    // Note: this handler is async and is not serialized, so writes to the
    // target can complete out of the order in which events arrived (see
    // "About Change Stream Disorder" below).
    changeStream.on('change', async (change) => {
      const { operationType, ns, fullDocument, documentKey } = change;
      const targetCollection = targetDb.collection(ns.coll);
      try {
        switch (operationType) {
          case 'insert':
            await targetCollection.updateOne(
              { _id: fullDocument._id },
              { $setOnInsert: fullDocument },
              { upsert: true }
            );
            insertCount++;
            console.error('>>>', operationType, ns.coll, documentKey, `; total inserted: ${insertCount}`);
            break;
          case 'update':
          case 'replace':
            await targetCollection.replaceOne({ _id: documentKey._id }, fullDocument, { upsert: true });
            updateCount++;
            console.error('>>>', operationType, ns.coll, documentKey, `; total updated: ${updateCount}`);
            break;
          case 'delete':
            await targetCollection.deleteOne({ _id: documentKey._id });
            deleteCount++;
            console.error('>>>', operationType, ns.coll, documentKey, `; total deleted: ${deleteCount}`);
            break;
        }
      } catch (err) {
        console.error(`❌ Sync ${operationType} failed: ${err.message}`);
      }
    });

    changeStream.on('error', (err) => {
      console.error('🚨 ChangeStream error:', err);
    });

    console.error('🚀 Change Stream started, listening...');
    await new Promise(() => {}); // keep process alive
  } catch (err) {
    console.error(`💥 Initialization failed: ${err.message}`);
  }
}

main();
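Start this script as soon as mongorestore finishes and leave it running until the connection switch; it listens indefinitely until the process is killed (the filename, e.g. node realtime-sync.js, is whatever you saved it as).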

Compensating Incremental Sync

To cover incremental data missed during full backup/restore, specify a time range (the T1 and T2 timestamps recorded in the full migration) and read all change‑stream events within that range, cache them, sort by clusterTime in ascending order, and then apply them.

/** 
 * Change stream compensating mode
 * Synchronize change events within a specified time range in order
 */
const { MongoClient, Timestamp } = require('mongodb');
const { argv } = require('node:process');

// source replica set
const sourceUri = 'mongodb://host:27017/?replicaSet=rs0';
const sourceDbName = 'dbName';

// target instance
const targetUri = 'mongodb://targetHost:27017/?replicaSet=rs0';
const targetDbName = 'dbName';

const sourceClient = new MongoClient(sourceUri);
const targetClient = new MongoClient(targetUri);

async function main() {
  const startAtTime = Number(argv[2]);
  const endTime = Number(argv[3]);

  if (!startAtTime || !endTime) {
    console.error('❌ Compensating mode requires startAtTime and endTime');
    process.exit(1);
  }

  console.error(`🟠 Compensating mode: sync time range [${new Date(startAtTime * 1000).toISOString()}, ${new Date(endTime * 1000).toISOString()}]`);

  try {
    await sourceClient.connect();
    await targetClient.connect();

    const sourceDb = sourceClient.db(sourceDbName);
    const targetDb = targetClient.db(targetDbName);

    console.error(`✅ Connected source DB: ${sourceDbName}`);
    console.error(`✅ Connected target DB: ${targetDbName}`);

    const changeStream = sourceDb.watch([], {
      fullDocument: 'updateLookup',
      startAtOperationTime: new Timestamp({ t: startAtTime, i: 0 })
    });

    const changes = [];

    // Collect events inside the window; the first event past endTime closes
    // the stream (otherwise 'close' never fires and this promise never
    // resolves). If no event ever arrives after endTime, stop it manually.
    await new Promise((resolve, reject) => {
      changeStream.on('change', (change) => {
        const tsSec = change.clusterTime.getHighBits(); // high 32 bits = seconds
        if (tsSec <= endTime) {
          changes.push(change);
        } else {
          changeStream.close().then(resolve, reject);
        }
      });
      changeStream.on('error', reject);
    });

    console.error(`🟢 Collected ${changes.length} events, starting sort`);
    changes.sort((a, b) =>
      (a.clusterTime.getHighBits() - b.clusterTime.getHighBits()) ||
      (a.clusterTime.getLowBits() - b.clusterTime.getLowBits())); // tie-break on the increment within the same second

    let insertCount = 0, updateCount = 0, deleteCount = 0;
    for (const change of changes) {
      const { operationType, ns, fullDocument, documentKey } = change;
      const targetCollection = targetDb.collection(ns.coll);
      try {
        switch (operationType) {
          case 'insert':
            await targetCollection.updateOne({ _id: fullDocument._id }, { $setOnInsert: fullDocument }, { upsert: true });
            insertCount++;
            break;
          case 'update':
          case 'replace':
            await targetCollection.replaceOne({ _id: documentKey._id }, fullDocument, { upsert: true });
            updateCount++;
            break;
          case 'delete':
            await targetCollection.deleteOne({ _id: documentKey._id });
            deleteCount++;
            break;
        }
      } catch (err) {
        console.error(`❌ Sync ${operationType} failed: ${err.message}`);
      }
    }

    console.error(`✅ Sync completed: insert=${insertCount}, update=${updateCount}, delete=${deleteCount}`);
    await sourceClient.close();
    await targetClient.close();
  } catch (err) {
    console.error(`💥 Initialization failed: ${err.message}`);
  }
}

main();

Run the script with node sync.js T1 T2, where T1 and T2 are the start and end Unix timestamps (in seconds) recorded during the full migration.
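For example, with placeholder epoch seconds:

node sync.js 1750000000 1750003600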

Switch DB Connection

After verifying that each collection's data and subsequent changes have synchronized correctly, switch the application's DB connection URI and redeploy. After going live, watch the source DB's change stream once more to confirm that no new writes land there during the rolling update.
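A simple spot check before switching (a sketch; the URIs are the same placeholders used above) is to compare per-collection document counts between source and target:

// compare-counts.js — per-collection count comparison (sketch)
const { MongoClient } = require('mongodb');

const sourceUri = 'mongodb://host:27017/?replicaSet=rs0';
const targetUri = 'mongodb://targetHost:27017/?replicaSet=rs0';
const dbName = 'dbName';

async function main() {
  const source = await MongoClient.connect(sourceUri);
  const target = await MongoClient.connect(targetUri);
  const collections = await source.db(dbName).listCollections().toArray();

  for (const { name } of collections) {
    const a = await source.db(dbName).collection(name).countDocuments();
    const b = await target.db(dbName).collection(name).countDocuments();
    // Counts can differ transiently while real-time sync is still applying events
    console.log(`${name}: source=${a}, target=${b} ${a === b ? '✅' : '⚠️'}`);
  }

  await source.close();
  await target.close();
}

main();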

About Change Stream Disorder

Phenomenon

On the source DB, a document is inserted and then deleted, but the Change Stream listener applies the delete before the insert.

Reason

Oplog propagation delay: primary writes to oplog first, then replicates to secondaries; listeners on different nodes may receive events at slightly different times, causing out‑of‑order delivery.

Change Stream batch returns: events are returned in batches; order within a batch is usually correct, but across batches the order can interleave when the event rate is high.

Timestamp and network latency: clusterTime marks the true event order, but network transmission and queueing can make callback order differ from clusterTime order. The real-time script also applies events directly, without caching and sorting, and its async handler does not wait for one write to finish before the next starts, so target writes can complete out of order.
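One way to remove the handler-level reordering (a sketch, not the article's script; applyChange is a hypothetical helper wrapping the operationType switch shown earlier) is to chain events through a promise so each write completes before the next begins:

// Serialize change events: each handler run awaits the previous one,
// so writes reach the target in the order events were delivered.
let chain = Promise.resolve();

changeStream.on('change', (change) => {
  chain = chain
    .then(() => applyChange(change)) // applyChange: hypothetical, wraps the insert/update/delete switch
    .catch((err) => console.error(`❌ apply failed: ${err.message}`));
});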

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.