Comprehensive Guide to FlinkCEP: API Overview, Pattern Definitions, Quantifiers, Conditions, and Usage Examples
This article provides a detailed introduction to FlinkCEP, covering how to add the library, define simple and composite patterns, use quantifiers and conditions, handle skip strategies, time constraints, and select results, with complete Java and Scala code examples for complex event processing.
0. Overview
FlinkCEP is a complex event processing (CEP) library built on top of Flink that enables detection of event patterns in a stream, allowing you to capture important occurrences in your data.
The article describes the available API calls in FlinkCEP, starting with the Pattern API for specifying patterns, then how to detect matching event sequences and act on them, and finally the assumptions made by the CEP library when handling event-time delays.
1. Getting Started
First, add the CEP library to your pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.5.0</version>
</dependency>
Events used in pattern matching must correctly implement equals() and hashCode() because FlinkCEP relies on them for comparing and matching events.
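As a minimal sketch of what a suitable event class might look like, the following plain Java class implements equals() and hashCode() over all of its fields. The class shape and the fields id and name are assumptions made for illustration; the article does not define the Event type.

```java
import java.util.Objects;

// Hypothetical event type for illustration; the field names are assumptions.
class Event {
    private final int id;
    private final String name;

    Event(int id, String name) {
        this.id = id;
        this.name = name;
    }

    int getId() { return id; }

    String getName() { return name; }

    // Two events are equal when all of their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Event other = (Event) o;
        return id == other.id && Objects.equals(name, other.name);
    }

    // Consistent with equals(): equal events produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }
}
```

Deriving both methods from the same set of fields keeps them consistent, which is what any comparison based on equals() and hashCode() depends on.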
First demo (Java):
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
})
.next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
})
.followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
});
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlertFrom(pattern);
}
});
Scala version:
val input: DataStream[Event] = ...
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(input, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))
2. Pattern API
The Pattern API lets you define complex pattern sequences to extract from an input stream. A complex sequence consists of multiple simple patterns that match individual events sharing certain attributes.
Each pattern must have a unique name, and pattern names cannot contain the character “:”. The following sections explain how to define single patterns and then combine them into complex sequences.
2.1 Single Pattern
A pattern is either a singleton or a looping pattern. A singleton pattern accepts a single event, while a looping pattern can accept more than one. In the sequence a b+ c? d, for example, a, c?, and d are singleton patterns and b+ is a looping one. By default a pattern is a singleton; quantifiers turn it into a looping pattern.
2.1.1 Quantifiers
FlinkCEP provides methods to specify looping behavior:
oneOrMore() – expects one or more occurrences.
times(n) – expects exactly n occurrences.
times(min, max) – expects between min and max occurrences.
timesOrMore(n) – expects at least n occurrences.
optional() – makes the pattern optional.
greedy() – repeats as many times as possible.
consecutive() – enforces strict contiguity for looping patterns.
allowCombinations() – enables nondeterministic relaxed contiguity.
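The quantifiers above behave much like regular-expression quantifiers. The sketch below is an analogy only, not the FlinkCEP API: each character stands for one event type, and java.util.regex stands in for the matcher. The class name and the encoding of events as characters are assumptions. Note that a regex matches the whole string with strict adjacency, whereas CEP evaluates a stream and may emit several matches.

```java
import java.util.regex.Pattern;

// Analogy only: one character per event type, regex quantifiers in place of
// the Pattern API quantifiers. This does not call FlinkCEP.
class QuantifierAnalogy {
    // "a b+ c? d": one a, one or more b (oneOrMore), an optional c (optional), one d.
    static final Pattern LOOPING = Pattern.compile("ab+c?d");
    // times(2, 4) applied to b corresponds to b{2,4}.
    static final Pattern TWO_TO_FOUR = Pattern.compile("ab{2,4}d");
    // timesOrMore(2) applied to b corresponds to b{2,}.
    static final Pattern AT_LEAST_TWO = Pattern.compile("ab{2,}d");

    // True when the whole sequence of event types satisfies the pattern.
    static boolean matches(Pattern pattern, String eventTypes) {
        return pattern.matcher(eventTypes).matches();
    }

    public static void main(String[] args) {
        System.out.println("abbd  vs a b+ c? d : " + matches(LOOPING, "abbd"));
        System.out.println("ad    vs a b+ c? d : " + matches(LOOPING, "ad"));
        System.out.println("abd   vs a b{2,4} d: " + matches(TWO_TO_FOUR, "abd"));
        System.out.println("abbbd vs a b{2,} d : " + matches(AT_LEAST_TWO, "abbbd"));
    }
}
```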
Examples for a pattern named start:
// exactly 4 occurrences
start.times(4);
// 0 or 4 occurrences
start.times(4).optional();
// 2, 3, or 4 occurrences
start.times(2, 4);
// 2, 3, or 4 occurrences, repeating as many times as possible
start.times(2, 4).greedy();
// 0, 2, 3, or 4 occurrences
start.times(2, 4).optional();
// 1 or more occurrences, repeating as many times as possible
start.oneOrMore().greedy();
// 0 or more occurrences
start.oneOrMore().optional();
// 2 or more occurrences
start.timesOrMore(2);
// 2 or more occurrences, repeating as many times as possible
start.timesOrMore(2).greedy();
2.1.2 Conditions
Conditions restrict which events can satisfy a pattern. They can be based on event properties, on the contiguity of matching events, or on a combination of the two.
Iterative Conditions
Iterative conditions can access previously matched events via the Context object, allowing calculations such as cumulative sums.
middle.oneOrMore().subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
for (Event e : ctx.getEventsForPattern("middle")) {
sum += e.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
Scala equivalent:
middle.oneOrMore().subtype(classOf[SubEvent]).where((value, ctx) => {
lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
value.getName.startsWith("foo") && sum + value.getPrice < 5.0
})
Simple Conditions
Simple conditions only examine the current event:
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
2.1.3 Subtype Constraint
You can restrict a pattern to a subclass of the base event type:
start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
// custom condition
return true;
}
});
2.1.4 API Overview
Key Pattern methods:
where(condition) – adds a condition; multiple where() calls are combined with AND.
or(condition) – adds a condition that is combined with the existing one using OR.
until(condition) – defines a stop condition for looping patterns (only for oneOrMore()).
subtype(class) – restricts the event type.
oneOrMore(), times(n), times(min, max), timesOrMore(n) – define repetition.
optional(), greedy(), consecutive(), allowCombinations() – modify looping behavior.
2.2 Composite Patterns
After defining single patterns, you can combine them using adjacency operators:
next() – strict contiguity.
followedBy() – relaxed contiguity.
followedByAny() – nondeterministic relaxed contiguity.
notNext() – strict negative pattern.
notFollowedBy() – relaxed negative pattern (cannot end a sequence).
Example of strict, relaxed, and nondeterministic adjacency:
// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// nondeterministic relaxed contiguity
Pattern<Event, ?> nonDet = start.followedByAny("middle").where(...);
Time constraints can be added with within(Time.seconds(10)). Only one time constraint is allowed per pattern sequence; if multiple are defined, the smallest is applied.
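To make the three contiguity modes concrete, here is a toy model of the two-step pattern "a b" evaluated over a small list of events. It does not use FlinkCEP: the class name, the Mode enum, and the convention that an event's type is the first letter of its name are all assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the pattern "a b" under the three contiguity modes.
// It does not call FlinkCEP; event names and the prefix test are assumptions.
class ContiguitySketch {
    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    static List<String> match(List<String> events, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue; // only events of type "a" can start a match
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("b");
                if (isB) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
                if (mode == Mode.STRICT) {
                    break; // like next(): only the immediately following event counts
                }
                if (isB && mode == Mode.RELAXED) {
                    break; // like followedBy(): stop at the first matching "b"
                }
                // like followedByAny(): keep scanning, so later "b" events also match
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "c", "b1", "b2");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + match(stream, mode));
        }
    }
}
```

For the stream a, c, b1, b2 this model yields no match under strict contiguity, the single match "a b1" under relaxed contiguity, and the two matches "a b1" and "a b2" under nondeterministic relaxed contiguity.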
2.3 After‑Match Skip Strategies
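Four skip strategies control how many matches are emitted (NO_SKIP is the default):
NO_SKIP – emit every possible match.
SKIP_PAST_LAST_EVENT – discard every partial match that contains an event of the emitted match.
SKIP_TO_FIRST – discard every partial match that contains an event of the match preceding the first event mapped to the named pattern.
SKIP_TO_LAST – discard every partial match that contains an event of the match preceding the last event mapped to the named pattern.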
Specify a strategy when creating the pattern, for example:
Pattern.begin("name", AfterMatchSkipStrategy.skipPastLastEvent());
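The difference between NO_SKIP and SKIP_PAST_LAST_EVENT can be illustrated with a deliberately simplified model that does not use FlinkCEP. It assumes that every run of a fixed number of consecutive events counts as a match, which is a stand-in for a real pattern; the class name, the enum, and that assumption are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: any `length` consecutive events form a match. Only the choice of
// where the next match may start differs between the two strategies.
class SkipStrategySketch {
    enum Strategy { NO_SKIP, SKIP_PAST_LAST_EVENT }

    static List<List<String>> matches(List<String> events, int length, Strategy strategy) {
        List<List<String>> result = new ArrayList<>();
        int start = 0;
        while (start + length <= events.size()) {
            result.add(new ArrayList<>(events.subList(start, start + length)));
            // NO_SKIP: the next match may start at the very next event.
            // SKIP_PAST_LAST_EVENT: the next match starts after the last matched event.
            start += (strategy == Strategy.NO_SKIP) ? 1 : length;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("e1", "e2", "e3", "e4", "e5", "e6");
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " -> " + matches(stream, 3, strategy));
        }
    }
}
```

With six events and a match length of three, the model emits four overlapping matches under NO_SKIP and two disjoint matches under SKIP_PAST_LAST_EVENT.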
2.4 Detecting Patterns
Apply a pattern to a keyed or non‑keyed DataStream using CEP.pattern(input, pattern). On a keyed stream the pattern is evaluated independently for each key; on a non‑keyed stream the pattern matching runs with parallelism 1.
2.5 Selecting from Patterns
After a PatternStream is created, use select() or flatSelect() to extract results. PatternSelectFunction returns a single result per match, while PatternFlatSelectFunction can emit multiple results via a Collector.
2.6 Timeout Handling
If a pattern has a within() time window, partial matches that exceed the window are considered timed out. To capture them, pass an OutputTag and a PatternTimeoutFunction (or PatternFlatTimeoutFunction) to select() (or flatSelect()); the timed‑out partial matches are emitted to the side output identified by that tag.
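The idea of a partial match timing out can be sketched without Flink. The model below tracks a two-step pattern (a "start" event followed by an "end" event) with a time window. Everything in it is an assumption made for illustration: the class name, the event names, events arriving in timestamp order, time advancing only when an event arrives, and a window boundary that is exclusive.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of "start followed by end, within a window". Not the FlinkCEP API.
class WithinTimeoutSketch {
    final List<String> matches = new ArrayList<>();   // completed matches, "start->end"
    final List<Long> timedOut = new ArrayList<>();    // start timestamps that expired
    private final List<Long> pendingStarts = new ArrayList<>();
    private final long windowMillis;

    WithinTimeoutSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Events are assumed to arrive in ascending timestamp order.
    void onEvent(String name, long timestamp) {
        // Partial matches whose window has elapsed time out first.
        Iterator<Long> it = pendingStarts.iterator();
        while (it.hasNext()) {
            long start = it.next();
            if (timestamp - start >= windowMillis) {
                timedOut.add(start);
                it.remove();
            }
        }
        if (name.equals("start")) {
            pendingStarts.add(timestamp); // open a new partial match
        } else if (name.equals("end")) {
            for (long start : pendingStarts) {
                matches.add(start + "->" + timestamp); // complete each open partial match
            }
            pendingStarts.clear();
        }
    }

    public static void main(String[] args) {
        WithinTimeoutSketch sketch = new WithinTimeoutSketch(10_000L);
        sketch.onEvent("start", 0L);
        sketch.onEvent("end", 4_000L);
        sketch.onEvent("start", 20_000L);
        sketch.onEvent("end", 31_000L);
        System.out.println("matches:   " + sketch.matches);
        System.out.println("timed out: " + sketch.timedOut);
    }
}
```

In this run the first pair completes inside the 10-second window, while the second start expires before its end arrives and is reported as timed out instead of matched.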
2.7 Event‑Time Processing
CEP buffers incoming events sorted by timestamp and processes them when the watermark advances. Events with timestamps older than the last seen watermark are treated as late and are discarded.
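The buffering behavior described above can be modeled in plain Java with a priority queue. This is a sketch of the idea rather than Flink's operator: the class and method names are hypothetical, and treating an event whose timestamp equals the watermark as covered by that watermark is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of event-time buffering: events wait in timestamp order until a
// watermark passes them; events behind the last watermark are dropped as late.
class EventTimeBufferSketch {
    static final class Timestamped {
        final long timestamp;
        final String payload;

        Timestamped(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final PriorityQueue<Timestamped> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Timestamped t) -> t.timestamp));
    private long lastWatermark = Long.MIN_VALUE;

    // Returns false when the event is late and therefore dropped.
    boolean onEvent(long timestamp, String payload) {
        if (timestamp <= lastWatermark) {
            return false; // assumption: the boundary is inclusive
        }
        buffer.add(new Timestamped(timestamp, payload));
        return true;
    }

    // Releases all buffered events covered by the watermark, oldest first.
    List<String> onWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        List<String> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= lastWatermark) {
            ready.add(buffer.poll().payload);
        }
        return ready;
    }

    public static void main(String[] args) {
        EventTimeBufferSketch sketch = new EventTimeBufferSketch();
        sketch.onEvent(5L, "e5");
        sketch.onEvent(2L, "e2");
        System.out.println("released at watermark 6: " + sketch.onWatermark(6L));
        System.out.println("event at 3 accepted: " + sketch.onEvent(3L, "late"));
    }
}
```

Out-of-order events that arrive before the watermark are reordered by the buffer, while an event that arrives after the watermark has passed its timestamp is rejected.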
2.8 Full Example
The following example applies a pattern to a keyed stream and detects the sequence start → middle (name = "error") → end (name = "critical") within 10 seconds, using event time:
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...;
DataStream<Event> keyed = input.keyBy(event -> event.getId());
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event e) { return e.getName().equals("error"); }
})
.followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event e) { return e.getName().equals("critical"); }
})
.within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(keyed, pattern);
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlert(pattern);
}
});
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.