How to Speed Up Bulk Vector Searches with CLI and SDK Concurrency
This guide explains how to dramatically reduce latency for batch semantic search, multi-path RAG retrieval, and multimodal vector queries by running multiple OSS Vectors embed requests in parallel, using xargs, shell background jobs, Python asyncio, and SDK-level concurrency.
Overview
When many vector queries need to run at once (e.g., batch Q&A, multi-path RAG, or multimodal search), executing them serially makes total latency grow linearly with the number of queries: roughly N × single-request latency. Parallel execution can reduce overall latency to roughly that of a single request, dramatically increasing throughput. For example, 100 queries at 300 ms each take about 30 s serially, roughly 6 s at a concurrency of 5, and close to 300 ms when all run in parallel.
Choosing a concurrency method
CLI concurrency: launch multiple oss-vectors-embed query processes from the command line. Suitable when you have a list of query texts and want a quick solution without writing code.
SDK concurrency: call the OSS Vectors API directly from application code. Suitable for services that need fine-grained control over parameters, filters, or integration into business logic.
CLI concurrency
Prerequisites:
OSS Vectors Embed CLI installed.
Environment variables OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, and DASHSCOPE_API_KEY set (a quick sanity check is sketched after this list).
A vector bucket and index created, with index dimension matching the embedding model output.
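Before launching a large batch, it can help to confirm the required variables are actually present. A minimal Python check, using only the variable names from the list above:
import os

# Fail fast if any credential variable from the prerequisites is missing.
for var in ("OSS_ACCESS_KEY_ID", "OSS_ACCESS_KEY_SECRET", "DASHSCOPE_API_KEY"):
    if not os.environ.get(var):
        raise SystemExit(f"Missing required environment variable: {var}")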
Simple xargs parallelism
For a quick one‑liner, use xargs -P to run up to five processes in parallel:
cat queries.txt | xargs -P 5 -I {} \
oss-vectors-embed \
--account-id "YOUR_ACCOUNT_ID" \
--vectors-region cn-hangzhou \
query \
--vector-bucket-name "YOUR_VECTOR_BUCKET" \
--index-name "YOUR_INDEX" \
--model-id text-embedding-v4 \
--text-value "{}" \
  --top-k 10
The flag -P 5 limits the number of concurrent processes. Redirect output to a file if persistence is required.
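The queries.txt file holds one query per line. If your queries live elsewhere (a database, a spreadsheet), a short illustrative Python snippet can produce it:
from pathlib import Path

# Illustrative only: write one query per line, the format xargs reads above.
queries = [
    "How to configure lifecycle rules",
    "What storage classes does OSS provide",
]
Path("queries.txt").write_text("\n".join(queries) + "\n", encoding="utf-8")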
Shell background jobs
For a small batch (≤10 queries), launch each command in the background and wait for all jobs to finish:
#!/bin/bash
ACCOUNT_ID="YOUR_ACCOUNT_ID"
REGION="cn-hangzhou"
BUCKET="YOUR_VECTOR_BUCKET"
INDEX="YOUR_INDEX"
queries=(
"How to configure lifecycle rules"
"What storage classes does OSS provide"
"How to set cross‑region replication"
)
mkdir -p ./query-results
for i in "${!queries[@]}"; do
oss-vectors-embed \
--account-id "$ACCOUNT_ID" \
--vectors-region "$REGION" \
query \
--vector-bucket-name "$BUCKET" \
--index-name "$INDEX" \
--model-id text-embedding-v4 \
--text-value "${queries[$i]}" \
--top-k 10 \
> "./query-results/result_$i.json" 2>&1 &
done
wait
echo "All queries finished, results are in ./query-results/"Inspect a result with cat ./query-results/result_0.json | python3 -m json.tool.
Controlled parallelism for large batches
When processing dozens or hundreds of queries, limit the number of simultaneous processes to stay within API quotas:
#!/bin/bash
ACCOUNT_ID="YOUR_ACCOUNT_ID"
REGION="cn-hangzhou"
BUCKET="YOUR_VECTOR_BUCKET"
INDEX="YOUR_INDEX"
MODEL="text-embedding-v4"
MAX_CONCURRENT=5
QUERY_FILE="./queries.txt" # one query per line
mkdir -p ./query-results
run_query() {
local idx=$1
local text=$2
oss-vectors-embed \
--account-id "$ACCOUNT_ID" \
--vectors-region "$REGION" \
query \
--vector-bucket-name "$BUCKET" \
--index-name "$INDEX" \
--model-id "$MODEL" \
--text-value "$text" \
--top-k 10 \
> "./query-results/result_${idx}.json" 2>&1
}
idx=0
while IFS= read -r query_text; do
run_query "$idx" "$query_text" &
idx=$((idx + 1))
if (( $(jobs -rp | wc -l) >= MAX_CONCURRENT )); then
        wait -n # wait for any job to finish before launching a new one (requires bash 4.3 or later)
fi
done < "$QUERY_FILE"
wait
echo "All $idx queries completed"Python wrappers
Asyncio wrapper for the CLI
Drive the CLI from Python using asyncio.create_subprocess_exec to run up to five queries concurrently and aggregate the JSON results:
import asyncio, json
from pathlib import Path
ACCOUNT_ID = "YOUR_ACCOUNT_ID"
REGION = "cn-hangzhou"
BUCKET = "YOUR_VECTOR_BUCKET"
INDEX = "YOUR_INDEX"
MODEL = "text-embedding-v4"
MAX_CONCURRENT = 5
async def run_query(semaphore, query_text, query_id):
async with semaphore:
cmd = [
"oss-vectors-embed",
"--account-id", ACCOUNT_ID,
"--vectors-region", REGION,
"query",
"--vector-bucket-name", BUCKET,
"--index-name", INDEX,
"--model-id", MODEL,
"--text-value", query_text,
"--top-k", "10",
"--return-metadata",
]
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
result = json.loads(stdout.decode())
print(f"Query {query_id} completed, returned {len(result.get('results', []))} results")
return {"query_id": query_id, "query_text": query_text, "result": result}
else:
print(f"Query {query_id} failed: {stderr.decode()}")
return {"query_id": query_id, "query_text": query_text, "error": stderr.decode()}
async def batch_query(queries):
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
tasks = [run_query(semaphore, q, i) for i, q in enumerate(queries)]
results = await asyncio.gather(*tasks)
Path("./query-results/batch_results.json").write_text(json.dumps(results, ensure_ascii=False, indent=2))
print("Batch results saved to ./query-results/batch_results.json")
return results
if __name__ == "__main__":
queries = [
"How to configure lifecycle rules",
"What storage classes does OSS provide",
"How to set cross‑region replication",
"Bucket tag limits",
"How to enable versioning",
]
    asyncio.run(batch_query(queries))
Python SDK concurrency
Use the OSS Vectors SDK directly when you already have embedding vectors. The example creates a thread pool limited to five workers:
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = "YOUR_ACCOUNT_ID"
REGION = "cn-hangzhou"
BUCKET = "YOUR_VECTOR_BUCKET"
INDEX = "YOUR_INDEX"
MAX_CONCURRENT = 5
def create_vector_client():
cred = oss.credentials.EnvironmentVariableCredentialsProvider()
cfg = oss.config.load_default()
cfg.credentials_provider = cred
cfg.region = REGION
cfg.account_id = ACCOUNT_ID
return oss_vectors.Client(cfg)
def run_query(client, query_vector, query_id, query_filter=None):
request = oss_vectors.models.QueryVectorsRequest(
bucket=BUCKET,
index_name=INDEX,
query_vector=query_vector,
filter=query_filter,
return_distance=True,
return_metadata=True,
top_k=10,
)
result = client.query_vectors(request)
print(f"Query {query_id} completed, status code: {result.status_code}")
return {
"query_id": query_id,
"status_code": result.status_code,
"vectors": [str(v) for v in result.vectors] if result.vectors else [],
}
def batch_query(query_vectors):
client = create_vector_client()
results = []
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
futures = {executor.submit(run_query, client, qv, i): i for i, qv in enumerate(query_vectors)}
for future in as_completed(futures):
idx = futures[future]
try:
results.append(future.result())
except Exception as e:
print(f"Query {idx} failed: {e}")
results.append({"query_id": idx, "error": str(e)})
results.sort(key=lambda x: x["query_id"])
out = Path("./query-results/sdk_batch_results.json")
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(results, ensure_ascii=False, indent=2))
print(f"Batch results saved to {out}")
return results
if __name__ == "__main__":
# Example: five 128‑dim vectors
query_vectors = [
{"float32": [0.1] * 128},
{"float32": [0.2] * 128},
{"float32": [0.3] * 128},
{"float32": [0.4] * 128},
{"float32": [0.5] * 128},
]
    batch_query(query_vectors)
The script prints each query's status code and writes a consolidated JSON file.
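Under heavy concurrency, individual requests may be throttled. One option is to wrap the run_query function from the SDK example above in a small retry helper before submitting it to the executor. A minimal sketch; the retries and backoff parameters are illustrative, not SDK options:
import time

def run_query_with_retry(client, query_vector, query_id, query_filter=None, retries=3, backoff=1.0):
    # Retry transient failures with exponential backoff: 1 s, 2 s, 4 s, ...
    for attempt in range(retries):
        try:
            return run_query(client, query_vector, query_id, query_filter)
        except Exception:
            if attempt == retries - 1:
                raise
            time.sleep(backoff * (2 ** attempt))
Submitting run_query_with_retry instead of run_query leaves the rest of the batch code unchanged.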
SDK concurrency with per‑query filters
When different queries require distinct filter conditions, include a filter object in each request:
tasks = [
{"vector": {"float32": [0.1] * 128}, "filter": {"$and": [{"type": {"$in": ["tutorial"]}}]}},
{"vector": {"float32": [0.2] * 128}, "filter": {"$and": [{"type": {"$nin": ["comedy", "documentary"]}}]}},
{"vector": {"float32": [0.3] * 128}, "filter": None},
]
client = create_vector_client()
results = []
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
futures = {executor.submit(run_query, client, t["vector"], i, t["filter"]): i for i, t in enumerate(tasks)}
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
idx = futures[future]
print(f"Query {idx} failed: {e}")
results.append({"query_id": idx, "error": str(e)})Go SDK concurrency
Install the Go SDK:
go get github.com/aliyun/alibabacloud-oss-go-sdk-v2
Basic concurrent retrieval
package main
import (
"context"
"fmt"
"log"
"sync"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
const (
region = "cn-hangzhou"
bucketName = "YOUR_VECTOR_BUCKET"
accountId = "YOUR_ACCOUNT_ID"
indexName = "YOUR_INDEX"
maxConcurrent = 5
)
func main() {
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region).
WithAccountId(accountId)
client := vectors.NewVectorsClient(cfg)
	// Example placeholder query vectors; replace with vectors whose
	// dimension matches your index (e.g., 128).
	queryVectors := []map[string]any{
		{"float32": []float32{0.1}},
		{"float32": []float32{0.2}},
		{"float32": []float32{0.3}},
		{"float32": []float32{0.4}},
		{"float32": []float32{0.5}},
	}
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent)
for i, qv := range queryVectors {
wg.Add(1)
sem <- struct{}{}
go func(idx int, vec map[string]any) {
defer wg.Done()
defer func() { <-sem }()
req := &vectors.QueryVectorsRequest{
Bucket: oss.Ptr(bucketName),
IndexName: oss.Ptr(indexName),
QueryVector: vec,
ReturnMetadata: oss.Ptr(true),
ReturnDistance: oss.Ptr(true),
TopK: oss.Ptr(10),
}
resp, err := client.QueryVectors(context.TODO(), req)
if err != nil {
log.Printf("Query %d failed: %v", idx, err)
return
}
fmt.Printf("Query %d completed, status code: %d
", idx, resp.StatusCode)
}(i, qv)
}
wg.Wait()
fmt.Println("All queries completed")
}
Concurrent retrieval with filters
type queryTask struct {
vector map[string]any
filter map[string]any
}
// ... inside main after client creation
tasks := []queryTask{{
vector: map[string]any{"float32": []float32{0.1}},
filter: map[string]any{"$and": []map[string]any{{"type": map[string]any{"$in": []string{"tutorial"}}}}},
}, {
vector: map[string]any{"float32": []float32{0.2}},
filter: map[string]any{"$and": []map[string]any{{"type": map[string]any{"$nin": []string{"comedy", "documentary"}}}}},
}, {
vector: map[string]any{"float32": []float32{0.3}},
filter: nil,
}}
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent)
for i, task := range tasks {
wg.Add(1)
sem <- struct{}{}
go func(idx int, t queryTask) {
defer wg.Done()
defer func() { <-sem }()
req := &vectors.QueryVectorsRequest{
Bucket: oss.Ptr(bucketName),
IndexName: oss.Ptr(indexName),
QueryVector: t.vector,
ReturnMetadata: oss.Ptr(true),
ReturnDistance: oss.Ptr(true),
TopK: oss.Ptr(10),
}
if t.filter != nil {
req.Filter = t.filter
}
resp, err := client.QueryVectors(context.TODO(), req)
if err != nil {
log.Printf("Query %d failed: %v", idx, err)
return
}
fmt.Printf("Query %d completed, status code: %d
", idx, resp.StatusCode)
}(i, task)
}
wg.Wait()
fmt.Println("All queries completed")Performance tuning
Key factors influencing concurrent retrieval speed:
Network latency to the OSS Vectors endpoint.
API rate limits – keep the number of simultaneous requests within your quota.
Optimal concurrency level – on typical cloud VPCs, 5‑10 parallel jobs often provide the best trade‑off.
The CLI embeds each query text before searching; the SDK examples work directly with pre‑computed vectors, which removes that step and can reduce latency.
Adjust MAX_CONCURRENT and monitor average response time to find the sweet spot for your workload; one quick way to do this is sketched below.
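A minimal harness that times one batch at several concurrency levels, reusing run_query from the asyncio CLI wrapper above (timings will vary with network latency and quotas):
import asyncio, time

async def timed_batch(queries, limit):
    # Measure wall-clock time for one batch at a given concurrency level.
    sem = asyncio.Semaphore(limit)
    start = time.perf_counter()
    await asyncio.gather(*(run_query(sem, q, i) for i, q in enumerate(queries)))
    return time.perf_counter() - start

async def sweep(queries):
    for limit in (1, 2, 5, 10):
        elapsed = await timed_batch(queries, limit)
        print(f"concurrency={limit}: {elapsed:.2f}s")
Run it with asyncio.run(sweep(queries)) on a representative query set and pick the smallest limit past which latency stops improving.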
References
OSS Vectors Embed CLI documentation: https://help.aliyun.com/zh/oss/user-guide/oss-vectors-embed-cli
Best practices for building a semantic retrieval platform on OSS: https://help.aliyun.com/zh/oss/user-guide/best-practices-for-building-a-semantic-retrieval-platform