How to Speed Up Bulk Vector Searches with CLI and SDK Concurrency

This guide explains how to dramatically reduce latency for batch semantic search, RAG multi‑path retrieval, and multimodal vector queries by running multiple OSS Vectors requests in parallel using xargs, shell background jobs, Python asyncio, and SDK‑level concurrency.

Overview

When many vector queries need to be executed concurrently—e.g., batch Q&A, multi‑path RAG, or multimodal search—running each request serially makes total latency grow linearly with the number of queries (N × single‑request latency). Parallel execution can reduce overall latency to roughly that of a single request, or, with a concurrency cap, to a small number of request waves, dramatically increasing throughput.
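
As a rough illustration of this arithmetic, a short sketch (the numbers are hypothetical: 20 queries at 500 ms each with a concurrency cap of 5):

```python
import math

def estimated_latency(n_queries, per_request_s, concurrency):
    """Serial latency grows as N * t; with a concurrency cap c,
    queries run in ceil(N / c) waves of roughly t each."""
    serial = n_queries * per_request_s
    parallel = math.ceil(n_queries / concurrency) * per_request_s
    return serial, parallel

serial, parallel = estimated_latency(20, 0.5, 5)
print(f"serial ≈ {serial:.1f}s, parallel ≈ {parallel:.1f}s")  # serial ≈ 10.0s, parallel ≈ 2.0s
```

Real timings also include connection setup and server-side variance, so treat this as an upper-bound estimate of the speedup.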

Choosing a concurrency method

CLI concurrency: launch multiple oss-vectors-embed query processes from the command line. Suitable when you have a list of query texts and want a quick solution without writing code.

SDK concurrency: call the OSS Vectors API directly from application code. Suitable for services that need fine‑grained control over parameters, filters, or integration into business logic.

CLI concurrency

Prerequisites:

OSS Vectors Embed CLI installed.

Environment variables OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET and DASHSCOPE_API_KEY set.

A vector bucket and index created, with index dimension matching the embedding model output.

Simple xargs parallelism

For a quick one‑liner, use xargs -P to run up to five processes in parallel:

cat queries.txt | xargs -P 5 -I {} \
  oss-vectors-embed \
    --account-id "YOUR_ACCOUNT_ID" \
    --vectors-region cn-hangzhou \
    query \
    --vector-bucket-name "YOUR_VECTOR_BUCKET" \
    --index-name "YOUR_INDEX" \
    --model-id text-embedding-v4 \
    --text-value "{}" \
    --top-k 10

The -P 5 flag caps concurrency at five processes, and -I {} substitutes each line of queries.txt into the command. Redirect output to a file if persistence is required.

Shell background jobs

For a small batch (≤10 queries), launch each command in the background and wait for all jobs to finish:

#!/bin/bash
ACCOUNT_ID="YOUR_ACCOUNT_ID"
REGION="cn-hangzhou"
BUCKET="YOUR_VECTOR_BUCKET"
INDEX="YOUR_INDEX"
queries=(
  "How to configure lifecycle rules"
  "What storage classes does OSS provide"
  "How to set cross‑region replication"
)
mkdir -p ./query-results
for i in "${!queries[@]}"; do
  oss-vectors-embed \
    --account-id "$ACCOUNT_ID" \
    --vectors-region "$REGION" \
    query \
    --vector-bucket-name "$BUCKET" \
    --index-name "$INDEX" \
    --model-id text-embedding-v4 \
    --text-value "${queries[$i]}" \
    --top-k 10 \
    > "./query-results/result_$i.json" 2>&1 &
done
wait
echo "All queries finished, results are in ./query-results/"

Inspect a result with cat ./query-results/result_0.json | python3 -m json.tool.
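
The per-query files can also be merged into a single summary with a short script. This is a sketch under two assumptions: each successful run wrote valid JSON, and failed runs wrote CLI error text (which is skipped):

```python
import json
from pathlib import Path

def merge_results(results_dir="./query-results", out_name="merged.json"):
    """Collect result_*.json files into one list, skipping files that
    contain CLI error output instead of JSON."""
    merged, skipped = [], []
    for path in sorted(Path(results_dir).glob("result_*.json")):
        try:
            merged.append({"file": path.name, "result": json.loads(path.read_text())})
        except json.JSONDecodeError:
            skipped.append(path.name)
    out = Path(results_dir) / out_name
    out.write_text(json.dumps(merged, ensure_ascii=False, indent=2))
    return merged, skipped
```

Call merge_results() after the batch finishes; the skipped list tells you which queries to re-run.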

Controlled parallelism for large batches

When processing dozens or hundreds of queries, limit the number of simultaneous processes to stay within API quotas:

#!/bin/bash
ACCOUNT_ID="YOUR_ACCOUNT_ID"
REGION="cn-hangzhou"
BUCKET="YOUR_VECTOR_BUCKET"
INDEX="YOUR_INDEX"
MODEL="text-embedding-v4"
MAX_CONCURRENT=5
QUERY_FILE="./queries.txt"  # one query per line
mkdir -p ./query-results
run_query() {
  local idx=$1
  local text=$2
  oss-vectors-embed \
    --account-id "$ACCOUNT_ID" \
    --vectors-region "$REGION" \
    query \
    --vector-bucket-name "$BUCKET" \
    --index-name "$INDEX" \
    --model-id "$MODEL" \
    --text-value "$text" \
    --top-k 10 \
    > "./query-results/result_${idx}.json" 2>&1
}
idx=0
while IFS= read -r query_text; do
  run_query "$idx" "$query_text" &
  idx=$((idx + 1))
  if (( $(jobs -rp | wc -l) >= MAX_CONCURRENT )); then
    wait -n  # wait for any job to finish before launching a new one (requires bash 4.3+)
  fi
done < "$QUERY_FILE"
wait
echo "All $idx queries completed"

Python wrappers

Asyncio wrapper for the CLI

Drive the CLI from Python using asyncio.create_subprocess_exec to run up to five queries concurrently and aggregate the JSON results:

import asyncio, json
from pathlib import Path
ACCOUNT_ID = "YOUR_ACCOUNT_ID"
REGION = "cn-hangzhou"
BUCKET = "YOUR_VECTOR_BUCKET"
INDEX = "YOUR_INDEX"
MODEL = "text-embedding-v4"
MAX_CONCURRENT = 5

async def run_query(semaphore, query_text, query_id):
    async with semaphore:
        cmd = [
            "oss-vectors-embed",
            "--account-id", ACCOUNT_ID,
            "--vectors-region", REGION,
            "query",
            "--vector-bucket-name", BUCKET,
            "--index-name", INDEX,
            "--model-id", MODEL,
            "--text-value", query_text,
            "--top-k", "10",
            "--return-metadata",
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode == 0:
            result = json.loads(stdout.decode())
            print(f"Query {query_id} completed, returned {len(result.get('results', []))} results")
            return {"query_id": query_id, "query_text": query_text, "result": result}
        else:
            print(f"Query {query_id} failed: {stderr.decode()}")
            return {"query_id": query_id, "query_text": query_text, "error": stderr.decode()}

async def batch_query(queries):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [run_query(semaphore, q, i) for i, q in enumerate(queries)]
    results = await asyncio.gather(*tasks)
    Path("./query-results/batch_results.json").write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print("Batch results saved to ./query-results/batch_results.json")
    return results

if __name__ == "__main__":
    queries = [
        "How to configure lifecycle rules",
        "What storage classes does OSS provide",
        "How to set cross‑region replication",
        "Bucket tag limits",
        "How to enable versioning",
    ]
    asyncio.run(batch_query(queries))
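
Transient failures (throttling, timeouts) can be retried before giving up. A minimal backoff helper that wraps any of the coroutines above; the retry count and delays are assumptions, not CLI behavior:

```python
import asyncio

async def with_retry(coro_factory, attempts=3, base_delay=1.0):
    """Await a coroutine built by coro_factory, retrying on exception
    with exponential backoff: base_delay, 2x, 4x, ..."""
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts, surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))
```

For example, replace a direct await of run_query with await with_retry(lambda: run_query(semaphore, q, i)); the factory is needed so a fresh coroutine is created for each attempt.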

Python SDK concurrency

Use the OSS Vectors SDK directly when you already have embedding vectors. The example creates a thread pool limited to five workers:

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors

ACCOUNT_ID = "YOUR_ACCOUNT_ID"
REGION = "cn-hangzhou"
BUCKET = "YOUR_VECTOR_BUCKET"
INDEX = "YOUR_INDEX"
MAX_CONCURRENT = 5

def create_vector_client():
    cred = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = cred
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)

def run_query(client, query_vector, query_id, query_filter=None):
    request = oss_vectors.models.QueryVectorsRequest(
        bucket=BUCKET,
        index_name=INDEX,
        query_vector=query_vector,
        filter=query_filter,
        return_distance=True,
        return_metadata=True,
        top_k=10,
    )
    result = client.query_vectors(request)
    print(f"Query {query_id} completed, status code: {result.status_code}")
    return {
        "query_id": query_id,
        "status_code": result.status_code,
        "vectors": [str(v) for v in result.vectors] if result.vectors else [],
    }

def batch_query(query_vectors):
    client = create_vector_client()
    results = []
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {executor.submit(run_query, client, qv, i): i for i, qv in enumerate(query_vectors)}
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Query {idx} failed: {e}")
                results.append({"query_id": idx, "error": str(e)})
    results.sort(key=lambda x: x["query_id"])
    out = Path("./query-results/sdk_batch_results.json")
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"Batch results saved to {out}")
    return results

if __name__ == "__main__":
    # Example: five 128‑dim vectors
    query_vectors = [
        {"float32": [0.1] * 128},
        {"float32": [0.2] * 128},
        {"float32": [0.3] * 128},
        {"float32": [0.4] * 128},
        {"float32": [0.5] * 128},
    ]
    batch_query(query_vectors)

The script prints each query’s status code and writes a consolidated JSON file.

SDK concurrency with per‑query filters

When different queries require distinct filter conditions, include a filter object in each request:

tasks = [
    {"vector": {"float32": [0.1] * 128}, "filter": {"$and": [{"type": {"$in": ["tutorial"]}}]}},
    {"vector": {"float32": [0.2] * 128}, "filter": {"$and": [{"type": {"$nin": ["comedy", "documentary"]}}]}},
    {"vector": {"float32": [0.3] * 128}, "filter": None},
]
client = create_vector_client()
results = []
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
    futures = {executor.submit(run_query, client, t["vector"], i, t["filter"]): i for i, t in enumerate(tasks)}
    for future in as_completed(futures):
        try:
            results.append(future.result())
        except Exception as e:
            idx = futures[future]
            print(f"Query {idx} failed: {e}")
            results.append({"query_id": idx, "error": str(e)})
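
If many tasks share the same filter shape, a small helper can assemble the $and/$in/$nin documents instead of writing them inline. This helper is a hypothetical convenience; the filter syntax itself is the one shown in the task list above:

```python
def and_filter(**conditions):
    """Build an $and filter from keyword conditions, e.g.
    and_filter(type={"$in": ["tutorial"]}) produces
    {"$and": [{"type": {"$in": ["tutorial"]}}]}.
    Returns None when no conditions are given (no filtering)."""
    clauses = [{field: cond} for field, cond in conditions.items()]
    return {"$and": clauses} if clauses else None
```

With it, a task entry becomes {"vector": qv, "filter": and_filter(type={"$nin": ["comedy", "documentary"]})}.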

Go SDK concurrency

Install the Go SDK:

go get github.com/aliyun/alibabacloud-oss-go-sdk-v2

Basic concurrent retrieval

package main
import (
    "context"
    "fmt"
    "log"
    "sync"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)

const (
    region        = "cn-hangzhou"
    bucketName    = "YOUR_VECTOR_BUCKET"
    accountId     = "YOUR_ACCOUNT_ID"
    indexName     = "YOUR_INDEX"
    maxConcurrent = 5
)

func main() {
    cfg := oss.LoadDefaultConfig().
        WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
        WithRegion(region).
        WithAccountId(accountId)
    client := vectors.NewVectorsClient(cfg)

    // Example: five placeholder query vectors. In practice each vector's
    // dimension must match the index dimension (e.g. 128 values).
    queryVectors := []map[string]any{
        {"float32": []float32{0.1}},
        {"float32": []float32{0.2}},
        {"float32": []float32{0.3}},
        {"float32": []float32{0.4}},
        {"float32": []float32{0.5}},
    }

    var wg sync.WaitGroup
    sem := make(chan struct{}, maxConcurrent)
    for i, qv := range queryVectors {
        wg.Add(1)
        sem <- struct{}{}
        go func(idx int, vec map[string]any) {
            defer wg.Done()
            defer func() { <-sem }()
            req := &vectors.QueryVectorsRequest{
                Bucket:         oss.Ptr(bucketName),
                IndexName:      oss.Ptr(indexName),
                QueryVector:    vec,
                ReturnMetadata: oss.Ptr(true),
                ReturnDistance: oss.Ptr(true),
                TopK:           oss.Ptr(10),
            }
            resp, err := client.QueryVectors(context.TODO(), req)
            if err != nil {
                log.Printf("Query %d failed: %v", idx, err)
                return
            }
            fmt.Printf("Query %d completed, status code: %d\n", idx, resp.StatusCode)
        }(i, qv)
    }
    wg.Wait()
    fmt.Println("All queries completed")
}

Concurrent retrieval with filters

type queryTask struct {
    vector map[string]any
    filter map[string]any
}

// ... inside main after client creation
tasks := []queryTask{{
    vector: map[string]any{"float32": []float32{0.1}},
    filter: map[string]any{"$and": []map[string]any{{"type": map[string]any{"$in": []string{"tutorial"}}}}},
}, {
    vector: map[string]any{"float32": []float32{0.2}},
    filter: map[string]any{"$and": []map[string]any{{"type": map[string]any{"$nin": []string{"comedy", "documentary"}}}}},
}, {
    vector: map[string]any{"float32": []float32{0.3}},
    filter: nil,
}}

var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent)
for i, task := range tasks {
    wg.Add(1)
    sem <- struct{}{}
    go func(idx int, t queryTask) {
        defer wg.Done()
        defer func() { <-sem }()
        req := &vectors.QueryVectorsRequest{
            Bucket:         oss.Ptr(bucketName),
            IndexName:      oss.Ptr(indexName),
            QueryVector:    t.vector,
            ReturnMetadata: oss.Ptr(true),
            ReturnDistance: oss.Ptr(true),
            TopK:           oss.Ptr(10),
        }
        if t.filter != nil {
            req.Filter = t.filter
        }
        resp, err := client.QueryVectors(context.TODO(), req)
        if err != nil {
            log.Printf("Query %d failed: %v", idx, err)
            return
        }
        fmt.Printf("Query %d completed, status code: %d\n", idx, resp.StatusCode)
    }(i, task)
}
wg.Wait()
fmt.Println("All queries completed")

Performance tuning

Key factors influencing concurrent retrieval speed:

Network latency to the OSS Vectors endpoint.

API rate limits – keep the number of simultaneous requests within your quota.

Optimal concurrency level – on typical cloud VPCs, 5‑10 parallel jobs often provide the best trade‑off.

The CLI performs an embedding call for every query; the SDK path queries with pre‑computed vectors and skips that step, which can reduce per‑request latency.

Adjust MAX_CONCURRENT and monitor average response time to find the sweet spot for your workload.
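
To find that sweet spot empirically, time a fixed batch at several concurrency levels. A sketch with a thread pool and a stand‑in query function (replace fake_query with a real SDK or CLI call):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_query(_):
    time.sleep(0.05)  # stand-in for one query's round-trip latency

def benchmark(query_fn, n_queries=20, levels=(1, 2, 5, 10)):
    """Return {concurrency_level: elapsed_seconds} for a fixed batch size."""
    timings = {}
    for level in levels:
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=level) as pool:
            list(pool.map(query_fn, range(n_queries)))  # drain all results
        timings[level] = time.perf_counter() - start
    return timings

for level, elapsed in benchmark(fake_query).items():
    print(f"concurrency={level:>2}: {elapsed:.2f}s")
```

Pick the smallest level beyond which elapsed time stops improving; pushing past it only risks throttling.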

References

OSS Vectors Embed CLI documentation: https://help.aliyun.com/zh/oss/user-guide/oss-vectors-embed-cli

Best practices for building a semantic retrieval platform on OSS: https://help.aliyun.com/zh/oss/user-guide/best-practices-for-building-a-semantic-retrieval-platform
