Unlock Faster ODPS SQL: Proven UNION, COUNT DISTINCT, and Join Optimizations
This article walks through common ODPS SQL scenarios—union, count distinct, large‑table joins, mapjoin, and predicate placement—explains why naïve implementations can be inefficient, shows how to read and interpret execution plans, and provides concrete rewritten queries that dramatically improve performance and resource usage.
Background
The article focuses on common SQL development scenarios in Alibaba's ODPS (MaxCompute) environment, especially cases that cause data skew or poor performance, and demonstrates how to write high‑efficiency SQL by inspecting execution plans.
Efficient UNION Writing
When merging two tables that share overlapping customer records, a plain UNION appears to shuffle every source row before de‑duplicating, which would write as much data as the source tables hold and lower efficiency. The commonly recommended rewrite adds a GROUP BY to each side of the UNION so that duplicates are removed before the shuffle:
SELECT cst_id, cst_info
FROM (
SELECT cst_id, cst_info FROM @cst_info_a WHERE dt='${bizdate}'
GROUP BY cst_id, cst_info
UNION
SELECT cst_id, cst_info FROM @cst_info_b WHERE dt='${bizdate}'
GROUP BY cst_id, cst_info
) cst_info;
Both the original and the rewritten versions produce the same execution plan because ODPS already optimizes plain UNION for this pattern.
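The equivalence of the two forms can be checked on a toy dataset. The sketch below uses Python's built-in sqlite3 module rather than ODPS, so it only confirms that both queries return the same rows; it says nothing about ODPS execution plans. The sample rows are made up for illustration.

```python
import sqlite3

# In-memory stand-ins for @cst_info_a and @cst_info_b (sample data is hypothetical).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE cst_info_a (cst_id INTEGER, cst_info TEXT);
    CREATE TABLE cst_info_b (cst_id INTEGER, cst_info TEXT);
    INSERT INTO cst_info_a VALUES (1, 'x'), (1, 'x'), (2, 'y');
    INSERT INTO cst_info_b VALUES (2, 'y'), (3, 'z'), (3, 'z');
""")

# Plain UNION: de-duplication happens once, across both sides.
plain_union = conn.execute("""
    SELECT cst_id, cst_info FROM cst_info_a
    UNION
    SELECT cst_id, cst_info FROM cst_info_b
""").fetchall()

# Each side de-duplicated with GROUP BY before the UNION.
grouped_union = conn.execute("""
    SELECT cst_id, cst_info FROM cst_info_a GROUP BY cst_id, cst_info
    UNION
    SELECT cst_id, cst_info FROM cst_info_b GROUP BY cst_id, cst_info
""").fetchall()

print(sorted(plain_union) == sorted(grouped_union))
```

Because the results match, the choice between the two forms comes down to what the execution plan shows on the target engine.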
Is COUNT(DISTINCT) Really Slow?
Counting distinct customers over the last five days is a typical exploratory query. The naïve form uses COUNT(DISTINCT cst_id), while the optimized form first deduplicates with GROUP BY and then counts.
-- naïve
SELECT COUNT(DISTINCT cst_id) AS cst_cnt
FROM @pc_bill_bal
WHERE dt BETWEEN '${bizdate-5}' AND '${bizdate}';
-- optimized
SELECT COUNT(1) AS cst_cnt
FROM (
SELECT cst_id
FROM @pc_bill_bal
WHERE dt BETWEEN '${bizdate-5}' AND '${bizdate}'
GROUP BY cst_id
) base;
The execution plan shows that ODPS rewrites the naïve query into two deduplication steps, so the performance gap is modest (≈28% faster for the optimized version). However, when the query also groups by date, the naïve version becomes dramatically slower (≈26×) because it shuffles billions of rows, whereas the optimized version shuffles far fewer rows after early deduplication.
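The per‑date variant is not shown above, so here is a minimal sketch of what the two forms might look like. This is an assumption about the query shape, not the author's exact SQL. It runs on sqlite3 with made‑up rows and only confirms that both forms return the same counts.

```python
import sqlite3

# Toy stand-in for @pc_bill_bal; dates and customer ids are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE pc_bill_bal (dt TEXT, cst_id INTEGER);
    INSERT INTO pc_bill_bal VALUES
        ('20240101', 1), ('20240101', 1), ('20240101', 2),
        ('20240102', 2), ('20240102', 2), ('20240102', 3), ('20240102', 4);
""")

# Naive form: COUNT(DISTINCT ...) evaluated inside each date group.
naive = conn.execute("""
    SELECT dt, COUNT(DISTINCT cst_id) AS cst_cnt
    FROM pc_bill_bal
    GROUP BY dt
    ORDER BY dt
""").fetchall()

# Dedup-first form: collapse to one row per (dt, cst_id), then count rows per date.
dedup_first = conn.execute("""
    SELECT dt, COUNT(1) AS cst_cnt
    FROM (
        SELECT dt, cst_id
        FROM pc_bill_bal
        GROUP BY dt, cst_id
    ) base
    GROUP BY dt
    ORDER BY dt
""").fetchall()

print(naive == dedup_first)
```

The inner GROUP BY on (dt, cst_id) is what shrinks the data before the outer aggregation sees it.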
Large‑Table Joins (Aggregated Types)
Joining three large tables to obtain per‑customer asset totals can be written with multiple FULL OUTER JOIN + COALESCE, or by first unioning the tables and then aggregating. The union‑then‑aggregate approach reduces the number of shuffle stages.
-- traditional full outer join
SELECT COALESCE(tt1.cst_id, tt2.cst_id) AS cst_id,
COALESCE(tt1.bal_init_prin,0) AS bal_init_prin,
COALESCE(tt1.amt_retail_prin,0) AS amt_retail_prin,
COALESCE(tt2.amt_buy_prin,0) AS amt_buy_prin
FROM (
SELECT COALESCE(t1.cst_id,t2.cst_id) AS cst_id,
COALESCE(t1.bal_init_prin,0) AS bal_init_prin,
COALESCE(t2.amt_retail_prin,0) AS amt_retail_prin
FROM @bal_init t1 FULL OUTER JOIN @amt_retail t2 ON t1.cst_id=t2.cst_id
) tt1 FULL OUTER JOIN @amt_buy tt2 ON tt1.cst_id=tt2.cst_id;
-- union‑then‑aggregate (method 1)
SELECT cst_id,
SUM(bal_init_prin) AS bal_init_prin,
SUM(amt_retail_prin) AS amt_retail_prin,
SUM(amt_buy_prin) AS amt_buy_prin
FROM (
SELECT cst_id, bal_init_prin, 0 AS amt_retail_prin, 0 AS amt_buy_prin FROM @bal_init
UNION ALL
SELECT cst_id, 0, amt_retail_prin, 0 FROM @amt_retail
UNION ALL
SELECT cst_id, 0, 0, amt_buy_prin FROM @amt_buy
) t1
GROUP BY cst_id;
The two methods generate identical execution plans, but the union‑based version avoids multiple joins and reduces shuffle overhead. When the join key is an integer and the aggregation is simple, UNION ALL + GROUP BY consistently outperforms traditional joins, especially as the number of tables grows.
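To see why the two formulations return the same totals, the following sketch emulates both in plain Python. It assumes each table holds at most one row per cst_id; with repeated keys a join would multiply rows while SUM would collapse them, and the results would differ. The data and function names are hypothetical.

```python
from collections import defaultdict

# One amount per customer per table (hypothetical sample data).
bal_init = {1: 100, 2: 200}
amt_retail = {2: 20, 3: 30}
amt_buy = {3: 3, 4: 4}

def full_outer_join_totals(a, b, c):
    """Emulate chained FULL OUTER JOIN with COALESCE(value, 0)."""
    keys = set(a) | set(b) | set(c)
    return {k: (a.get(k, 0), b.get(k, 0), c.get(k, 0)) for k in keys}

def union_all_group_by(a, b, c):
    """Emulate UNION ALL of zero-padded rows followed by GROUP BY cst_id + SUM."""
    rows = (
        [(k, v, 0, 0) for k, v in a.items()]
        + [(k, 0, v, 0) for k, v in b.items()]
        + [(k, 0, 0, v) for k, v in c.items()]
    )
    totals = defaultdict(lambda: [0, 0, 0])
    for cst_id, init, retail, buy in rows:
        totals[cst_id][0] += init
        totals[cst_id][1] += retail
        totals[cst_id][2] += buy
    return {k: tuple(v) for k, v in totals.items()}

joined = full_outer_join_totals(bal_init, amt_retail, amt_buy)
unioned = union_all_group_by(bal_init, amt_retail, amt_buy)
print(joined == unioned)
```

Adding a fourth table to the union form means one more zero‑padded branch, whereas the join form needs another nested FULL OUTER JOIN and another layer of COALESCE.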
Large‑Table Joins (String Types)
For string‑type attributes, the same union‑then‑aggregate idea can be implemented using JSON aggregation and GET_JSON_OBJECT to reconstruct columns after a single shuffle.
SELECT cst_id,
GET_JSON_OBJECT(all_val,'$.bal_init_prin') AS bal_init_prin,
GET_JSON_OBJECT(all_val,'$.amt_retail_prin') AS amt_retail_prin,
GET_JSON_OBJECT(all_val,'$.amt_buy_prin') AS amt_buy_prin
FROM (
SELECT cst_id,
CONCAT('{', CONCAT_WS(',', COLLECT_SET(all_val)), '}') AS all_val
FROM (
SELECT cst_id, CONCAT('"bal_init_prin":"', bal_init_prin, '"') AS all_val FROM @bal_init
UNION ALL
SELECT cst_id, CONCAT('"amt_retail_prin":"', amt_retail_prin, '"') FROM @amt_retail
UNION ALL
SELECT cst_id, CONCAT('"amt_buy_prin":"', amt_buy_prin, '"') FROM @amt_buy
) t1
GROUP BY cst_id
) tt1;
This approach yields a single shuffle and comparable performance to the numeric‑type union method, while avoiding the memory pressure of COLLECT_SET on very large tables.
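The pack‑then‑unpack idea can be illustrated outside ODPS. The sketch below mimics the CONCAT, COLLECT_SET, and GET_JSON_OBJECT steps with Python's json module on made‑up data. One difference is deliberate and labeled in the code: json.dumps escapes quotes inside values, which the plain CONCAT in the SQL above does not, so string values containing a double quote would need extra handling in the SQL version.

```python
import json
from collections import defaultdict

# Hypothetical string-typed attributes, one value per customer per table.
bal_init = {1: "100.00"}
amt_retail = {1: "20.50", 2: "7.00"}
amt_buy = {2: "3.25"}

def pack(column, value):
    # Mirrors CONCAT('"col":"', value, '"'); json.dumps also escapes
    # embedded quotes, which the SQL CONCAT does not do.
    return f"{json.dumps(column)}:{json.dumps(value)}"

# UNION ALL of the three sources, then COLLECT_SET per cst_id.
fragments = defaultdict(set)
sources = (
    ("bal_init_prin", bal_init),
    ("amt_retail_prin", amt_retail),
    ("amt_buy_prin", amt_buy),
)
for column, table in sources:
    for cst_id, value in table.items():
        fragments[cst_id].add(pack(column, value))

# CONCAT('{', CONCAT_WS(',', ...), '}') followed by per-column extraction.
rebuilt = {}
for cst_id, parts in fragments.items():
    all_val = "{" + ",".join(sorted(parts)) + "}"
    obj = json.loads(all_val)
    rebuilt[cst_id] = (
        obj.get("bal_init_prin"),
        obj.get("amt_retail_prin"),
        obj.get("amt_buy_prin"),
    )

print(rebuilt)
```

A customer missing from one source ends up with no value for that column (None here), which corresponds to the NULL that a FULL OUTER JOIN would produce.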
MapJoin and DistMapJoin
MapJoin pushes a small table into the mapper’s memory, eliminating the reduce‑side join. ODPS now auto‑enables MapJoin for suitable inner or left joins. Example of a working MapJoin:
SELECT base.*, fee_year_rate.*
FROM @base base
INNER JOIN @fee_year_rate fee_year_rate ON (base.terms = fee_year_rate.terms);
When the small table exceeds the configured memory limit (for example, set odps.sql.mapjoin.memory.max=2048; the value is in MB), MapJoin falls back to a regular join. The execution plan shows only map tasks when MapJoin is effective.
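Conceptually, a map‑side join behaves like a hash join whose build side is the small table held in memory. The sketch below is a simplified single‑process model of that idea, not ODPS internals; the function name and sample rows are hypothetical.

```python
def map_join(big_rows, small_rows, key):
    """Inner join: build an in-memory lookup from the small table, then stream the big table."""
    lookup = {}
    for small in small_rows:
        lookup.setdefault(small[key], []).append(small)

    joined = []
    for big in big_rows:
        # Each big-table row is matched locally; no shuffle of big_rows is needed.
        for small in lookup.get(big[key], []):
            joined.append((big, small))
    return joined

# Hypothetical rows standing in for @base and @fee_year_rate.
base = [
    {"id": 1, "terms": "12"},
    {"id": 2, "terms": "6"},
    {"id": 3, "terms": "24"},  # no matching rate, dropped by the inner join
]
fee_year_rate = [
    {"terms": "12", "rate": 0.12},
    {"terms": "6", "rate": 0.08},
]

joined = map_join(base, fee_year_rate, "terms")
pairs = [(b["id"], s["rate"]) for b, s in joined]
print(pairs)
```

The memory limit mentioned above matters because the whole lookup structure has to fit in each mapper's memory.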
DistMapJoin extends this idea by explicitly hinting the engine to treat a medium‑size table as a distributed small table:
SELECT /*+distmapjoin(cst_info(shard_count=20))*/ base.*, cst_info.*
FROM @base base
LEFT JOIN @cst_info cst_info
ON (base.cst_id = cst_info.cst_id AND base.origin_inst_code = cst_info.inst_id);
The plan for DistMapJoin adds a reduce stage for sharding the hinted table but still avoids a full join shuffle, delivering 20‑30% speed gains in typical workloads.
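A rough mental model of the sharding step: the medium‑size table is split into shard_count hash partitions, and each row of the large table probes only the shard its join key hashes to. The sketch below illustrates that model in plain Python. The hashing scheme, function names, and data are assumptions made for illustration and do not describe how ODPS actually distributes shards.

```python
import zlib

def shard_of(key, shard_count):
    # Deterministic hash of the join key (assumed scheme, for illustration only).
    return zlib.crc32(repr(key).encode("utf-8")) % shard_count

def dist_map_join_left(big_rows, medium_rows, big_key, medium_key, shard_count):
    """LEFT JOIN where the hinted table is split into hash shards before probing."""
    shards = [{} for _ in range(shard_count)]
    for row in medium_rows:
        k = medium_key(row)
        shards[shard_of(k, shard_count)].setdefault(k, []).append(row)

    out = []
    for big in big_rows:
        k = big_key(big)
        matches = shards[shard_of(k, shard_count)].get(k)
        if matches:
            out.extend((big, m) for m in matches)
        else:
            out.append((big, None))  # LEFT JOIN keeps unmatched big-table rows
    return out

# Hypothetical rows standing in for @base and @cst_info.
base = [
    {"cst_id": 1, "origin_inst_code": "A"},
    {"cst_id": 2, "origin_inst_code": "B"},
]
cst_info = [
    {"cst_id": 1, "inst_id": "A", "name": "alice"},
    {"cst_id": 2, "inst_id": "Z", "name": "bob"},
]

result = dist_map_join_left(
    base,
    cst_info,
    big_key=lambda r: (r["cst_id"], r["origin_inst_code"]),
    medium_key=lambda r: (r["cst_id"], r["inst_id"]),
    shard_count=20,
)
names = [m["name"] if m else None for _, m in result]
print(names)
```

Each shard holds only a fraction of the hinted table, which is why a table too large for a plain MapJoin can still be joined without shuffling the large side.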
Predicate Placement
Putting filter predicates immediately after the source table (predicate push‑down) is the recommended style. The article compares a “standard” query that filters before the join with a “non‑standard” query that filters after the join; both generate identical execution plans because ODPS already pushes predicates early.
-- standard (filter before join)
SELECT base.*, fee_year_rate.*
FROM (SELECT * FROM @base WHERE terms='12') base
INNER JOIN @fee_year_rate fee_year_rate ON (base.terms = fee_year_rate.terms);
-- non‑standard (filter after join)
SELECT base.*, fee_year_rate.*
FROM @base base
INNER JOIN @fee_year_rate fee_year_rate ON (base.terms = fee_year_rate.terms)
WHERE base.terms='12';
Execution plans are identical, confirming that ODPS optimizes predicate placement automatically, though writing the query in the conventional way improves readability.
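For an inner join, the two placements are also logically equivalent, which is what allows an optimizer to treat them the same. The sketch below checks this on sqlite3 with made‑up rows. It verifies result equality only, not ODPS plan equality.

```python
import sqlite3

# Toy stand-ins for @base and @fee_year_rate (sample data is hypothetical).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE base (id INTEGER, terms TEXT);
    CREATE TABLE fee_year_rate (terms TEXT, rate REAL);
    INSERT INTO base VALUES (1, '12'), (2, '6'), (3, '12');
    INSERT INTO fee_year_rate VALUES ('12', 0.12), ('6', 0.08);
""")

# Filter applied in a subquery, before the join.
filter_before = conn.execute("""
    SELECT b.id, f.rate
    FROM (SELECT * FROM base WHERE terms = '12') b
    INNER JOIN fee_year_rate f ON b.terms = f.terms
    ORDER BY b.id
""").fetchall()

# Filter applied in WHERE, after the join.
filter_after = conn.execute("""
    SELECT b.id, f.rate
    FROM base b
    INNER JOIN fee_year_rate f ON b.terms = f.terms
    WHERE b.terms = '12'
    ORDER BY b.id
""").fetchall()

print(filter_before == filter_after)
```

This equivalence is specific to inner joins. With an outer join, a WHERE filter on the nullable side discards the NULL‑extended rows, so moving that predicate changes the result.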
Conclusion
Effective ODPS SQL development hinges on reading execution plans, applying early deduplication, preferring UNION ALL + GROUP BY over multiple joins, leveraging MapJoin/DistMapJoin when the small table fits memory, and writing predicates close to the source tables. As ODPS continues to evolve, revisiting these patterns ensures consistently high performance.
dbaplus Community
Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.
