Big Data 15 min read

Using Flink Broadcast State for Dynamic Configuration Updates and Real‑Time Data Enrichment

This article explains how Flink's Broadcast State feature can be used to dynamically update processing rules and enrich streaming events with user information from MySQL, showing configuration, code examples, key considerations, and runtime results that demonstrate real‑time adaptability without restarting the job.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Using Flink Broadcast State for Dynamic Configuration Updates and Real‑Time Data Enrichment

Flink supports Broadcast State from version 1.5.0, which allows broadcasting dynamic rules or configuration to downstream tasks for real‑time processing.

Typical scenarios include dynamically updating calculation rules and adding extra fields such as user profile information.

Key points: Broadcast State is a Map‑type KV store; it can only be modified in the broadcast side (processBroadcastElement) and read in the non‑broadcast side (processElement). Element order may differ across tasks; it is checkpointed per task; currently it resides in memory and cannot be persisted in RocksDB.

Example use case: filter events based on a user list stored in MySQL and enrich events with user name and age using Broadcast State.

Data sources:

Event stream from Kafka (JSON):

# 某个用户在某个时刻浏览或点击了某个商品
{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
{"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}

Configuration stream from MySQL (user table):

+----------+-------------+------+-----+---------+-------+
| Field    | Type        | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| userID   | varchar(20) | NO   | PRI | NULL    |       |
| userName | varchar(10) | YES  |     | NULL    |       |
| userAge  | int(11)     | YES  |     | NULL    |       |
+----------+-------------+------+-----+---------+-------+

Dependencies:

<!--Kafka连接器-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

<!--Mysql JDBC Driver-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

Java implementation (simplified):

package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Summary:
 *   基于Broadcast State 动态更新配置以实现实时过滤数据并增加字段
 */
@Slf4j
public class TestBroadcastState {
    public static void main(String[] args) throws Exception {
        // 1. Parse arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));
        // checkpoint configuration
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");
        // Kafka configuration
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");
        // MySQL configuration
        String fromMysqlHost = parameterTool.getRequired("fromMysql.host");
        int fromMysqlPort = parameterTool.getInt("fromMysql.port");
        String fromMysqlDB = parameterTool.getRequired("fromMysql.db");
        String fromMysqlUser = parameterTool.getRequired("fromMysql.user");
        String fromMysqlPasswd = parameterTool.getRequired("fromMysql.passwd");
        int fromMysqlSecondInterval = parameterTool.getInt("fromMysql.secondInterval");
        // 2. Set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 3. Kafka source
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", fromKafkaBootstrapServers);
        kafkaProperties.put("group.id", fromKafkaGroupID);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setStartFromLatest();
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id-kafka-source");
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> eventStream = kafkaSource.process(new ProcessFunction<String, Tuple4<String, String, Integer, Integer>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple4<String, String, String, Integer>> out) {
                try {
                    JSONObject obj = JSON.parseObject(value);
                    String userID = obj.getString("userID");
                    String eventTime = obj.getString("eventTime");
                    String eventType = obj.getString("eventType");
                    int productID = obj.getIntValue("productID");
                    out.collect(new Tuple4<>(userID, eventTime, eventType, productID));
                } catch (Exception ex) {
                    log.warn("异常数据:{}", value, ex);
                }
            }
        });
        // 4. MySQL config source (custom source)
        DataStreamSource<HashMap<String, Tuple2<String, Integer>>> configStream = env.addSource(new MysqlSource(fromMysqlHost, fromMysqlPort, fromMysqlDB, fromMysqlUser, fromMysqlPasswd, fromMysqlSecondInterval));
        // 5. Broadcast config stream
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        BroadcastStream<HashMap<String, Tuple2<String, Integer>>> broadcastConfigStream = configStream.broadcast(configDescriptor);
        // 6. Connect streams and process
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> connectedStream = eventStream.connect(broadcastConfigStream);
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> resultStream = connectedStream.process(new CustomBroadcastProcessFunction());
        // 7. Output
        resultStream.print();
        // 8. Execute job
        env.execute();
    }

    // Custom BroadcastProcessFunction implementation omitted for brevity
    // ... (includes processElement and processBroadcastElement that join event with broadcasted user info)
}

Running results show that before configuration changes the output contains enriched events for users present in the MySQL table; after updating ages in MySQL the new ages appear instantly; after deleting users from MySQL only remaining users are processed.

Thus, Broadcast State enables real‑time rule updates and data enrichment without stopping the Flink job.

Javareal-time processingFlinkDynamic ConfigurationKafkaMySQLBroadcast State
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.