How to Build Distributed Scalar Indexes with Lance and Ray
This guide walks through the end-to-end workflow for building a distributed scalar index in Lance with Ray: driver-side validation, balanced fragment distribution, per-worker index builds through Lance's create_scalar_index API, and the final metadata merge and commit. Each phase is shown with the relevant code.
Phase 1 – Pre‑validation and preparation (Driver)
Check that the installed pylance version is ≥ 0.36.0.
Validate parameters: exactly one of uri or namespace, non‑empty column, num_workers > 0, and index_type must be one of INVERTED, FTS, or BTREE.
Resolve the dataset URI and storage options, optionally via get_or_create_namespace and describe_table.
Open the LanceDataset with the resolved URI and storage options.
Confirm the target column exists and matches the required type for the chosen index type.
Determine the index name (default {column}_idx) and ensure no conflict when replace=False.
Collect the fragment IDs to use (either supplied or all fragments via get_fragments()).
Balance the fragments across workers by calling _distribute_fragments_balanced, which returns a list of fragment batches.
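
The parameter checks listed above can be sketched as follows. This is a minimal illustration, not the library's code: the function name validate_index_params and the constant SUPPORTED_INDEX_TYPES are hypothetical, and the sketch assumes index_type is compared as an exact uppercase string.

```python
from typing import Optional

# Assumption: index types are matched as exact uppercase strings.
SUPPORTED_INDEX_TYPES = {"INVERTED", "FTS", "BTREE"}


def validate_index_params(
    uri: Optional[str],
    namespace: Optional[str],
    column: str,
    num_workers: int,
    index_type: str,
    name: Optional[str] = None,
) -> str:
    """Validate driver-side arguments and return the resolved index name."""
    # Exactly one of uri / namespace must be provided.
    if (uri is None) == (namespace is None):
        raise ValueError("Provide exactly one of uri or namespace")
    if not column:
        raise ValueError("column must be a non-empty string")
    if num_workers <= 0:
        raise ValueError("num_workers must be greater than 0")
    if index_type not in SUPPORTED_INDEX_TYPES:
        raise ValueError(f"Unsupported index_type: {index_type}")
    # The default index name follows the {column}_idx convention.
    return name or f"{column}_idx"
```

Failing fast on the driver avoids starting Ray workers for a request that cannot succeed. The remaining checks (column type, index name conflicts) need an open dataset and are left out of this sketch.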
Phase 2 – Build fragment batches (Driver)
The core function that creates balanced batches is:

    import logging
    from typing import Any


    def _distribute_fragments_balanced(
        fragments: list[Any], num_workers: int, logger: logging.Logger
    ) -> list[list[int]]:
        """Distribute fragments across workers using a greedy balanced algorithm."""
        if not fragments:
            return [[] for _ in range(num_workers)]

        # Use the row count of each fragment as its size.
        fragment_info = []
        for fragment in fragments:
            try:
                row_count = fragment.count_rows()
                fragment_info.append({"id": fragment.fragment_id, "size": row_count})
            except Exception as exc:
                logger.warning(
                    "Could not get size for fragment %s: %s. "
                    "Using fragment_id as size estimate.",
                    fragment.fragment_id,
                    exc,
                )
                fragment_info.append(
                    {"id": fragment.fragment_id, "size": fragment.fragment_id}
                )

        # Largest fragments first, each assigned to the least-loaded worker.
        fragment_info.sort(key=lambda x: x["size"], reverse=True)
        worker_batches = [[] for _ in range(num_workers)]
        worker_workloads = [0] * num_workers
        for frag_info in fragment_info:
            min_idx = min(range(num_workers), key=lambda i: worker_workloads[i])
            worker_batches[min_idx].append(frag_info["id"])
            worker_workloads[min_idx] += frag_info["size"]

        total_size = sum(info["size"] for info in fragment_info)
        for i, (batch, workload) in enumerate(zip(worker_batches, worker_workloads)):
            pct = (workload / total_size * 100) if total_size > 0 else 0
            logger.info(
                " Worker %d: %d fragments, workload: %d (%.1f%%)",
                i, len(batch), workload, pct,
            )

        # Drop workers that received no fragments.
        return [batch for batch in worker_batches if batch]

The algorithm sorts fragments by row count in descending order and greedily assigns each one to the worker with the smallest current workload, which keeps the per-worker load roughly even. Empty batches are dropped from the result, so fewer than num_workers batches can come back when there are more workers than fragments.
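
To make the greedy assignment concrete, here is a small worked example that takes plain row counts instead of Lance fragments. The helper greedy_balance and the sample sizes are made up for illustration; the assignment logic mirrors the function above.

```python
def greedy_balance(sizes: dict[int, int], num_workers: int) -> list[list[int]]:
    """Assign fragment IDs to workers, largest first, least-loaded worker wins."""
    ordered = sorted(sizes.items(), key=lambda kv: kv[1], reverse=True)
    batches: list[list[int]] = [[] for _ in range(num_workers)]
    loads = [0] * num_workers
    for frag_id, size in ordered:
        # min() returns the first worker on ties, so worker 0 is preferred.
        target = min(range(num_workers), key=lambda i: loads[i])
        batches[target].append(frag_id)
        loads[target] += size
    return [batch for batch in batches if batch]


# Fragment ID -> row count (hypothetical values).
sizes = {0: 100, 1: 80, 2: 60, 3: 40, 4: 20}
print(greedy_balance(sizes, 2))  # [[0, 3, 4], [1, 2]]
```

With two workers, the loads end at 160 and 140 rows. With eight workers and the same five fragments, the function returns five single-fragment batches, because empty batches are filtered out.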
Phase 3 – Create fragment‑handler closure (Driver)
The function _handle_fragment_index builds a callable that each Ray worker will execute for a specific batch of fragment IDs.

    def _handle_fragment_index(
        dataset_uri: str,
        column: str,
        index_type: str,
        name: str,
        index_uuid: str,
        replace: bool,
        train: bool,
        storage_options: Optional[dict[str, str]] = None,
        namespace_impl: Optional[str] = None,
        namespace_properties: Optional[dict[str, str]] = None,
        table_id: Optional[list[str]] = None,
        **kwargs: Any,
    ):
        """Create a fragment handler closure for scalar index builds."""

        def func(fragment_ids: list[int]) -> dict[str, Any]:
            try:
                if not fragment_ids:
                    raise ValueError("fragment_ids cannot be empty")
                for fragment_id in fragment_ids:
                    if fragment_id < 0 or fragment_id > 0xFFFFFFFF:
                        raise ValueError(f"Invalid fragment_id: {fragment_id}")

                storage_options_provider = create_storage_options_provider(
                    namespace_impl, namespace_properties, table_id
                )
                dataset = LanceDataset(
                    dataset_uri,
                    storage_options=storage_options,
                    storage_options_provider=storage_options_provider,
                )

                available = {f.fragment_id for f in dataset.get_fragments()}
                invalid = set(fragment_ids) - available
                if invalid:
                    raise ValueError(f"Fragment IDs {invalid} do not exist")

                logger.info(
                    "Building distributed scalar index for fragments %s using create_scalar_index",
                    fragment_ids,
                )
                dataset.create_scalar_index(
                    column=column,
                    index_type=index_type,
                    name=name,
                    replace=replace,
                    train=train,
                    index_uuid=index_uuid,
                    fragment_ids=fragment_ids,
                    **kwargs,
                )
                field_id = dataset.schema.get_field_index(column)
                logger.info(
                    "Fragment scalar index created successfully for fragments %s",
                    fragment_ids,
                )
                return {
                    "status": "success",
                    "fragment_ids": fragment_ids,
                    "fields": [field_id],
                    "uuid": index_uuid,
                }
            except Exception as exc:
                logger.error(
                    "Fragment scalar index task failed for fragments %s: %s",
                    fragment_ids,
                    exc,
                )
                return {"status": "error", "fragment_ids": fragment_ids, "error": str(exc)}

        return func

The returned func validates the fragment IDs, creates a storage options provider, opens the same dataset, checks that every requested fragment exists, invokes dataset.create_scalar_index on that subset, looks up the field index of the column, and returns a result dictionary. Any exception is caught and returned as a result with status "error" instead of being propagated.
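
The closure pattern itself can be seen without Lance. In the sketch below, make_handler and its build argument are hypothetical stand-ins: build takes the place of the dataset.create_scalar_index call, and only the capture of shared arguments and the conversion of exceptions into result dictionaries are kept.

```python
from typing import Any, Callable


def make_handler(build: Callable[[list[int]], None], index_uuid: str):
    """Return a per-batch task that reports failures as result dicts."""

    def func(fragment_ids: list[int]) -> dict[str, Any]:
        try:
            if not fragment_ids:
                raise ValueError("fragment_ids cannot be empty")
            # Stand-in for the real index build on this batch of fragments.
            build(fragment_ids)
            return {
                "status": "success",
                "fragment_ids": fragment_ids,
                "uuid": index_uuid,
            }
        except Exception as exc:
            return {
                "status": "error",
                "fragment_ids": fragment_ids,
                "error": str(exc),
            }

    return func


handler = make_handler(lambda ids: None, index_uuid="example-uuid")
print(handler([1, 2])["status"])  # success
print(handler([])["status"])      # error
```

Because the shared arguments are bound once on the driver, each worker only needs its list of fragment IDs. Every batch also receives the same index_uuid, which presumably lets the later merge step treat the per-batch outputs as parts of one index.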
Phase 4 – Distributed execution (Ray Pool)
The driver launches a Ray multiprocessing pool and maps each fragment batch to the handler:

    results = _map_async_with_pool(
        fragment_handler=fragment_handler,
        fragment_batches=fragment_batches,  # e.g., [[id1, id2], [id3], ...]
        num_workers=num_workers,
        ray_remote_args=ray_remote_args,
        error_prefix="...",
    )

_map_async_with_pool creates a ray.util.multiprocessing.Pool with processes=num_workers, calls pool.map_async(fragment_handler, fragment_batches, chunksize=1), waits for completion with rst_futures.get(), and raises a RuntimeError if any worker reports an error. The result is a list[dict] where each dict is the output of the fragment‑handler closure.
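
The map_async call pattern can be tried locally without a Ray cluster. The sketch below uses the standard library's multiprocessing.pool.ThreadPool as a stand-in, on the assumption that ray.util.multiprocessing.Pool follows the same Pool interface. The helper map_batches and the fake handler are hypothetical, and the error check is left out.

```python
from multiprocessing.pool import ThreadPool


def map_batches(handler, fragment_batches, num_workers):
    """Run handler once per batch and return the results in batch order."""
    with ThreadPool(processes=num_workers) as pool:
        # chunksize=1 sends one batch per task, so each batch can be
        # picked up independently by whichever worker is free.
        futures = pool.map_async(handler, fragment_batches, chunksize=1)
        return futures.get()  # blocks until every batch has finished


def fake_handler(fragment_ids):
    # Stand-in for the fragment-handler closure.
    return {"status": "success", "fragment_ids": fragment_ids}


results = map_batches(fake_handler, [[0, 3, 4], [1, 2]], num_workers=2)
print([r["fragment_ids"] for r in results])  # [[0, 3, 4], [1, 2]]
```

map_async preserves input order, so results[i] always belongs to fragment_batches[i], regardless of which worker finishes first.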
Phase 5 – Failure handling and dataset reload (Driver)
If any result has status == "error", the driver aggregates all error messages and raises RuntimeError("Index building failed: ..."). Otherwise it re‑opens the dataset with LanceDataset(uri, ...) to obtain the latest state after index creation.
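
The aggregation step might look like the sketch below. The helper name raise_on_failures and the exact message layout after the "Index building failed:" prefix are assumptions; only the prefix comes from the description above.

```python
from typing import Any


def raise_on_failures(results: list[dict[str, Any]]) -> None:
    """Raise one RuntimeError that lists every failed batch."""
    errors = [
        f"fragments {r['fragment_ids']}: {r['error']}"
        for r in results
        if r.get("status") == "error"
    ]
    if errors:
        raise RuntimeError("Index building failed: " + "; ".join(errors))


results = [
    {"status": "success", "fragment_ids": [0, 3]},
    {"status": "error", "fragment_ids": [1], "error": "timeout"},
]
try:
    raise_on_failures(results)
except RuntimeError as exc:
    print(exc)  # Index building failed: fragments [1]: timeout
```

Collecting all errors before raising means one failed run reports every broken batch, not just the first one encountered.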
Phase 6 – Merge index metadata (Driver)
The driver merges per‑fragment index metadata into a logical index compatible with different pylance versions:

    merge_index_metadata_compat(dataset, index_id, index_type=index_type, **kwargs)

It then constructs an Index object with the new UUID, name, fields, dataset version, and the set of fragment IDs, wraps it in a LanceOperation.CreateIndex, and finally commits the operation to the dataset manifest via LanceDataset.commit(uri, create_index_op, ...). The returned dataset now contains the newly built scalar index.
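
The fields and fragment ID set that go into the Index object have to come from the worker results. One possible way to derive them is sketched below. The helper summarize_results is hypothetical, and building the Index object and committing it are omitted because those calls depend on the installed Lance version.

```python
from typing import Any


def summarize_results(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Collect the covered fragment IDs and indexed fields from all batches."""
    fragment_ids: set[int] = set()
    fields: list[int] = []
    for result in results:
        fragment_ids.update(result["fragment_ids"])
        for field in result["fields"]:
            if field not in fields:  # every batch reports the same column
                fields.append(field)
    return {"fragment_ids": fragment_ids, "fields": fields}


results = [
    {"status": "success", "fragment_ids": [0, 3, 4], "fields": [2]},
    {"status": "success", "fragment_ids": [1, 2], "fields": [2]},
]
print(summarize_results(results))
# {'fragment_ids': {0, 1, 2, 3, 4}, 'fields': [2]}
```

The union of fragment IDs records which fragments the committed index covers.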
Overall, the process distributes scalar‑index construction across Ray workers, balances load by fragment row count, validates each step, and consolidates metadata to produce a consistent, searchable index on the Lance dataset.
Big Data Technology Tribe