
Milvus VTS Deep Dive: Two Write Modes, Sharding, and Migration Best Practices

This article provides a source‑code‑level analysis of the Milvus Vector Transport Service (VTS), detailing its three‑stage architecture, partition‑aware sharding logic, two distinct sink write mechanisms (BufferBatchWriter and BulkWriter), schema conversion rules, error‑handling strategies, performance‑tuning parameters, and practical configuration examples for efficient vector data migration across various data sources.


Overall Architecture: Source → Transform → Sink

VTS builds on Apache SeaTunnel’s three‑stage design. The Milvus connector consists of the following class chain:

Source side: MilvusSource → MilvusSourceSplitEnumerator → MilvusSourceReader → MilvusBufferReader

Sink side: MilvusSink → MilvusSinkWriter → either MilvusBufferBatchWriter (direct Insert API) or MilvusBulkWriter (Import API)

Source Side – Partition‑Aware Sharding Strategy

The core component is MilvusSourceSplitEnumerator. Its run() method performs three steps:

Enumerate all collections to be migrated.

For each collection call generateSplits() to create splits.

Assign the generated splits to registered readers.

generateSplits() first checks whether the collection defines a Partition Key:

boolean hasPartitionKey =
    describeCollectionResp.getCollectionSchema().getFieldSchemaList().stream()
        .anyMatch(CreateCollectionReq.FieldSchema::getIsPartitionKey);

Two split paths are selected based on this flag:

Path A – With Partition Key: the whole collection is treated as a single split and further divided by offset using splitByOffset().

Path B – No Partition Key but multiple partitions: each partition becomes a split, and each partition is then divided by offset, yielding two levels of parallelism.

splitByOffset() computes the total row count, divides it by the parallelism parameter, and creates equal‑size splits. If parallelism < 2, a single split is returned (single‑threaded mode).

long numOfEntities = /* result of count(*) */;
long splitSize = Math.max(1, numOfEntities / parallelism); // rows per split
for (int i = 0; i < parallelism; i++) {
    long offset = i * splitSize;
    if (i == parallelism - 1) {
        // the last split has no limit, so it also absorbs any remainder rows
        newSplit = ...offset(offset).build();
    } else {
        newSplit = ...offset(offset).limit(splitSize).build();
    }
}
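To make the offset arithmetic concrete, here is a minimal, self‑contained sketch; the SplitRange record is a hypothetical stand‑in for the elided MilvusSourceSplit builder calls:

import java.util.ArrayList;
import java.util.List;

public class OffsetSplitDemo {
    // Hypothetical stand-in for a source split; limit < 0 means "no limit".
    record SplitRange(long offset, long limit) {}

    static List<SplitRange> splitByOffset(long numOfEntities, int parallelism) {
        List<SplitRange> splits = new ArrayList<>();
        long splitSize = Math.max(1, numOfEntities / parallelism);
        for (int i = 0; i < parallelism; i++) {
            long offset = i * splitSize;
            // The last split is left unbounded so it also picks up remainder rows.
            long limit = (i == parallelism - 1) ? -1 : splitSize;
            splits.add(new SplitRange(offset, limit));
        }
        return splits;
    }

    public static void main(String[] args) {
        // 10 rows, parallelism 4 -> splitSize 2 -> offsets 0, 2, 4, 6;
        // the final split (offset 6, no limit) covers rows 6..9.
        splitByOffset(10, 4).forEach(System.out::println);
    }
}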

Recommended parallelism is between 2 and the number of CPU cores. Larger values increase concurrent readers and may trigger Milvus rate limits.

Sink Side – Dual Write Modes

BufferBatchWriter – Direct Insert API

Data is cached in a List&lt;JsonObject&gt; (milvusDataCache) until the configured batch_size is reached, then the Milvus Insert API is invoked.

public void write(SeaTunnelRow element) {
    JsonObject data = milvusSinkConverter.buildMilvusData(...);
    milvusDataCache.add(data);
    writeCache.incrementAndGet();   // rows currently buffered
    writeCount.incrementAndGet();   // rows written in total
    if (writeCache.get() >= batchSize) {
        commit(true);
    }
}

Commit condition: writeCache >= batchSize. When a rate‑limit or oversized‑message error occurs, the writer halves batchSize, sleeps 60 seconds, splits the failed batch in two, and recursively retries each half; the halving stops once a batch contains 2 rows or fewer.

if (e.getMessage().contains("rate limit exceeded") ||
    e.getMessage().contains("received message larger than max")) {
    if (data.size() > 2) {
        this.batchSize = this.batchSize / 2; // shrink subsequent batches
        Thread.sleep(60000);                 // back off for 60 seconds
        // the failed batch is split into two halves and each half is retried
        insertWrite(partitionName, firstHalf);
        insertWrite(partitionName, secondHalf);
    }
}
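For context, the following self‑contained sketch shows the whole halve‑and‑retry path; the insertBatch callback is a hypothetical stand‑in for the actual Milvus Insert API call, not the connector's source code:

import com.google.gson.JsonObject;
import java.util.List;
import java.util.function.BiConsumer;

// Illustrative sketch of the halve-and-retry strategy described above.
class RetryingInserter {
    private int batchSize;
    private final BiConsumer<String, List<JsonObject>> insertBatch; // hypothetical insert call

    RetryingInserter(int batchSize, BiConsumer<String, List<JsonObject>> insertBatch) {
        this.batchSize = batchSize;
        this.insertBatch = insertBatch;
    }

    void insertWrite(String partitionName, List<JsonObject> data) throws InterruptedException {
        try {
            insertBatch.accept(partitionName, data);
        } catch (RuntimeException e) {
            boolean retryable = e.getMessage() != null
                    && (e.getMessage().contains("rate limit exceeded")
                        || e.getMessage().contains("received message larger than max"));
            if (retryable && data.size() > 2) {
                this.batchSize = this.batchSize / 2; // shrink future batches as well
                Thread.sleep(60_000);                // back off for 60 seconds
                int mid = data.size() / 2;
                insertWrite(partitionName, data.subList(0, mid));           // retry first half
                insertWrite(partitionName, data.subList(mid, data.size())); // retry second half
            } else {
                throw e; // surfaces as WRITE_DATA_FAIL (MILVUS-17)
            }
        }
    }
}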

BulkWriter – Parquet + Import API

Designed for massive migrations. Data is first written to Parquet files, uploaded to object storage (S3‑compatible or Azure Blob), then the Milvus Import API loads the data.

RemoteBulkWriterParam param = RemoteBulkWriterParam.newBuilder()
    .withCollectionSchema(describeCollectionResp.getCollectionSchema())
    .withConnectParam(storageConnectParam) // S3-compatible or Azure Blob connection
    .withChunkSize(stageBucket.getChunkSize() * 1024 * 1024) // chunk size configured in MB, converted to bytes
    .withRemotePath(stageBucket.getPrefix() + "/" + collectionName + "/" + partitionName)
    .withFileType(BulkFileType.PARQUET)
    .build();
remoteBulkWriter = new RemoteBulkWriter(param);
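Once the writer is constructed, rows are appended and flushed. The sketch below assumes the Milvus Java SDK's RemoteBulkWriter methods appendRow, commit, and getBatchFiles, with one Gson JsonObject per row:

import com.google.gson.JsonObject;
import io.milvus.bulkwriter.RemoteBulkWriter;

// Sketch: append converted rows, flush them as Parquet files to object storage,
// and print the uploaded file groups that a subsequent import job will consume.
void writeAndFlush(RemoteBulkWriter writer, Iterable<JsonObject> rows) throws Exception {
    for (JsonObject row : rows) {
        writer.appendRow(row);   // buffered locally and chunked per withChunkSize
    }
    writer.commit(false);        // synchronous flush and upload of any remaining data
    System.out.println(writer.getBatchFiles()); // one inner list per uploaded file group
}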

If stageBucket.getAutoImport() is true, the writer automatically calls the Import API after upload and waits for job completion.

Suitable for workloads of 1M–100M+ vectors.

Requires external object storage.

Higher latency due to storage round‑trip but lower server pressure.

Mode Selection Summary

Write Method: BufferBatchWriter – direct Insert API; BulkWriter – Parquet → object storage → Import API.

Scale: BufferBatchWriter – 10k–1M vectors; BulkWriter – 1M–100M+ vectors.

Dependencies: BufferBatchWriter – none; BulkWriter – S3‑compatible or Azure object storage.

Configuration Complexity: BufferBatchWriter – low; BulkWriter – medium.

Latency: BufferBatchWriter – low (direct inserts); BulkWriter – higher (storage round‑trip).

Server Pressure: BufferBatchWriter – high; BulkWriter – low.

Rule of thumb: use BufferBatchWriter for < 1 million vectors; switch to BulkWriter for larger workloads.

Schema Conversion and Type Mapping

MilvusSchemaConverter maps SeaTunnel types to Milvus types. The full mapping is:

BOOLEAN → Bool

TINYINT → Int8

SMALLINT → Int16

INT → Int32

BIGINT → Int64

FLOAT → Float

DOUBLE → Double

STRING → VarChar (or JSON if column option JSON=true)

MAP → JSON

ARRAY → Array (element type derived from schema)

FLOAT_VECTOR → FloatVector

BINARY_VECTOR → BinaryVector

FLOAT16_VECTOR → Float16Vector

BFLOAT16_VECTOR → BFloat16Vector

SPARSE_FLOAT_VECTOR → SparseFloatVector

TIMESTAMP / TIMESTAMP_TZ → Timestamptz

GEOMETRY → Geometry

ROW → JSON (nested struct)

Special handling notes:

If a STRING column has the option JSON=true, it is stored as JSON instead of VarChar.

ARRAY of ROW elements is mapped to Struct rather than a generic array.

Primary keys are always converted to Int64 (numeric) or VarChar (string).

Geometry conversion is controlled by geometry_convert_mode ("passthrough" vs "parse") and geometry_string_coord_order ("lat_lon" vs "lon_lat").
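Conceptually, the converter's core can be pictured as a type switch. The sketch below is illustrative only (not the actual MilvusSchemaConverter source) and assumes the io.milvus.v2.common.DataType enum from the Milvus Java SDK:

import io.milvus.v2.common.DataType;

// Illustrative mapping sketch covering the scalar, JSON, and vector cases listed above,
// including the STRING column option JSON=true from the special handling notes.
static DataType toMilvusType(String seaTunnelType, boolean jsonOption) {
    switch (seaTunnelType) {
        case "BOOLEAN":             return DataType.Bool;
        case "TINYINT":             return DataType.Int8;
        case "SMALLINT":            return DataType.Int16;
        case "INT":                 return DataType.Int32;
        case "BIGINT":              return DataType.Int64;
        case "FLOAT":               return DataType.Float;
        case "DOUBLE":              return DataType.Double;
        case "STRING":              return jsonOption ? DataType.JSON : DataType.VarChar;
        case "MAP":
        case "ROW":                 return DataType.JSON;   // nested structs become JSON
        case "ARRAY":               return DataType.Array;  // element type taken from the schema
        case "FLOAT_VECTOR":        return DataType.FloatVector;
        case "BINARY_VECTOR":       return DataType.BinaryVector;
        case "FLOAT16_VECTOR":      return DataType.Float16Vector;
        case "BFLOAT16_VECTOR":     return DataType.BFloat16Vector;
        case "SPARSE_FLOAT_VECTOR": return DataType.SparseFloatVector;
        default:
            throw new IllegalArgumentException("Unsupported SeaTunnel type: " + seaTunnelType);
    }
}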

Practical Configuration and Performance Tuning

env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Milvus {
    url = "https://source-milvus:19530"
    token = "root:Milvus"
    database = "default"
    collections = ["articles", "products"]
    batch_size = 1000
  }
}

sink {
  Milvus {
    url = "https://target-milvus:19530"
    token = "root:Milvus"
    database = "default"
    batch_size = 1000
    enable_dynamic_field = true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}

If collections is omitted, VTS reads all collections in the database.

parallelism controls the number of splits per collection; a reasonable range is 2 to the CPU core count. Setting it to 1 disables offset splitting (single‑threaded).

batch_size influences both the source read batch size (QueryIteratorReq) and the sink insert batch size. BufferBatchWriter halves the batch size automatically on rate‑limit errors.

Tuning workflow: run with defaults, monitor for rate limit exceeded warnings, then adjust batch_size down or up until throughput and stability are balanced.

Error‑Handling System

VTS defines 28 error codes (MILVUS‑01 to MILVUS‑28). Representative codes include:

MILVUS-06 – COLLECTION_NOT_LOADED: the source checks LoadState before reading; collections must be loaded.

MILVUS-17 – WRITE_DATA_FAIL: typically caused by rate limits or oversized messages; BufferBatchWriter auto‑halves the batch size.

MILVUS-27 – COMPLETED_WITH_ERRORS: a bulk import finished with some failed rows; inspect the error count against the MILVUS-28 threshold.

Source side rate‑limit handling:

Up to three retries, each after a 30 s pause (rateLimitRetryIntervalMs = 30000).

After three failed attempts, a READ_DATA_FAIL exception is thrown, as sketched below.
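A minimal sketch of that retry loop, with the queryBatch supplier as a hypothetical stand‑in for the real iterator read:

import com.google.gson.JsonObject;
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch of the source-side retry policy described above;
// queryBatch is a hypothetical placeholder for the actual Milvus read call.
class RateLimitedReader {
    static final int MAX_ATTEMPTS = 3;
    static final long rateLimitRetryIntervalMs = 30_000L;

    private final Supplier<List<JsonObject>> queryBatch;

    RateLimitedReader(Supplier<List<JsonObject>> queryBatch) {
        this.queryBatch = queryBatch;
    }

    List<JsonObject> readWithRetry() throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                return queryBatch.get();
            } catch (RuntimeException e) {
                boolean rateLimited = e.getMessage() != null
                        && e.getMessage().contains("rate limit exceeded");
                if (!rateLimited || attempt >= MAX_ATTEMPTS) {
                    throw new RuntimeException("READ_DATA_FAIL", e); // surfaced after three failed attempts
                }
                Thread.sleep(rateLimitRetryIntervalMs); // wait 30 s before the next attempt
            }
        }
    }
}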

Sink side handling (BufferBatchWriter):

On rate‑limit or oversized‑message error, batchSize is halved.

Thread sleeps 60 seconds, then recursively retries.

Retry continues until data.size() <= 2; if still failing, the exception propagates.

Cross‑Source Migration

VTS supports many connectors beyond Milvus, including:

Vector databases: Pinecone, Qdrant, Weaviate, ChromaDB

Search engine: Elasticsearch

Relational DB: PostgreSQL (pgvector)

Cloud services: Tencent VectorDB, S3 Vector

Others: MongoDB, Iceberg, Kafka, generic file sources

Example flow – Pinecone → Milvus: vectors are read into SeaTunnelRow, transformed by MilvusSchemaConverter, and written via the chosen sink writer.

Benchmark reported by the official VTS repo: migrating 100 million vectors from Pinecone to Milvus on a 4‑core / 8 GB machine achieved ~2,961 vectors/s, completing in ~9.5 hours.

Conclusion

Partition‑aware sharding provides high‑parallelism reads.

Dual write modes cover small‑scale (BufferBatchWriter) and large‑scale (BulkWriter) migrations.

Adaptive error recovery (rate‑limit back‑off, batch‑size reduction) enables unattended runs.

Schema control via field_schema allows custom target schemas and dynamic field extraction.

The codebase (~20 core classes) is concise, making troubleshooting feasible despite a modest community size. For pure Milvus‑to‑Milvus migrations, milvus-backup may be simpler, but VTS excels when cross‑source transformations, custom schema mapping, or bulk import performance are required.
