Optimizing Large‑Scale Table Joins in Spark Using Bloom Filters
To address the resource‑intensive challenges of joining billion‑row tables in data warehouses, this article examines common optimization approaches, analyzes Spark’s SortMergeJoin algorithm, and proposes a Bloom‑filter‑based solution that filters unchanged data early, dramatically improving performance and reducing cluster resource consumption.
In the big‑data industry, especially data‑warehouse construction, analyzing and joining very large tables (billions of rows) consumes excessive CPU, memory and I/O, slows down tasks and raises cost.
Problem Background
Challenges include high resource consumption, long execution time, and huge financial cost when performing joins on massive tables.
Common Optimization Approaches
1. Increase cluster resources
Simple but costly; requires budget.
2. Incremental computation
Reduces resource usage but needs suitable data (e.g., log‑type or lifecycle data) and business constraints.
Spark SortMergeJoin Analysis
The classic Spark SortMergeJoin shuffles both tables into matching partitions, sorts each partition by the join key, and then merges the sorted streams. Its bottleneck is the shuffle-and-sort stage: its cost grows with the total data volume, regardless of how many rows have actually changed.
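The merge phase itself is cheap; a simplified plain-Java sketch (an illustration of the technique, not Spark's internal code; duplicate keys are ignored for brevity) shows the single linear pass over two sorted inputs:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified merge phase of a sort-merge join: both inputs are already
// sorted by the join key, so matches are found in one linear pass.
public class MergeJoinSketch {
    // Returns the keys present in both sorted lists (an inner join on key;
    // duplicate keys are not handled, for brevity).
    public static List<String> mergeJoin(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i).compareTo(right.get(j));
            if (cmp == 0) {
                out.add(left.get(i)); // keys match: emit a joined row
                i++;
                j++;
            } else if (cmp < 0) {
                i++; // advance the side with the smaller key
            } else {
                j++;
            }
        }
        return out;
    }
}
```

The merge pass is O(n + m); it is the preceding shuffle and per-partition sort that dominate, which is why shrinking the inputs early pays off.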
Bloom‑Filter‑Based Optimization
The core idea is to filter unchanged records before the join by building a Bloom filter on primary keys that have changed, thus dramatically shrinking the data volume processed in the Map stage.
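A Bloom filter is safe for this job because it can return false positives but never false negatives, so a changed key is never filtered out. A minimal hand-rolled sketch (for illustration only; the implementation below uses the Guava BloomFilter shaded inside Curator) makes that property concrete:

```java
import java.util.BitSet;

// Minimal Bloom filter sketch: k hash probes over an m-bit array.
// Inserted keys are always reported present; keys never inserted are
// reported absent with high probability (tunable via m and k).
public class TinyBloom {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of hash probes per key

    public TinyBloom(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Derive the i-th probe index from two base hashes
    // (the Kirsch-Mitzenmacher double-hashing trick).
    private int probe(String s, int i) {
        int h1 = s.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9e3779b9;
        return Math.floorMod(h1 + i * h2, m);
    }

    public void put(String s) {
        for (int i = 0; i < k; i++) bits.set(probe(s, i));
    }

    public boolean mightContain(String s) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(probe(s, i))) return false; // definitely absent
        }
        return true; // probably present
    }
}
```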
Spark Function Java Implementation
package org.example;
import org.apache.curator.shaded.com.google.common.hash.BloomFilter;
import org.apache.curator.shaded.com.google.common.hash.Funnels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.SparkConf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.RamUsageEstimator;
/**
* add by chengwansheng
*/
/**
 * Wrapper that allows caching a null Bloom filter:
 * ConcurrentHashMap cannot hold null values directly.
 */
class MyBloomFilter {
    private final BloomFilter<String> bloomFilter;

    public MyBloomFilter(BloomFilter<String> b) {
        bloomFilter = b;
    }

    public BloomFilter<String> getBloomFilter() {
        return bloomFilter;
    }
}
public class BloomUdf implements UDF2<Object, String, Boolean> {
    // Maximum record count, as a safety guard
    private static int maxSize = 50000000;
    // Whether the Bloom filter is enabled: 1 = on, 0 = off
    private static int udfBloomFilterEnable;
    // Config key that toggles the Bloom filter; enabled by default
    private static String bloomFilterConfKey = "spark.myudf.bloom.enable";
    static {
        SparkConf sparkConf = new SparkConf();
        udfBloomFilterEnable = sparkConf.getInt(bloomFilterConfKey, 1);
        System.out.println("the spark.myudf.bloom.enable value " + udfBloomFilterEnable);
    }
    // Cache of Bloom filters keyed by table name; supports multiple filters per JVM
    private static ConcurrentHashMap<String, MyBloomFilter> bloomFilterMap = new ConcurrentHashMap<>();
    /**
     * Core Bloom-filter construction method.
     * Builds the filter by reading the table's files from HDFS;
     * it is loaded only once per JVM.
     * @param key cache key, in the form db_name.table_name
     * @param path HDFS path of the table's files
     * @throws IOException
     */
    private synchronized static void buildBloomFilter(String key, String path) throws IOException {
        if (!bloomFilterMap.containsKey(key)) {
            BloomFilter<String> bloomFilter;
            Configuration conf = new Configuration();
            FileSystem hdfs = FileSystem.get(conf);
            Path pathDf = new Path(path);
            FileStatus[] stats = hdfs.listStatus(pathDf);
            // Count the total number of records
            long sum = 0;
            for (int i = 0; i < stats.length; i++) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(hdfs.open(stats[i].getPath()), StandardCharsets.UTF_8))) {
                    sum = sum + reader.lines().count();
                }
            }
            if (sum > maxSize) {
                // If the record count exceeds the limit, cache a null filter
                // (i.e., the Bloom filter is effectively disabled for this table)
                System.out.println("the max number is " + maxSize + ", but target num is too big, the " + key + " bloom will be invalid");
                bloomFilter = null;
            } else {
                // At least 10,000,000, otherwise twice the sampled count.
                // Doubling the expected size lowers the false-positive rate; 2x is a reasonable trade-off.
                long exceptSize = sum * 2 > 10000000 ? sum * 2 : 10000000;
                bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), (int) exceptSize);
                for (int i = 0; i < stats.length; i++) {
                    // Print each file path, then read the file line by line
                    System.out.println(stats[i].getPath().toString());
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(hdfs.open(stats[i].getPath()), StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            bloomFilter.put(line);
                        }
                    }
                }
            }
            MyBloomFilter myBloomFilter = new MyBloomFilter(bloomFilter);
            bloomFilterMap.put(key, myBloomFilter);
            System.out.println("the bloom " + key + " size is "
                    + (bloomFilter == null ? "null" : RamUsageEstimator.humanSizeOf(bloomFilter)) + " num " + sum);
        }
    }
    /**
     * Core invocation method.
     * @param s the value being filtered
     * @param key names the Bloom filter to use, in the form db_name.table_name
     * @return whether s may be present in the filter's key set
     * @throws Exception
     */
    @Override
    public Boolean call(Object s, String key) throws Exception {
        // If spark.myudf.bloom.enable is set to 0, the Bloom filter is disabled; pass everything through
        if (udfBloomFilterEnable == 0) {
            return true;
        }
        if (!bloomFilterMap.containsKey(key)) {
            String[] tableArray = key.split("\\.");
            if (tableArray.length != 2) {
                String msg = "the key is invalid: " + key + ", must like db_name.table_name";
                System.out.println(msg);
                throw new IOException(msg);
            }
            String dbName = tableArray[0];
            String tableName = tableArray[1];
            String path = "/hive/" + dbName + ".db/" + tableName;
            System.out.println(path);
            // Build the Bloom filter
            buildBloomFilter(key, path);
        }
        if (!bloomFilterMap.containsKey(key)) {
            String msg = "not found bloom filter " + key;
            System.out.println(msg);
            throw new IOException(msg);
        }
        BloomFilter<String> bloomFilter = bloomFilterMap.get(key).getBloomFilter();
        if (bloomFilter == null) {
            // The table was too large to build a filter, so pass everything through
            return true;
        } else {
            return bloomFilter.mightContain(String.valueOf(s));
        }
    }
}

Usage Example
The SQL below first builds a temporary table of changed primary keys, then applies the bloom_filter UDF both in the incremental join and in the predicate that selects unchanged historical rows, unioning the two results. Note that a false positive from the filter merely routes an unchanged row into the incremental branch, where it is recomputed, so the final output is still correct.
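The UDF above sizes each filter at twice the observed key count, with a 10,000,000 floor. To size from a target false-positive rate instead, the standard formulas can be used; this sketch computes the bit-array size m and hash-function count k for n expected keys at rate p (the same math Guava applies when given an explicit fpp):

```java
// Standard Bloom-filter sizing:
//   m = -n * ln(p) / (ln 2)^2   bits in the filter
//   k = (m / n) * ln 2          number of hash functions
public class BloomSizing {
    // Optimal number of bits for n expected insertions at false-positive rate p.
    public static long optimalNumOfBits(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // Optimal number of hash functions given n insertions and m bits.
    public static int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }
}
```

For example, 10 million keys at a 3% false-positive rate need roughly 73 million bits (about 9 MB) and 5 hash functions.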
-- Build the Bloom filter's key table
DROP TABLE IF EXISTS tmp.tmp_primary_key;
CREATE TABLE tmp.tmp_primary_key STORED AS TEXTFILE AS
SELECT item_id
FROM (
SELECT item_id FROM default.A WHERE update_time >= '2023-04-22'
UNION ALL
SELECT item_id FROM default.B WHERE update_time >= '2023-04-22'
) t WHERE LENGTH(item_id) > 0
GROUP BY item_id;
-- Incremental computation
INSERT OVERWRITE TABLE default.ot PARTITION(pt='20230422')
SELECT B.item_id, B.sku_id, B.sku_price, A.item_price
FROM default.B
LEFT JOIN default.A ON (A.item_id = B.item_id AND bloom_filter(A.item_id, "tmp.tmp_primary_key"))
WHERE bloom_filter(B.item_id, "tmp.tmp_primary_key")
UNION ALL
-- Merge unchanged historical data
SELECT item_id, sku_id, sku_price, item_price
FROM default.ot
WHERE NOT bloom_filter(item_id, "tmp.tmp_primary_key")
AND pt='20230421';

Experimental Results
Tests on internal Spark‑2 jobs (e.g., dim.dim_itm_sku_info_detail_d and dim.dim_itm_info_detail_d) show that the Bloom‑filter solution significantly improves task performance and reduces cluster resource usage.
Conclusion
From both theoretical analysis and practical measurements, the Bloom‑filter‑based solution greatly enhances performance for large‑table join calculations and can be extended to other big‑data analytical scenarios by eliminating unnecessary data early in the processing pipeline.
About ZCY Technology
ZCY Technology Team (Zero), based in Hangzhou, is a growth-oriented team passionate about technology and craftsmanship. With around 500 members, we are building comprehensive engineering, project management, and talent development systems. We are committed to innovation and creating a cloud service ecosystem for government and enterprise procurement. We look forward to your joining us.