Milvus VTS Deep Dive: Two Write Modes, Sharding, and Migration Best Practices
This article provides a source-code-level analysis of the Milvus Vector Transport Service (VTS), covering its three-stage architecture, partition-aware sharding logic, two distinct sink write mechanisms (BufferBatchWriter and BulkWriter), schema conversion rules, error-handling strategies, performance-tuning parameters, and practical configuration examples for migrating vector data efficiently across data sources.
Overall Architecture: Source → Transform → Sink
VTS builds on Apache SeaTunnel’s three‑stage design. The Milvus connector consists of the following class chain:
Source side: MilvusSource → MilvusSourceSplitEnumerator → MilvusSourceReader → MilvusBufferReader
Sink side: MilvusSink → MilvusSinkWriter → either MilvusBufferBatchWriter (direct Insert API) or MilvusBulkWriter (Import API)
Source Side – Partition‑Aware Sharding Strategy
The core component is MilvusSourceSplitEnumerator. Its run() method performs three steps:
Enumerate all collections to be migrated.
For each collection call generateSplits() to create splits.
Assign the generated splits to registered readers.

generateSplits() first checks whether the collection defines a Partition Key:
boolean hasPartitionKey =
    describeCollectionResp.getCollectionSchema().getFieldSchemaList().stream()
        .anyMatch(CreateCollectionReq.FieldSchema::getIsPartitionKey);

Two split paths are selected based on this flag:
Path A – With Partition Key: the whole collection is treated as a single split and further divided by offset using splitByOffset().
Path B – No Partition Key but multiple partitions: each partition becomes a split, and each partition is then divided by offset, yielding two-level parallelism.

splitByOffset() computes the total row count, divides it by the parallelism parameter, and creates equal-size splits. If parallelism < 2, a single split is returned (single-threaded mode).
long numOfEntities = /* result of count(*) */;
long splitSize = Math.max(1, numOfEntities / parallelism);
for (int i = 0; i < parallelism; i++) {
    long offset = i * splitSize;
    if (i == parallelism - 1) {
        newSplit = ...offset(offset).build(); // last split has no limit
    } else {
        newSplit = ...offset(offset).limit(splitSize).build();
    }
}

Recommended parallelism is between 2 and the number of CPU cores. Larger values increase the number of concurrent readers and may trigger Milvus rate limits.
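The offset arithmetic can be reproduced in a self-contained sketch. This is a minimal illustration of the splitting rule, not the connector's actual classes; the Split record and the convention that -1 means "unbounded" are mine:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSplitSketch {
    // A split is an (offset, limit) pair; limit == -1 means "no limit" (read to the end).
    record Split(long offset, long limit) {}

    static List<Split> splitByOffset(long numOfEntities, int parallelism) {
        List<Split> splits = new ArrayList<>();
        if (parallelism < 2) {              // single-threaded mode: one unbounded split
            splits.add(new Split(0, -1));
            return splits;
        }
        long splitSize = Math.max(1, numOfEntities / parallelism);
        for (int i = 0; i < parallelism; i++) {
            long offset = i * splitSize;
            // last split stays unbounded so integer-division rounding never drops trailing rows
            long limit = (i == parallelism - 1) ? -1 : splitSize;
            splits.add(new Split(offset, limit));
        }
        return splits;
    }

    public static void main(String[] args) {
        // 10 rows across 3 readers: offsets 0/3/6, the last split open-ended
        for (Split s : splitByOffset(10, 3)) {
            System.out.println(s.offset() + " " + s.limit());
        }
    }
}
```

Note how the last split carries no limit: with 10 rows and parallelism 3, splitSize is 3, so the third reader starts at offset 6 and picks up the rounding remainder.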
Sink Side – Dual Write Modes
BufferBatchWriter – Direct Insert API
Data is cached in a List<JsonObject> (milvusDataCache) until the configured batch_size is reached, then the Milvus Insert API is invoked.
public void write(SeaTunnelRow element) {
    JsonObject data = milvusSinkConverter.buildMilvusData(...);
    milvusDataCache.add(data);
    writeCache.incrementAndGet();
    writeCount.incrementAndGet();
    if (writeCache.get() >= batchSize) {
        commit(true);
    }
}

Commit condition: writeCache >= batchSize. When a rate-limit or oversized-message error occurs, the writer halves batchSize, sleeps 60 seconds, and recursively retries the two halves of the failed batch; once a failing batch is down to two rows or fewer, the exception propagates instead of being retried.
if (e.getMessage().contains("rate limit exceeded") ||
        e.getMessage().contains("received message larger than max")) {
    if (data.size() > 2) {
        this.batchSize = this.batchSize / 2;
        Thread.sleep(60000);
        insertWrite(partitionName, firstHalf);
        insertWrite(partitionName, secondHalf);
    }
}

BulkWriter – Parquet + Import API
Designed for massive migrations. Data is first written to Parquet files, uploaded to object storage (S3‑compatible or Azure Blob), then the Milvus Import API loads the data.
RemoteBulkWriterParam param = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(describeCollectionResp.getCollectionSchema())
.withConnectParam(storageConnectParam) // S3 or Azure
.withChunkSize(stageBucket.getChunkSize() * 1024 * 1024)
.withRemotePath(stageBucket.getPrefix() + "/" + collectionName + "/" + partitionName)
.withFileType(BulkFileType.PARQUET)
.build();
remoteBulkWriter = new RemoteBulkWriter(param);

If stageBucket.getAutoImport() is true, the writer automatically calls the Import API after upload and waits for job completion.
Suitable for workloads of 1 M–100 M+ vectors.
Requires external object storage.
Higher latency due to storage round‑trip but lower server pressure.
Mode Selection Summary
Write Method: BufferBatchWriter – Insert API; BulkWriter – Parquet → Object Storage → Import API.
Scale: BufferBatchWriter – 10 k–1 M vectors; BulkWriter – 1 M–100 M+ vectors.
Dependencies: BufferBatchWriter – none; BulkWriter – S3 or Azure object storage.
Configuration Complexity: BufferBatchWriter – low; BulkWriter – medium.
Latency: BufferBatchWriter – low (direct writes); BulkWriter – higher (storage round-trip).
Server Pressure: BufferBatchWriter – high; BulkWriter – low.
Rule of thumb: use BufferBatchWriter for < 1 million vectors; switch to BulkWriter for larger workloads.
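The rule of thumb can be written down as a tiny decision helper. The threshold and all names below are illustrative, condensed from the summary above, not part of VTS itself:

```java
public class WriterModeSketch {
    enum WriteMode { BUFFER_BATCH, BULK }

    // Illustrative decision rule: ~1 M vectors is the crossover point suggested above,
    // and BulkWriter is only viable when external object storage is available.
    static WriteMode choose(long vectorCount, boolean objectStorageAvailable) {
        if (vectorCount >= 1_000_000 && objectStorageAvailable) {
            return WriteMode.BULK;        // Parquet + Import API: lower server pressure
        }
        return WriteMode.BUFFER_BATCH;    // direct Insert API: lower latency, no dependencies
    }

    public static void main(String[] args) {
        System.out.println(choose(50_000, false));     // small job, no S3: BUFFER_BATCH
        System.out.println(choose(20_000_000, true));  // large job with S3: BULK
    }
}
```

Without object storage, the helper falls back to BufferBatchWriter even for large counts, mirroring the dependency row in the summary.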
Schema Conversion and Type Mapping
MilvusSchemaConverter maps SeaTunnel types to Milvus types. The full mapping is:
BOOLEAN → Bool
TINYINT → Int8
SMALLINT → Int16
INT → Int32
BIGINT → Int64
FLOAT → Float
DOUBLE → Double
STRING → VarChar (or JSON if column option JSON=true)
MAP → JSON
ARRAY → Array (element type derived from schema)
FLOAT_VECTOR → FloatVector
BINARY_VECTOR → BinaryVector
FLOAT16_VECTOR → Float16Vector
BFLOAT16_VECTOR → BFloat16Vector
SPARSE_FLOAT_VECTOR → SparseFloatVector
TIMESTAMP / TIMESTAMP_TZ → Timestamptz
GEOMETRY → Geometry
ROW → JSON (nested struct)
Special handling notes:
If a STRING column has the option JSON=true, it is stored as JSON instead of VarChar.
ARRAY of ROW elements is mapped to Struct rather than a generic array.
Primary keys are always converted to Int64 (numeric) or VarChar (string).
Geometry conversion is controlled by geometry_convert_mode ("passthrough" vs "parse") and geometry_string_coord_order ("lat_lon" vs "lon_lat").
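The mapping table can be condensed into a single switch. This is a sketch of the rule, not MilvusSchemaConverter's actual code: the per-column JSON=true override is modeled as a boolean flag, and ARRAY is omitted because its Milvus element type is derived from the schema rather than from the name alone:

```java
public class TypeMappingSketch {
    // Maps a SeaTunnel type name to its Milvus type, per the table above.
    // jsonOption models the JSON=true column option that overrides STRING → VarChar.
    static String toMilvusType(String seaTunnelType, boolean jsonOption) {
        switch (seaTunnelType) {
            case "BOOLEAN":             return "Bool";
            case "TINYINT":             return "Int8";
            case "SMALLINT":            return "Int16";
            case "INT":                 return "Int32";
            case "BIGINT":              return "Int64";
            case "FLOAT":               return "Float";
            case "DOUBLE":              return "Double";
            case "STRING":              return jsonOption ? "JSON" : "VarChar";
            case "MAP":
            case "ROW":                 return "JSON";   // nested structs also land in JSON
            case "FLOAT_VECTOR":        return "FloatVector";
            case "BINARY_VECTOR":       return "BinaryVector";
            case "FLOAT16_VECTOR":      return "Float16Vector";
            case "BFLOAT16_VECTOR":     return "BFloat16Vector";
            case "SPARSE_FLOAT_VECTOR": return "SparseFloatVector";
            case "TIMESTAMP":
            case "TIMESTAMP_TZ":        return "Timestamptz";
            case "GEOMETRY":            return "Geometry";
            default: throw new IllegalArgumentException("unmapped type: " + seaTunnelType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMilvusType("STRING", false)); // VarChar
        System.out.println(toMilvusType("STRING", true));  // JSON override
    }
}
```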
Practical Configuration and Performance Tuning
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Milvus {
url = "https://source-milvus:19530"
token = "root:Milvus"
database = "default"
collections = ["articles", "products"]
batch_size = 1000
}
}
sink {
Milvus {
url = "https://target-milvus:19530"
token = "root:Milvus"
database = "default"
batch_size = 1000
enable_dynamic_field = true
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}

If collections is omitted, VTS reads all collections in the database.
parallelism controls the number of splits per collection; a reasonable range is 2 to the number of CPU cores. Setting it to 1 disables offset splitting (single-threaded mode).
batch_size influences both the source read batch size (QueryIteratorReq) and the sink insert batch size. BufferBatchWriter halves the batch size automatically on rate-limit errors.
Tuning workflow: run with defaults, monitor for rate limit exceeded warnings, then adjust batch_size down or up until throughput and stability are balanced.
Error‑Handling System
VTS defines 28 error codes (MILVUS-01 to MILVUS-28). Representative codes include:
MILVUS-06 – COLLECTION_NOT_LOADED: the source checks LoadState before reading; collections must be loaded first.
MILVUS-17 – WRITE_DATA_FAIL: typically caused by rate limits or oversized messages; BufferBatchWriter auto-halves the batch size.
MILVUS-27 – COMPLETED_WITH_ERRORS: a bulk import finished with some failed rows; inspect the error count against the MILVUS-28 threshold.
Source-side rate-limit handling:
Up to three retries, each after a 30 s pause (rateLimitRetryIntervalMs = 30000).
After three failed attempts, a READ_DATA_FAIL exception is thrown.
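The bounded retry described above can be sketched as a generic helper. The attempt count and pause match the text; everything else (names, signature, exception type) is illustrative, and the real reader raises READ_DATA_FAIL rather than rethrowing the raw exception:

```java
import java.util.concurrent.Callable;

public class ReadRetrySketch {
    // Generic bounded retry: pause between attempts, rethrow after the last failure.
    // VTS uses maxAttempts = 3 and pauseMs = 30000 (rateLimitRetryIntervalMs).
    static <T> T readWithRetry(Callable<T> read, int maxAttempts, long pauseMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return read.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMs);   // back off before the next attempt
                }
            }
        }
        throw last;   // all attempts exhausted: surface the final failure
    }

    public static void main(String[] args) throws Exception {
        // Simulated flaky read: rate-limited twice, succeeds on the third call.
        int[] calls = {0};
        String result = readWithRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("rate limit exceeded");
            return "ok";
        }, 3, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```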
Sink-side handling (BufferBatchWriter):
On rate‑limit or oversized‑message error, batchSize is halved.
Thread sleeps 60 seconds, then recursively retries.
Halving continues while a failing batch contains more than two rows; once data.size() <= 2, the exception propagates.
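The halving behavior can be simulated in isolation. The writer interface and names below are stand-ins, the 60-second sleep is omitted for brevity, and the guard mirrors the data.size() > 2 check in the connector:

```java
import java.util.List;
import java.util.stream.IntStream;

public class BatchHalvingSketch {
    interface BatchWriter { void insert(List<Integer> batch) throws Exception; }

    // On failure, split the batch in half and retry each half recursively;
    // give up once a failing batch is down to 2 rows or fewer.
    static void insertWithHalving(BatchWriter writer, List<Integer> batch) throws Exception {
        try {
            writer.insert(batch);
        } catch (Exception e) {
            if (batch.size() <= 2) {
                throw e;   // too small to split further: propagate the failure
            }
            int mid = batch.size() / 2;
            insertWithHalving(writer, batch.subList(0, mid));
            insertWithHalving(writer, batch.subList(mid, batch.size()));
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> rows = IntStream.range(0, 10).boxed().toList();
        int[] written = {0};
        // Stand-in writer that rejects any batch larger than 3 rows, like an
        // oversized-message error; smaller sub-batches are accepted.
        BatchWriter sizeLimited = b -> {
            if (b.size() > 3) throw new RuntimeException("received message larger than max");
            written[0] += b.size();
        };
        insertWithHalving(sizeLimited, rows);
        System.out.println("written=" + written[0]);   // all 10 rows land via small batches
    }
}
```

Here the initial 10-row batch fails, splits into 5+5, and each 5-row half splits again into batches of 2 and 3, all of which succeed.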
Cross‑Source Migration
VTS supports many connectors beyond Milvus, including:
Vector databases: Pinecone, Qdrant, Weaviate, ChromaDB
Search engine: Elasticsearch
Relational DB: PostgreSQL (pgvector)
Cloud services: Tencent VectorDB, S3 Vector
Others: MongoDB, Iceberg, Kafka, generic file sources
Example flow – Pinecone → Milvus: vectors are read into SeaTunnelRow, transformed by MilvusSchemaConverter, and written via the chosen sink writer.
Benchmark reported by the official VTS repo: migrating 100 million vectors from Pinecone to Milvus on a 4‑core / 8 GB machine achieved ~2,961 vectors/s, completing in ~9.5 hours.
Conclusion
Partition‑aware sharding provides high‑parallelism reads.
Dual write modes cover small‑scale (BufferBatchWriter) and large‑scale (BulkWriter) migrations.
Adaptive error recovery (rate‑limit back‑off, batch‑size reduction) enables unattended runs.
Schema control via field_schema allows custom target schemas and dynamic field extraction.
The codebase (~20 core classes) is concise, making troubleshooting feasible despite a modest community size. For pure Milvus‑to‑Milvus migrations, milvus-backup may be simpler, but VTS excels when cross‑source transformations, custom schema mapping, or bulk import performance are required.