Dynamic Broadcast State and Data Partitioning in an Apache Flink Fraud Detection Engine

This article demonstrates how to initialize, broadcast, and dynamically update rule sets in an Apache Flink fraud detection pipeline, using BroadcastProcessFunction and MapState to achieve runtime data partitioning without recompiling, and explains the underlying data exchange patterns such as forward, hash, rebalance, and broadcast.

Sohu Tech Products

In the previous post we described the goals of a fraud detection engine and its required features, and showed how a modifiable rule set, rather than a hard-coded KeysExtractor, can drive custom data partitioning in a Flink application.

This article details the missing pieces: how to initialize the rule set and update it while the job is running, allowing dynamic partitioning combined with dynamic configuration so that recompilation and redeployment of the Flink job are no longer necessary.

Broadcast Rules

First, let’s look at the predefined data‑processing code:

DataStream<Alert> alerts =
    transactions
        .process(new DynamicKeyFunction())
        .keyBy((keyed) -> keyed.getKey())
        .process(new DynamicAlertFunction());

The DynamicKeyFunction provides dynamic data partitioning, while DynamicAlertFunction contains the main processing logic and triggers alerts based on the defined rules.

In the earlier article we assumed that the rule set (List<Rule>) was already initialized and could be accessed inside DynamicKeyFunction:

public class DynamicKeyFunction
    extends ProcessFunction<Transaction, Keyed<Transaction, String, Integer>> {

    /* Simplified */
    List<Rule> rules = /* Rules that are initialized somehow. */;
    ...
}

Adding rules directly to a list in the job code works, but each rule change would require recompiling the job, which is unacceptable for a real‑world fraud detection system where rules change frequently.

Next, we review the example rule definition from the previous post (illustrated with an image in the original article).

The system’s final job graph consists of three main modules:

Transaction Source : a parallel source that consumes financial transaction streams from Kafka.

Dynamic Key Function : dynamically extracts the partition key; the subsequent keyBy hashes the key and partitions the data across downstream parallel instances.

Dynamic Alert Function : aggregates data in windows and generates alerts based on the rules.
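To make the Dynamic Key Function step concrete, here is a minimal sketch of how a rule's grouping field names could be turned into a partition key. It is not the article's KeysExtractor: the class name KeySketch, the map-based transaction, and the "{name=value;...}" key format are all assumptions chosen so the example runs without Flink.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for the article's KeysExtractor. A transaction is modeled
// as a plain field map so the sketch needs neither Flink nor reflection.
final class KeySketch {

    // Builds a composite key from the fields named by a rule, in the order given.
    // The "{name=value;name=value}" layout is an assumed format for illustration.
    static String getKey(List<String> groupingKeyNames, Map<String, Object> transaction) {
        StringJoiner joiner = new StringJoiner(";", "{", "}");
        for (String name : groupingKeyNames) {
            joiner.add(name + "=" + transaction.get(name));
        }
        return joiner.toString();
    }
}
```

Two transactions that share the same values for a rule's grouping fields produce the same key string, so the subsequent keyBy routes them to the same parallel instance.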

Apache Flink Internal Data Exchange

The job graph also shows various data‑exchange patterns between operators. To understand the broadcast pattern we first review Flink’s message propagation methods:

FORWARD : data from each parallel instance of the upstream operator is sent directly to exactly one downstream instance, the one with the same index. This requires both operators to have the same parallelism and avoids any network shuffle.

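HASH : data is hashed by key (via keyBy) and distributed across all parallel instances of the next operator, so that records with the same key always reach the same instance. The distribution is only as even as the keys themselves.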

REBALANCE : manually invoked rebalance() or a change in parallelism causes a round‑robin redistribution of data, useful for alleviating data skew.

BROADCAST : a special channel that sends each incoming message to all downstream parallel instances. In the fraud-detection job a Rules Source consumes rule updates from Kafka and broadcasts them so that every parallel instance of the processing operators receives the same rule data.
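The four patterns above differ only in how a record's target subtask is chosen. The following sketch models that choice in plain Java. The class and method names are illustrative and not Flink APIs, and the hash variant uses a simple modulo where Flink actually routes through key groups.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of channel selection between two operators.
// Each method returns the indices of the downstream subtasks that receive a record.
final class ExchangeSketch {

    // FORWARD: subtask i sends only to downstream subtask i (equal parallelism assumed).
    static List<Integer> forward(int upstreamIndex) {
        return List.of(upstreamIndex);
    }

    // HASH: the key decides the target, so equal keys always meet on one subtask.
    // A plain modulo stands in for Flink's key-group assignment here.
    static List<Integer> hash(Object key, int downstreamParallelism) {
        return List.of(Math.floorMod(key.hashCode(), downstreamParallelism));
    }

    // REBALANCE: round-robin over downstream subtasks, independent of record content.
    static List<Integer> rebalance(long recordCounter, int downstreamParallelism) {
        return List.of((int) (recordCounter % downstreamParallelism));
    }

    // BROADCAST: every downstream subtask receives its own copy of the record.
    static List<Integer> broadcast(int downstreamParallelism) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            all.add(i);
        }
        return all;
    }
}
```

Only the broadcast variant returns more than one target, which is what lets a single rule update reach every parallel instance.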

Broadcast State

To connect the rule stream to the main data stream we create a broadcast stream with a state descriptor:

// Streams setup
DataStream<Transaction> transactions = [...];
DataStream<Rule> rulesUpdateStream = [...];

BroadcastStream<Rule> rulesStream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);

// Processing pipeline setup
DataStream<Alert> alerts =
    transactions
        .connect(rulesStream)
        .process(new DynamicKeyFunction())
        .keyBy((keyed) -> keyed.getKey())
        .connect(rulesStream)
        .process(new DynamicAlertFunction());

The broadcast() call takes a MapStateDescriptor and returns a BroadcastStream. The descriptor defines the broadcast state that the connected functions can use to store rules; this state is exposed as a key-value MapState, here keyed by rule ID:

public static final MapStateDescriptor<Integer, Rule> RULES_STATE_DESCRIPTOR =
    new MapStateDescriptor<>("rules", Integer.class, Rule.class);

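After connecting rulesStream, the processing function has two inputs, so DynamicKeyFunction must extend BroadcastProcessFunction instead of ProcessFunction (DynamicAlertFunction, which runs on a keyed stream, uses the keyed variant, KeyedBroadcastProcessFunction). The new method processBroadcastElement handles incoming rule updates, while processElement processes the main transaction stream. A simplified view of the interface: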

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

The updated DynamicKeyFunction now looks like this:

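import java.util.Map;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicKeyFunction extends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {

    // Runs on every parallel instance for each broadcast rule: insert or overwrite by rule ID.
    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {
        BroadcastState<Integer, Rule> broadcastState = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);
        broadcastState.put(rule.getRuleId(), rule);
    }

    // Runs for each transaction: emit one keyed copy per rule currently held in broadcast state.
    @Override
    public void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {
        ReadOnlyBroadcastState<Integer, Rule> rulesState = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);
        for (Map.Entry<Integer, Rule> entry : rulesState.immutableEntries()) {
            Rule rule = entry.getValue();
            out.collect(new Keyed<>(event, KeysExtractor.getKey(rule.getGroupingKeyNames(), event), rule.getRuleId()));
        }
    }
}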

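Here, processElement receives financial transactions, while processBroadcastElement receives rule updates. Each parallel instance of the operator stores incoming rules in its own copy of the broadcast MapState; because every instance receives the same broadcast elements, all copies end up holding the same rules. Broadcast state is scoped to a single operator, which is why rulesStream is connected a second time for DynamicAlertFunction. The alert function follows the same pattern, retrieving the rule by its ID from the broadcast state and applying the rule-specific logic.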
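The behavior described above can be imitated without Flink. The sketch below models parallel instances that each keep their own rule map, a broadcast that delivers a rule to all of them, and the per-rule fan-out of a transaction. BroadcastSketch, RuleSketch, Instance, and the string-based transaction are hypothetical simplifications, not classes from the demo project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of broadcast state; all names here are illustrative assumptions.
final class BroadcastSketch {

    // Reduced rule: an ID plus a single grouping field.
    static final class RuleSketch {
        final int ruleId;
        final String groupingField;

        RuleSketch(int ruleId, String groupingField) {
            this.ruleId = ruleId;
            this.groupingField = groupingField;
        }
    }

    // One parallel instance; it owns a private copy of the rules.
    static final class Instance {
        private final Map<Integer, RuleSketch> rules = new TreeMap<>();

        // Counterpart of processBroadcastElement: insert or overwrite by rule ID.
        void onRule(RuleSketch rule) {
            rules.put(rule.ruleId, rule);
        }

        // Counterpart of processElement: one "ruleId:keyValue" output per stored rule.
        List<String> onTransaction(Map<String, String> transaction) {
            List<String> out = new ArrayList<>();
            for (RuleSketch rule : rules.values()) {
                out.add(rule.ruleId + ":" + transaction.get(rule.groupingField));
            }
            return out;
        }

        // Lookup by ID, as an alert function would do; null if the rule is not stored.
        RuleSketch findRule(int ruleId) {
            return rules.get(ruleId);
        }
    }

    // Broadcasting delivers the same rule to every instance.
    static void broadcast(List<Instance> instances, RuleSketch rule) {
        for (Instance instance : instances) {
            instance.onRule(rule);
        }
    }
}
```

Because a rule update and a transaction keyed by that rule travel on different streams, a lookup by ID may find no rule yet on a given instance, so it seems prudent for an alert function to handle a missing rule explicitly.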

Summary

This article continued the exploration of a fraud‑detection system built with Apache Flink, focusing on the different ways data can be exchanged between parallel operator instances, especially the broadcast state mechanism. By leveraging broadcast state we can dynamically modify partition keys and rule sets at runtime, a capability useful for many scenarios such as controlling state, running A/B experiments, or updating ML model coefficients.

This article is a translation; original author: zhisheng. Original Chinese URL: http://www.54tianzhisheng.cn/2021/01/23/Flink-Fraud-Detection-engine-2/ English author: alex_fedulov. English original URL: https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html