Real-Time Data Warehouse Development with Flink: Architecture, Implementation, and Lessons Learned
This article describes the motivation, technology selection, implementation details, and encountered challenges of building a real‑time data warehouse using Flink, covering streaming computation, code examples, dimension‑table caching, state backend choices, and best practices for production deployment.
Background: Offline data warehouse models are carefully designed, but they are built for batch processing and do not carry over directly to streaming. As business demand for low‑latency data grew, a real‑time warehouse was introduced. It consists of five layers (real‑time ingestion, computation, storage, service, and application); this article focuses on the computation layer.
Technology selection: Among Storm, Spark Streaming, and Flink, Flink was chosen for its high throughput, millisecond‑level latency, exactly‑once semantics, and flexible windowing.
Architecture: The overall architecture integrates multiple data sources (e.g., pcm/app topics) into a unified format, following the flow source → map → flatMap → sink. Images illustrating the architecture are omitted for brevity.
Development practice: The real‑time model mirrors the offline model, starting from the gdm layer to combine various sources, clean and enrich data, and output a unified stream. The core Flink job is shown below.
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.util.Collector;

    private static void proc(String[] args) throws Exception {
        // Obtain the runtime environment
        final StreamExecutionEnvironment env = CommonUtil.getOnlineStreamEnv(TimeCharacteristic.ProcessingTime);
        // Create the Kafka consumer
        FlinkKafkaConsumer010 consumer = CommonUtil.genKafkaConsumer(Constants.TOPIC_WEB_APPLOG, args);
        // Processing flow:
        // source -> map (filter and clean) -> flatMap (join dimension tables) -> sink
        DataStream<String> stream = env.addSource(consumer)
                .map(new AppLogEntity.Mapper())
                .flatMap(new AppLogEntity.Flater())
                .returns(String.class);
        stream.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // Pass-through step: records are forwarded unchanged to the sink
                out.collect(value);
            }
        }).addSink(CommonUtil.genUasKafkaProducer());
        env.execute(JOB_NAME);
    }

Map function: filters special characters, splits the line by tabs, discards records with fewer than 40 fields, and parses each log according to its prefix (app usage, event, or client log).
    import java.util.List;
    import org.apache.flink.api.common.functions.MapFunction;

    public static class Mapper implements MapFunction<String, AppLog> {
        @Override
        public AppLog map(String line) {
            // Strip special characters (BEL, tab, newline).
            // Note: as published, this removes tab characters before the split below;
            // if the separator constant is a tab, the source code likely differed here.
            line = line.trim().replace("\007", "").replace("\t", "").replaceAll("\n", "");
            // Split on the field separator (the original comment says "\t")
            List<String> tokens = StringUtil.fastSplit(line, Constants.APP_LOG_ORIGINAL_SPERATOR, 40);
            // Fewer than 40 fields means the record is invalid
            if (tokens.size() < 40) {
                return null;
            }
            // Choose the parser based on the log prefix
            switch (tokens.get(0)) {
                case Constants.APP_ACT_LOG_PREFIX:
                    return new AppUsingLogEntity().parse(tokens);
                case Constants.APP_EVENT_LOG_PREFIX:
                    return new AppEventLogEntity().parse(tokens);
                case Constants.APP_CLT_LOG_PREFIX:
                    return new AppCltEntity().parse(tokens);
                default:
                    // Unknown prefix: dropped later by the null check in Flater
                    return null;
            }
        }
    }

FlatMap function: loads the dimension caches, joins event logs with the dimension tables, filters out blank results, and emits the cleaned strings.
    import java.util.List;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public static class Flater extends RichFlatMapFunction<AppLog, String> {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Warm up the dimension caches once per task before any record arrives
            DcDimAppEventParameterCache.getInstance();
            DcDimAppEventSiteMappingCache.getInstance();
            DcDimEventActionTypeCache.getInstance();
            DcDimEventOperTypeRowCache.getInstance();
            DcDimEventBizTypeCache.getInstance();
        }

        @Override
        public void flatMap(AppLog model, Collector<String> collector) {
            // Records rejected by the Mapper arrive as null and are skipped
            if (null == model) {
                return;
            }
            switch (model.getLogType()) {
                case Constants.APP_EVENT_LOG_PREFIX:
                    // Event logs are joined with the dimension tables and may fan out
                    AppEventLogEntity entity = (AppEventLogEntity) model;
                    AppEventLogEntity.joinAndFix(entity);
                    List<String> list = entity.getResult(entity);
                    for (String outLine : list) {
                        if (StringUtils.isBlank(outLine)) {
                            continue;
                        }
                        collector.collect(outLine);
                    }
                    return;
                case Constants.APP_ACT_LOG_PREFIX:
                    collector.collect(model.getResult());
                    return;
                case Constants.APP_CLT_LOG_PREFIX:
                    collector.collect(model.getResult());
                    return;
            }
        }
    }

Pitfalls encountered:
(1) Dimension‑table joins produced duplicate data. The dimension state never expired, so the joins degenerated into Cartesian products. The fix was to cache small dimensions locally and to keep larger ones in Redis with an LRU eviction policy.
(2) Large state caused out‑of‑memory crashes. After evaluating MemoryStateBackend, FsStateBackend, and RocksDBStateBackend, the job adopted RocksDBStateBackend, which keeps working state on local disk rather than on the JVM heap, with incremental checkpoints enabled.
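The caching approach in pitfall (1) can be illustrated with a small bounded cache. The article applies the LRU policy on the Redis side and does not show its cache classes, so the following is only a sketch of the same eviction idea done in‑process. The class name `DimLruCache` and the `loader` callback (standing in for a Redis or database lookup) are hypothetical and not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded dimension cache: keeps at most `capacity` entries and
// evicts the least recently used one when a new key is loaded.
final class DimLruCache<K, V> {
    private final int capacity;
    private final Function<K, V> loader; // assumed stand-in for a remote lookup
    private final LinkedHashMap<K, V> entries;

    DimLruCache(int capacity, Function<K, V> loader) {
        this.capacity = capacity;
        this.loader = loader;
        // accessOrder = true makes iteration order follow recency of access,
        // so the eldest entry is always the least recently used one.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > DimLruCache.this.capacity;
            }
        };
    }

    // Returns the cached value, loading it on a miss. Null results are not cached.
    synchronized V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    // containsKey does not count as an access, so it leaves the LRU order untouched.
    synchronized boolean isCached(K key) {
        return entries.containsKey(key);
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Because the cache is bounded and each key maps to a single value, a lookup returns at most one dimension row per event, which avoids the fan‑out seen with unexpiring join state. In a Flink job, an instance like this would typically be created in `open()` of the rich function, one per parallel task.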
Conclusion: A real‑time warehouse complements the offline warehouse by addressing timeliness; it does not diminish the value of the overall data warehouse. The solution presented here shows a practical way to bring streaming logs into user‑behavior analytics, and it will continue to be improved by tracking industry‑leading approaches.
