Implementing User Purchase Behavior Tracking with Flink Broadcast State
This article explains how to use Flink's Broadcast State to track user purchase paths in real time, detailing the design, required Kafka streams, Java APIs, state management, dynamic configuration, code implementation, deployment steps, and example results for a big‑data streaming application.
Broadcast State is a type of Flink operator state in which records from one stream are broadcast to every parallel task of a downstream operator, enabling shared, dynamically updatable configuration without polling an external system.
The two main APIs are KeyedBroadcastProcessFunction and BroadcastProcessFunction; the former is used when the non-broadcast stream is keyed. Their abstract methods processElement and processBroadcastElement handle the non-broadcast and broadcast streams, respectively.
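As a minimal sketch of the API shape (the types, class name, and descriptor name here are illustrative assumptions, not the article's code), a BroadcastProcessFunction reads the broadcast state read-only in processElement and updates it in processBroadcastElement:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConfigAwareFunction extends BroadcastProcessFunction<String, String, String> {

    // Must be the same descriptor that was passed to .broadcast(...)
    private static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("configState",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processElement(String value, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        // Elements of the regular stream see a read-only view of the broadcast state.
        String threshold = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("maxPurchasePathLength");
        out.collect(value + " (threshold=" + threshold + ")");
    }

    @Override
    public void processBroadcastElement(String value, Context ctx,
                                        Collector<String> out) throws Exception {
        // Elements of the broadcast stream may update the state on every parallel task.
        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("maxPurchasePathLength", value);
    }
}

The read-only restriction on the non-broadcast side exists because Flink keeps broadcast state identical across all parallel tasks; only the broadcast side, which every task sees in the same order, is allowed to modify it.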
The article presents a practical scenario: tracking a user's purchase path length in a mobile app. User events (VIEW_PRODUCT, ADD_TO_CART, REMOVE_FROM_CART, PURCHASE) are ingested from a Kafka topic, while dynamic configuration (e.g., maxPurchasePathLength) is read from another Kafka topic and broadcast.
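For concreteness, the two Kafka payloads might deserialize into POJOs shaped roughly like the following (field names are illustrative assumptions, not the article's exact schema):

public class UserEvent {
    public String userId;
    public String channel;    // acquisition channel of the user
    public String eventType;  // VIEW_PRODUCT | ADD_TO_CART | REMOVE_FROM_CART | PURCHASE
    public long eventTime;    // epoch millis; drives the event-time semantics
}

public class Config {
    public String channel;             // channel this configuration applies to
    public int maxPurchasePathLength;  // threshold broadcast to all parallel tasks
}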
Key design steps include:
Creating a keyed user‑event stream from Kafka.
Creating a broadcast configuration stream.
Connecting the streams and processing them with a custom ConnectedBroadcastProcessFuntion that stores per‑user events in a MapState, updates the configuration in broadcast state, and computes the purchase path length when a PURCHASE event arrives (a wiring sketch follows this list).
Emitting the result (userId, channel, purchasePathLength, eventTypeCounts) to an output Kafka topic.
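A hedged wiring sketch of these steps (the descriptor name and generic types are assumptions, kept consistent with the snippets below):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;

// Broadcast state: one Config per channel, shared by all parallel tasks.
MapStateDescriptor<String, Config> configStateDescriptor =
        new MapStateDescriptor<>("configBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Config.class));

// Key user events by userId, connect with the broadcast config stream,
// and apply the article's custom function.
DataStream<EvaluatedResult> resultStream = userEventStream
        .keyBy(event -> event.userId)
        .connect(configBroadcastStream)
        .process(new ConnectedBroadcastProcessFuntion());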
Relevant code snippets:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction { ... }

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { ... }

// create user event stream
final FlinkKafkaConsumer010<String> kafkaUserEventSource = new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("input-event-topic"),
        new SimpleStringSchema(), parameterTool.getProperties());
// ... map, assign timestamps, keyBy ...

// create broadcast config stream
final FlinkKafkaConsumer010<String> kafkaConfigEventSource = new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("input-config-topic"),
        new SimpleStringSchema(), parameterTool.getProperties());
final BroadcastStream<Config> configBroadcastStream = env
        .addSource(kafkaConfigEventSource)
        .map(...)
        .broadcast(configStateDescriptor);

// process an element from the keyed user-event stream
public void processElement(UserEvent value, ReadOnlyContext ctx, Collector<EvaluatedResult> out) throws Exception { ... }
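The article elides the method bodies. A hedged sketch of what processElement might do inside ConnectedBroadcastProcessFuntion, under the design above (the state name, the eventId field, and the containerFrom helper are hypothetical):

private transient MapState<String, UserEvent> userEventState; // per-user events, scoped by keyBy(userId)

@Override
public void open(Configuration parameters) throws Exception {
    userEventState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("userEventState",
                    BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(UserEvent.class)));
}

@Override
public void processElement(UserEvent value, ReadOnlyContext ctx,
                           Collector<EvaluatedResult> out) throws Exception {
    // Buffer every event for the current user.
    userEventState.put(value.eventId, value);
    if ("PURCHASE".equals(value.eventType)) {
        // Read this channel's config from broadcast state and evaluate the path.
        Config config = ctx.getBroadcastState(configStateDescriptor).get(value.channel);
        compute(config, containerFrom(userEventState)).ifPresent(out::collect);
        userEventState.clear(); // start a fresh purchase path
    }
}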
// compute the evaluated result when a PURCHASE event arrives
private Optional<EvaluatedResult> compute(Config config, UserEventContainer container) { ... }

The job is configured with checkpointing to HDFS, uses event-time semantics, and can be submitted with:

bin/flink run -c org.shirdrn.flink.broadcaststate.UserPurchaseBehaviorTracker …

Changing the configuration (e.g., maxPurchasePathLength) at runtime immediately affects the computation, as demonstrated by example input events and resulting JSON output.
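The environment setup behind those two properties might look like this (the checkpoint interval and HDFS path are placeholder assumptions; the article does not give exact values):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // event-time semantics
env.enableCheckpointing(60_000L);                                     // assumed 60 s checkpoint interval
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // placeholder HDFS path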
Big Data Technology & Architecture
Wang Zhiwu, a big data expert dedicated to sharing big data technology.
