How to Scale Global Dictionary Indexing with Distributed SQL in Minutes
This article explains a distributed‑computing approach for generating a globally unique integer index from massive string datasets, replacing single‑reducer sorting with hash‑bucket partitioning and parallel processing to cut runtime from 30 minutes to just 2 minutes.
In many business scenarios, strings must be mapped to globally unique integers, such as in BitMap dictionary indexing. The common approach sorts the entire dataset globally and assigns row numbers, which becomes a performance bottleneck when the data reaches billions of records.
Traditional ORDER BY Method
For a dataset test_data with 930 million rows, the global window sort can be written as:
INSERT OVERWRITE TABLE test_data_result
SELECT id,
       ROW_NUMBER() OVER (ORDER BY id ASC) AS rn
FROM test_data;
This method forces all data through a single reducer, which leads to a 30-minute execution time. Setting odps.sql.reducer.instances=256 does not help, because a window with a global ORDER BY and no PARTITION BY has to be evaluated by one reducer to produce a single consistent row numbering.
Distributed Optimization Idea
To avoid single‑machine computation, the data is first hashed into buckets for local sorting, then a global index is derived by adding a bucket base address to each row's relative position within its bucket. This is similar to base‑address addressing in operating systems.
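The idea can be sketched outside SQL as a small, single-process Python simulation. This is only an illustration under stated assumptions, not the article's implementation: the function name global_index and the num_buckets parameter are hypothetical, and zlib.crc32 stands in for the SQL HASH() function.

```python
from collections import defaultdict
from itertools import accumulate
import zlib


def global_index(ids, num_buckets=4):
    """Return (id, index) pairs whose indexes are exactly 1..len(ids)."""
    # Hash every id into a bucket (CRC32 is an assumed stand-in for SQL HASH()).
    buckets = defaultdict(list)
    for s in ids:
        buckets[zlib.crc32(s.encode("utf-8")) % num_buckets].append(s)

    # Sort locally inside each bucket; on a cluster the buckets are independent,
    # so this is the part that can run in parallel.
    bucket_nos = sorted(buckets)
    for b in bucket_nos:
        buckets[b].sort()

    # Bucket base = total size of all buckets with a smaller bucket number.
    sizes = [len(buckets[b]) for b in bucket_nos]
    bases = [running - size for running, size in zip(accumulate(sizes), sizes)]

    # Global index = bucket base + 1-based position inside the bucket.
    rows = []
    for b, base in zip(bucket_nos, bases):
        for rel, s in enumerate(buckets[b], start=1):
            rows.append((s, base + rel))
    return rows
```

Calling global_index(["a", "b", "c"], num_buckets=2) returns three pairs whose indexes are 1, 2 and 3 in some order; which string receives which index depends on the hash, not on the global sort order of the strings.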
Step 1 – Hash Bucketing
WITH hash_bucket AS (
    SELECT id,
           -- 1-based position of the row inside its bucket
           ROW_NUMBER() OVER (PARTITION BY bucket_no ORDER BY id ASC) AS bucket_rel_index,
           -- number of rows in the bucket, repeated on every row of that bucket
           COUNT(1) OVER (PARTITION BY bucket_no) AS bucket_size,
           bucket_no
    FROM (
        SELECT id,
               ABS(HASH(id)) % 100000 AS bucket_no
        FROM test_data
    ) t
)
Step 2 – Compute Bucket Base
, bucket_base AS (
    SELECT bucket_no,
           -- running total minus own size = rows in all lower-numbered buckets
           SUM(bucket_size) OVER (ORDER BY bucket_no ASC) - bucket_size AS bucket_base
    FROM (
        SELECT DISTINCT bucket_no, bucket_size
        FROM hash_bucket
    ) t
)
Step 3 – Generate Global Index
INSERT OVERWRITE TABLE sort_data_result_1
SELECT t1.id,
       t2.bucket_base + t1.bucket_rel_index AS id_index
FROM hash_bucket t1
JOIN bucket_base t2 ON t1.bucket_no = t2.bucket_no;
Explanation:
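First, each id is hashed into one of 100,000 buckets; the size of each bucket and each id's relative position within its bucket are computed.
Second, the base address of each bucket is calculated as the cumulative sum of the sizes of all preceding buckets.
Third, the globally unique index id_index is obtained by adding the bucket base to the relative index. Because buckets are assigned by hash, id_index is unique and dense (1 to N) but does not follow the global sort order of id, which is sufficient for dictionary indexing.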
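The bucket base in the second step is an exclusive prefix sum. A small worked example in Python, using made-up bucket sizes purely for illustration (the numbers and variable names are assumptions, not values from the dataset above):

```python
from itertools import accumulate

# Hypothetical sizes of buckets 0..3 (illustrative numbers only).
bucket_sizes = [3, 5, 2, 4]

# Inclusive running total, as SUM(bucket_size) OVER (ORDER BY bucket_no) would give.
running_total = list(accumulate(bucket_sizes))  # [3, 8, 10, 14]

# Subtracting each bucket's own size leaves the row count of all earlier buckets.
bucket_base = [total - size for total, size in zip(running_total, bucket_sizes)]  # [0, 3, 8, 10]

# Index range covered by each bucket: base + 1 .. base + size.
ranges = [(base + 1, base + size) for base, size in zip(bucket_base, bucket_sizes)]
# [(1, 3), (4, 8), (9, 10), (11, 14)]
```

The ranges neither overlap nor leave gaps, which is why adding the relative index to the bucket base yields every integer from 1 to the total row count exactly once.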
After applying this distributed approach, the processing time drops to about 2 minutes, eliminating the single‑reducer bottleneck.
[Figures: the original DAG and the optimized workflow]
Alibaba Cloud Developer
Alibaba's official tech channel, featuring all of its technology innovations.
