
Implementing User Purchase Behavior Tracking with Flink Broadcast State

This article explains how to use Flink's Broadcast State to track user purchase paths in real time, detailing the design, required Kafka streams, Java APIs, state management, dynamic configuration, code implementation, deployment steps, and example results for a big‑data streaming application.

Big Data Technology & Architecture

Broadcast State is a kind of operator state in Flink. Records from one stream are broadcast to every parallel task of a downstream operator and stored there as state, so all tasks share the same configuration and the job does not have to fetch it from an external system.

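The two main APIs are KeyedBroadcastProcessFunction, used when the non‑broadcast stream is keyed, and BroadcastProcessFunction, used when it is not. Both declare two abstract methods: processElement handles records from the regular stream, and processBroadcastElement handles records from the broadcast stream.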
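
The routing behind these two methods can be illustrated with a small plain‑Java model. This is only a sketch and does not use the Flink API: the class BroadcastRoutingSketch, its Task type, and the hash‑based routing are hypothetical simplifications (Flink assigns keys to tasks through key groups, not a plain modulo). It shows that a broadcast record reaches every task, while a keyed record reaches exactly one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of broadcast vs. keyed routing; hypothetical, not Flink API. */
final class BroadcastRoutingSketch {

    /** One parallel task: holds its own copy of the broadcast state. */
    static final class Task {
        final Map<String, Integer> broadcastState = new HashMap<>();
        final List<String> processed = new ArrayList<>();

        // Counterpart of processBroadcastElement: the only place that writes broadcast state.
        void processBroadcastElement(String key, int value) {
            broadcastState.put(key, value);
        }

        // Counterpart of processElement: only reads broadcast state.
        void processElement(String userId) {
            Integer limit = broadcastState.get("maxPurchasePathLength");
            processed.add(userId + ":" + limit);
        }
    }

    final List<Task> tasks = new ArrayList<>();

    BroadcastRoutingSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task());
        }
    }

    // Broadcast side: every task receives every record.
    void broadcast(String key, int value) {
        for (Task t : tasks) {
            t.processBroadcastElement(key, value);
        }
    }

    // Keyed side: a record goes to exactly one task, chosen by its key.
    // ASSUMPTION: simple hash modulo stands in for Flink's key-group assignment.
    Task send(String userId) {
        Task t = tasks.get(Math.floorMod(userId.hashCode(), tasks.size()));
        t.processElement(userId);
        return t;
    }

    public static void main(String[] args) {
        BroadcastRoutingSketch job = new BroadcastRoutingSketch(3);
        job.broadcast("maxPurchasePathLength", 3);
        Task t = job.send("user-1");
        System.out.println(t.processed);
    }
}
```

Whichever task ends up handling user-1, it sees the same limit, because each task stored its own copy when the broadcast record arrived.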

The article presents a practical scenario: tracking a user's purchase path length in a mobile app. User events (VIEW_PRODUCT, ADD_TO_CART, REMOVE_FROM_CART, PURCHASE) are ingested from a Kafka topic, while dynamic configuration (e.g., maxPurchasePathLength) is read from another Kafka topic and broadcast.

Key design steps include:

Creating a keyed user‑event stream from Kafka.

Creating a broadcast configuration stream.

Connecting the streams and processing them with a custom ConnectedBroadcastProcessFuntion that stores per‑user events in a MapState, updates configuration in broadcast state, and computes the purchase path length when a PURCHASE event arrives.

Emitting the result (userId, channel, purchasePathLength, eventTypeCounts) to an output Kafka topic.
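
The per‑user bookkeeping in these steps can be sketched in plain Java. This is a hedged model, not the article's implementation and not Flink code: PurchasePathSketch and its Result type are hypothetical, an in‑memory map stands in for the keyed MapState, the channel field is left out for brevity, and it assumes that the path length is the number of events a user produced up to and including the PURCHASE, after which the buffer is cleared.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of purchase path tracking; hypothetical, not Flink API. */
final class PurchasePathSketch {

    enum EventType { VIEW_PRODUCT, ADD_TO_CART, REMOVE_FROM_CART, PURCHASE }

    /** Simplified result: the article's output also carries a channel. */
    static final class Result {
        final String userId;
        final int purchasePathLength;
        final Map<EventType, Integer> eventTypeCounts;

        Result(String userId, int purchasePathLength, Map<EventType, Integer> eventTypeCounts) {
            this.userId = userId;
            this.purchasePathLength = purchasePathLength;
            this.eventTypeCounts = eventTypeCounts;
        }
    }

    // Stand-in for keyed MapState: events buffered per user.
    private final Map<String, List<EventType>> eventsByUser = new HashMap<>();

    /** Buffers the event; returns a result when a PURCHASE closes the path, otherwise null. */
    Result onEvent(String userId, EventType type) {
        List<EventType> events = eventsByUser.computeIfAbsent(userId, k -> new ArrayList<>());
        events.add(type);
        if (type != EventType.PURCHASE) {
            return null;
        }
        // Count how often each event type occurred on the way to the purchase.
        Map<EventType, Integer> counts = new EnumMap<>(EventType.class);
        for (EventType e : events) {
            counts.merge(e, 1, Integer::sum);
        }
        Result result = new Result(userId, events.size(), counts);
        // ASSUMPTION: the next event of this user starts a new path.
        eventsByUser.remove(userId);
        return result;
    }

    public static void main(String[] args) {
        PurchasePathSketch tracker = new PurchasePathSketch();
        tracker.onEvent("user-1", EventType.VIEW_PRODUCT);
        tracker.onEvent("user-1", EventType.ADD_TO_CART);
        Result r = tracker.onEvent("user-1", EventType.PURCHASE);
        System.out.println(r.userId + " " + r.purchasePathLength + " " + r.eventTypeCounts);
    }
}
```

In the real job the buffer lives in Flink keyed state, so it is scoped to the current key automatically and is included in checkpoints.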

Relevant code snippets:

// base classes provided by Flink (signatures only)
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction { ... }
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { ... }

// create user event stream
final FlinkKafkaConsumer010<String> kafkaUserEventSource = new FlinkKafkaConsumer010<>(
    parameterTool.getRequired("input-event-topic"), new SimpleStringSchema(), parameterTool.getProperties());
... // map, assign timestamps, keyBy ...

// create broadcast config stream
final FlinkKafkaConsumer010<String> kafkaConfigEventSource = new FlinkKafkaConsumer010<>(
    parameterTool.getRequired("input-config-topic"), new SimpleStringSchema(), parameterTool.getProperties());
final BroadcastStream<Config> configBroadcastStream = env.addSource(kafkaConfigEventSource).map(...).broadcast(configStateDescriptor);

// process element: called for each record of the keyed user-event stream; broadcast state is read-only here
public void processElement(UserEvent value, ReadOnlyContext ctx, Collector<EvaluatedResult> out) throws Exception { ... }

// compute result: evaluates one user's buffered events against the current config
private Optional<EvaluatedResult> compute(Config config, UserEventContainer container) { ... }

The job is configured with checkpointing to HDFS, uses event‑time semantics, and can be submitted with:

bin/flink run -c org.shirdrn.flink.broadcaststate.UserPurchaseBehaviorTracker …

Changing the configuration (e.g., maxPurchasePathLength) at runtime affects the computation immediately, without restarting the job, as demonstrated by the example input events and the resulting JSON output.
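
The effect of a runtime configuration change can be modeled in plain Java. This is a sketch under stated assumptions, not the article's code and not Flink API: DynamicConfigSketch is hypothetical, the channel name "APP" is made up for illustration, a map stands in for the broadcast state, and the rule that a result is emitted only when the path is longer than maxPurchasePathLength is an assumption about how the threshold is applied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a runtime config change; hypothetical, not Flink API. */
final class DynamicConfigSketch {

    // Stand-in for broadcast state: channel -> maxPurchasePathLength.
    private final Map<String, Integer> configByChannel = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    // Broadcast side: a new config record overwrites the previous one.
    void onConfig(String channel, int maxPurchasePathLength) {
        configByChannel.put(channel, maxPurchasePathLength);
    }

    // Event side: called when a PURCHASE closes a path of the given length.
    // ASSUMPTION: a result is emitted only when the path exceeds the configured limit.
    boolean onPurchase(String userId, String channel, int pathLength) {
        Integer max = configByChannel.get(channel);
        if (max == null || pathLength <= max) {
            return false; // no config received yet, or path within the limit
        }
        emitted.add(userId + " length=" + pathLength + " max=" + max);
        return true;
    }

    public static void main(String[] args) {
        DynamicConfigSketch job = new DynamicConfigSketch();
        job.onConfig("APP", 3);
        job.onPurchase("user-1", "APP", 4); // longer than the limit of 3: emitted
        job.onConfig("APP", 5);             // config changes while the job keeps running
        job.onPurchase("user-2", "APP", 4); // within the new limit of 5: not emitted
        System.out.println(job.emitted);
    }
}
```

The two purchases have the same path length but different outcomes, because each one is evaluated against whatever configuration was most recently broadcast.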


Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
