Lightweight Dimension Table Join in Flink Using a Scheduled Cache
The article demonstrates how to enrich Flink streaming ETL jobs with slowly changing dimension data by periodically loading MySQL tables into an in‑memory cache and performing a simple map‑side join within a custom RichMapFunction implementation.
When writing Flink‑based ETL programs, it is common to enrich the incoming stream with dimension data such as product names or category names, which are stored in external, bounded tables. This creates the classic problem of joining an unbounded stream with a bounded (or semi‑static) dimension table.
The usual approach is to combine Flink's Async I/O API with a cache (e.g., Guava LoadingCache) to reduce the load on the external source; this article does not go into the details of that method.
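To make the "async lookup plus cache" idea a little more concrete, here is a stdlib-only sketch of the cache-aside lookup such a setup performs. It is not Flink's AsyncFunction API and not Guava; `AsyncCachedLookup` and the `blockingQuery` function (standing in for a JDBC query) are hypothetical names, and expiry and size limits, which a real LoadingCache would provide, are deliberately left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical cache-aside lookup: query the external source asynchronously, at most once per key. */
final class AsyncCachedLookup<K, V> implements AutoCloseable {
    // One future per key, so concurrent lookups of the same key share a single query
    private final Map<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    // Assumed stand-in for a blocking dimension-table query
    private final Function<K, V> blockingQuery;
    // Dedicated pool so blocking I/O never runs on the caller's thread
    private final ExecutorService ioPool;

    AsyncCachedLookup(Function<K, V> blockingQuery, int ioThreads) {
        this.blockingQuery = blockingQuery;
        this.ioPool = Executors.newFixedThreadPool(ioThreads);
    }

    CompletableFuture<V> lookup(K key) {
        CompletableFuture<V> future = cache.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> blockingQuery.apply(k), ioPool));
        // Do not keep failed lookups cached; the next call for this key retries the query
        future.whenComplete((value, error) -> {
            if (error != null) {
                cache.remove(key, future);
            }
        });
        return future;
    }

    @Override
    public void close() {
        ioPool.shutdown();
    }
}
```

Inside a Flink `AsyncFunction`, `asyncInvoke` would call `lookup` and complete its `ResultFuture` from the returned future, so the operator never blocks on MySQL while hot keys are served from memory.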
For small, slowly changing dimension data, a lighter-weight solution is enough. In the following example, each order log record carries a site_id; the function looks up the corresponding site name and city name in a MySQL dimension table and writes them back into the record.
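```java
// Imports used by the enclosing job class (fastjson, Commons DBUtils, Flink, SLF4J)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public static final class MapWithSiteInfoFunc extends RichMapFunction<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MapWithSiteInfoFunc.class);
    private static final long serialVersionUID = 1L;
    private static final String SITE_INFO_QUERY_SQL = "...";
    private static final long REFRESH_INTERVAL_SECONDS = 10 * 60;

    private transient ScheduledExecutorService dbScheduler;
    // Written by the refresh thread and read by map(): each refresh publishes
    // a complete new map through this volatile reference, so no locking is needed
    private transient volatile Map<Integer, SiteAndCityInfo> siteInfoCache;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        siteInfoCache = Collections.emptyMap();
        dbScheduler = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r, "site-info-update-thread");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((t, e) ->
                    LOGGER.error("Thread {} got uncaught exception", t, e));
            return thread;
        });
        // Load once before the first record arrives, then refresh every ten minutes
        refreshCache();
        dbScheduler.scheduleWithFixedDelay(this::refreshCache,
                REFRESH_INTERVAL_SECONDS, REFRESH_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    private void refreshCache() {
        try {
            QueryRunner queryRunner = new QueryRunner(JdbcUtil.getDataSource());
            List<Map<String, Object>> rows = queryRunner.query(SITE_INFO_QUERY_SQL, new MapListHandler());
            Map<Integer, SiteAndCityInfo> fresh = new HashMap<>(Math.max(1024, rows.size() * 2));
            for (Map<String, Object> row : rows) {
                // The JDBC driver may return Integer, Long or BigInteger depending on the column type
                int siteId = ((Number) row.get("site_id")).intValue();
                fresh.put(siteId, new SiteAndCityInfo(
                        siteId,
                        Objects.toString(row.get("site_name"), ""),   // NULL column -> ""
                        ((Number) row.get("city_id")).longValue(),
                        Objects.toString(row.get("city_name"), "")));
            }
            siteInfoCache = Collections.unmodifiableMap(fresh);
            LOGGER.info("Fetched {} site info records, {} records in cache", rows.size(), fresh.size());
        } catch (Exception e) {
            // Swallow so scheduleWithFixedDelay keeps running; the previous snapshot stays in use
            LOGGER.error("Exception occurred when querying site info", e);
        }
    }

    @Override
    public String map(String value) throws Exception {
        JSONObject json = JSON.parseObject(value);
        // getInteger returns null when the field is missing; avoid unboxing it into an NPE
        Integer siteId = json.getInteger("site_id");
        SiteAndCityInfo info = siteId == null ? null : siteInfoCache.get(siteId);
        json.put("site_name", info != null ? info.getSiteName() : "");
        json.put("city_name", info != null ? info.getCityName() : "");
        return json.toJSONString();
    }

    @Override
    public void close() throws Exception {
        if (dbScheduler != null) {
            ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, dbScheduler);
        }
        siteInfoCache = Collections.emptyMap();
        // If JdbcUtil holds a JVM-wide static pool, closing it here affects every subtask in the TaskManager
        JdbcUtil.close();
        super.close();
    }
}
```

The code wraps the entire join logic inside a RichMapFunction. open() loads the dimension data from MySQL once and then uses a single-threaded scheduled executor to reload it every ten minutes; each reload builds a new HashMap and publishes it through a volatile field, so map() always reads a complete snapshot without locking, and rows deleted from MySQL drop out of the cache. During mapping, the site and city names are looked up by site_id, falling back to empty strings when there is no match. The close() method shuts down the scheduler gracefully and releases the database connection pool.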
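The refresh-and-swap pattern can be isolated from Flink and MySQL and exercised on its own. The following is a minimal stdlib sketch under stated assumptions: `SnapshotDimensionCache` is a hypothetical name, and the `loader` supplier stands in for the dimension-table query. It loads synchronously once, reloads on a fixed delay, publishes each full snapshot through a volatile reference, and keeps serving the old snapshot when a reload fails.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical dimension cache that periodically replaces a full immutable snapshot. */
final class SnapshotDimensionCache<K, V> implements AutoCloseable {
    private final Supplier<Map<K, V>> loader;          // assumed stand-in for the MySQL query
    private final ScheduledExecutorService scheduler;
    // Readers always see one complete snapshot; the refresh thread only swaps the reference
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    SnapshotDimensionCache(Supplier<Map<K, V>> loader, long refreshSeconds) {
        this.loader = loader;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dim-refresh");
            t.setDaemon(true);
            return t;
        });
        refresh(); // first load is synchronous so early lookups are not all misses
        scheduler.scheduleWithFixedDelay(this::refresh, refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
    }

    /** Reloads the snapshot; on failure the previous snapshot stays in place. */
    void refresh() {
        try {
            snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
        } catch (RuntimeException e) {
            // An exception escaping the task would cancel all later scheduled runs
            System.err.println("Dimension refresh failed, keeping old snapshot: " + e);
        }
    }

    V getOrDefault(K key, V fallback) {
        return snapshot.getOrDefault(key, fallback);
    }

    int size() {
        return snapshot.size();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Swapping a whole map instead of calling put() on a shared HashMap avoids data races between the refresh thread and the processing thread, at the cost of briefly holding two copies of the table in memory during a refresh, which is acceptable for small dimension tables.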
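QueryRunner and MapListHandler come from Apache Commons DBUtils, and ExecutorUtils is Flink's org.apache.flink.util.ExecutorUtils. JdbcUtil (not shown) encapsulates the MySQL connection parameters and a DBCP2 BasicDataSource pool, and SiteAndCityInfo is a simple value class holding the four fields; developers can implement both themselves.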
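The listing relies on a SiteAndCityInfo class that the article does not show. One possible minimal shape is sketched below; the field types are inferred from the constructor call in MapWithSiteInfoFunc, and the choice to normalize null names to empty strings is an assumption made here, not something the source specifies.

```java
/** Hypothetical value class for one row of the site/city dimension table. */
final class SiteAndCityInfo {
    private final int siteId;
    private final String siteName;
    private final long cityId;
    private final String cityName;

    SiteAndCityInfo(int siteId, String siteName, long cityId, String cityName) {
        this.siteId = siteId;
        // Assumption: missing names are stored as "" so callers never see null
        this.siteName = siteName == null ? "" : siteName;
        this.cityId = cityId;
        this.cityName = cityName == null ? "" : cityName;
    }

    int getSiteId() { return siteId; }
    String getSiteName() { return siteName; }
    long getCityId() { return cityId; }
    String getCityName() { return cityName; }

    @Override
    public String toString() {
        return "SiteAndCityInfo{siteId=" + siteId + ", siteName='" + siteName
                + "', cityId=" + cityId + ", cityName='" + cityName + "'}";
    }
}
```

Keeping the class immutable fits the snapshot design: once a map has been published to map(), neither the map nor its values change.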
Source: https://www.jianshu.com/p/21f60a37b83a
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.