Dynamic Configuration Updates in Real-Time Streaming with Spark Broadcast Variables and Flink Broadcast State
This article explains how to dynamically update configuration data in real‑time Spark Streaming and Flink jobs using broadcast variables and broadcast state, providing Java code examples and discussing the limitations and practical considerations of each approach.
In real-time computation jobs, configuration often needs to change while the job is running: a log format changes, new domain words are added to an NLP dictionary, or risk-control rules are adjusted. Editing the code and restarting the job for every such change is undesirable. The broadcast mechanisms in Spark Streaming and Flink can push updates into a running job instead.
Spark Streaming scenario
Spark Core's broadcast mechanism provides read-only variables that are cached on each executor. A broadcast variable cannot be modified after it is created, so updating one requires a workaround: discard the old broadcast and create a new one on the driver.
Broadcast variables allow the programmer to keep a read‑only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
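The following Java class wraps a string broadcast variable in a singleton. Each call to updateAndGet on the driver checks whether more than 60 seconds have passed since the last refresh and, if so, fetches the value again and re-broadcasts it.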
```java
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastStringPeriodicUpdater {
    private static final int PERIOD = 60 * 1000;  // refresh interval in milliseconds
    private static volatile BroadcastStringPeriodicUpdater instance;

    private Broadcast<String> broadcast;
    private long lastUpdate = 0L;

    private BroadcastStringPeriodicUpdater() {}

    // Double-checked locking singleton; it lives on the driver
    public static BroadcastStringPeriodicUpdater getInstance() {
        if (instance == null) {
            synchronized (BroadcastStringPeriodicUpdater.class) {
                if (instance == null) {
                    instance = new BroadcastStringPeriodicUpdater();
                }
            }
        }
        return instance;
    }

    // Call on the driver: re-broadcasts the value once the period has elapsed
    public synchronized String updateAndGet(SparkContext sc) {
        long now = System.currentTimeMillis();
        long offset = now - lastUpdate;
        if (offset > PERIOD || broadcast == null) {
            if (broadcast != null) {
                broadcast.unpersist();  // drop the stale copies cached on the executors
            }
            lastUpdate = now;
            String value = fetchBroadcastValue();
            broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
        }
        return broadcast.value();
    }

    private String fetchBroadcastValue() {
        // implement fetching logic here, e.g. read the configuration from external storage
        return "";  // placeholder so the class compiles
    }
}
```

In the streaming job, call the updater inside transform to retrieve the latest value. The function passed to transform runs on the driver once per micro-batch, so the refresh check happens at every batch:
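```java
// Assumes dStream is a JavaDStream<String>
JavaDStream<String> result = dStream.transform(rdd -> {
    // Runs on the driver once per batch, so the refresh check happens every batch
    String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());
    return rdd.mapPartitions(records -> {
        List<String> out = new ArrayList<>();
        // processing logic using broadcastValue (records are passed through unchanged here)
        records.forEachRemaining(out::add);
        return out.iterator();
    });
});
```

Because the lambda captures the String itself, the value is serialized into every task closure rather than read from the executor-side broadcast cache; for large values, return the Broadcast<String> handle instead and call value() inside mapPartitions.

This approach works, but a change is picked up only at the first micro-batch after the refresh period has elapsed, so a new configuration can take up to one refresh period plus one batch interval to take effect.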
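The refresh decision in updateAndGet does not depend on Spark, so it can be isolated and tested on its own. The sketch below is a stdlib-only model rather than part of the original code: PeriodicRefresher and its injectable LongSupplier clock are hypothetical names, and the driver is assumed to call get() once at the start of every micro-batch, as happens when the updater is invoked inside transform. The helper firstBatchSeeingChange replays that loop to show how stale the configuration can get.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stdlib-only model of the refresh gate in BroadcastStringPeriodicUpdater:
// the value is fetched again only when more than periodMs has passed since the last fetch.
public class PeriodicRefresher {
    private final long periodMs;
    private final LongSupplier clock;        // injectable clock, so tests need no real waiting
    private final Supplier<String> fetcher;  // stands in for fetchBroadcastValue()
    private String value;                    // stands in for the current Broadcast<String>
    private long lastUpdate;
    private boolean initialized = false;

    public PeriodicRefresher(long periodMs, LongSupplier clock, Supplier<String> fetcher) {
        this.periodMs = periodMs;
        this.clock = clock;
        this.fetcher = fetcher;
    }

    // Same condition as the original: refresh on first use or once the period has passed
    public synchronized String get() {
        long now = clock.getAsLong();
        if (!initialized || now - lastUpdate > periodMs) {
            lastUpdate = now;
            value = fetcher.get();  // in Spark: unpersist the old broadcast, broadcast the new value
            initialized = true;
        }
        return value;
    }

    // Calls get() once per micro-batch (at 0, batchMs, 2 * batchMs, ...) and returns the
    // time of the first batch that observes a configuration change made at changeAtMs.
    public static long firstBatchSeeingChange(long batchMs, long periodMs, long changeAtMs) {
        long[] now = {0L};
        PeriodicRefresher refresher = new PeriodicRefresher(
                periodMs, () -> now[0], () -> now[0] >= changeAtMs ? "new" : "old");
        for (long t = 0; ; t += batchMs) {
            now[0] = t;
            if (refresher.get().equals("new")) {
                return t;
            }
        }
    }

    public static void main(String[] args) {
        // 10 s batches, 60 s refresh period, configuration changed 5 s after the job starts
        System.out.println(firstBatchSeeingChange(10_000, 60_000, 5_000)); // prints 70000
    }
}
```

In this run the change made at 5 s only becomes visible in the batch at 70 s: at 60 s exactly one period has elapsed, which does not satisfy the strict "greater than" check. Shortening PERIOD lowers this staleness at the cost of re-broadcasting more often.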
Flink scenario
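Flink's DataSet API also offers broadcast variables, but since version 1.5 the DataStream API provides broadcast state, which is more flexible and can be updated with low latency: every element of a control stream is delivered to all parallel instances of the downstream operator.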
The broadcast state is defined with a MapStateDescriptor:
```java
MapStateDescriptor<String, String> broadcastStateDesc = new MapStateDescriptor<>(
        "broadcast-state-desc", String.class, String.class);
```

A control stream is turned into a broadcast stream:
```java
// setParallelism(1) applies to the operator that produces controlStream (e.g. its source)
BroadcastStream<String> broadcastStream = controlStream
        .setParallelism(1)
        .broadcast(broadcastStateDesc);
```

The data stream is then connected with the broadcast stream:
```java
BroadcastConnectedStream<String, String> connectedStream = sourceStream.connect(broadcastStream);
```

Processing is performed with a BroadcastProcessFunction (or KeyedBroadcastProcessFunction for keyed streams). It has two methods: processElement handles records from the main stream and can only read the broadcast state, while processBroadcastElement handles records from the control stream and can update it.
```java
DataStream<String> result = connectedStream.process(new BroadcastProcessFunction<String, String, String>() {
    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Main-stream side: read-only access to the broadcast state
        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
        for (Map.Entry<String, String> entry : state.immutableEntries()) {
            // use broadcast data to process the main stream
        }
        out.collect(value);
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Broadcast side: runs in every parallel instance, each updating its own copy
        BroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
        state.put("some_key", value);
    }
});
```

Note that the context in processElement is read-only; only the broadcast side can modify the state. Because every parallel instance receives every broadcast element, the instances' copies stay identical as long as processBroadcastElement applies the same deterministic update in each of them.
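The consistency argument can be illustrated without a cluster. The stdlib-only model below is not Flink code: BroadcastStateModel, the round-robin routing of main-stream records, the "threshold" rule and the "key=value" / "-key" control-message format are all hypothetical choices made to echo the risk-control example from the introduction. Each parallel instance keeps its own map; a broadcast message is applied to every instance, as processBroadcastElement would be, while a main-stream record goes to a single instance and only reads its state, as in processElement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of broadcast state semantics (not the Flink API).
public class BroadcastStateModel {
    private final List<Map<String, String>> instanceStates = new ArrayList<>();
    private int nextInstance = 0;

    public BroadcastStateModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instanceStates.add(new HashMap<>());  // each parallel instance holds its own copy
        }
    }

    // Like processBroadcastElement: the message reaches every instance, and each applies
    // the same deterministic update. Hypothetical format: "key=value" puts, "-key" removes.
    public void broadcast(String controlMessage) {
        for (Map<String, String> state : instanceStates) {
            if (controlMessage.startsWith("-")) {
                state.remove(controlMessage.substring(1));
            } else {
                int eq = controlMessage.indexOf('=');
                if (eq > 0) {
                    state.put(controlMessage.substring(0, eq), controlMessage.substring(eq + 1));
                }
            }
        }
    }

    // Like processElement: a record goes to one instance (round-robin here) and only reads state.
    public String process(String record) {
        Map<String, String> readOnly =
                Collections.unmodifiableMap(instanceStates.get(nextInstance));
        nextInstance = (nextInstance + 1) % instanceStates.size();
        String threshold = readOnly.get("threshold");
        if (threshold == null) {
            return record + ":no-rule";
        }
        return Long.parseLong(record) > Long.parseLong(threshold) ? record + ":ALERT" : record + ":OK";
    }

    // True when every instance holds identical broadcast state.
    public boolean consistent() {
        for (Map<String, String> state : instanceStates) {
            if (!state.equals(instanceStates.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BroadcastStateModel model = new BroadcastStateModel(3);
        System.out.println(model.process("500"));   // prints 500:no-rule
        model.broadcast("threshold=1000");
        System.out.println(model.process("1500"));  // prints 1500:ALERT
        System.out.println(model.process("800"));   // prints 800:OK
        System.out.println(model.consistent());     // prints true
    }
}
```

The model delivers control messages to all instances in the same order. In a real job the Flink documentation cautions that broadcast elements may reach different tasks in different orders, so an update should not depend on the arrival order of control messages.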
Source: jianshu/p/97dae75c266c Author: LittleMagic
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.