Comprehensive Guide to FlinkCEP: API Overview, Pattern Definitions, Quantifiers, Conditions, and Usage Examples

This article provides a detailed introduction to FlinkCEP, covering how to add the library, define simple and composite patterns, use quantifiers and conditions, handle skip strategies, time constraints, and select results, with complete Java and Scala code examples for complex event processing.

Big Data Technology & Architecture

0. Overview

FlinkCEP is a complex event processing (CEP) library built on top of Flink that enables detection of event patterns in a stream, allowing you to capture important occurrences in your data.

This article describes the API calls available in FlinkCEP. It starts with the Pattern API for specifying patterns, then shows how to detect matching event sequences and act on them, and finally covers the assumptions the CEP library makes when dealing with late events in event time.

1. Getting Started

First, add the CEP library to your pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.5.0</version>
</dependency>

For the Scala API, depend on the flink-cep-scala_2.11 artifact instead of flink-cep_2.11.

The events you match against must implement equals() and hashCode() correctly, because FlinkCEP uses them to compare and match events.
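Here is a minimal sketch of what that can look like. It uses a hypothetical Event type with the getId(), getName(), and getPrice() getters the examples call, plus a hypothetical SubEvent subclass with getVolume(). The exact fields are assumptions. In a real job, each class would be public and live in its own file so that Flink can treat it as a POJO.

```java
import java.util.Objects;

// Hypothetical event type with the getters used throughout this article.
// Flink's POJO rules want a public no-arg constructor plus getters/setters.
class Event {
    private int id;
    private String name;
    private double price;

    public Event() {}

    public Event(int id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    // Value-based equality: same concrete class and same field values.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Event other = (Event) o;
        return id == other.id
                && Double.compare(price, other.price) == 0
                && Objects.equals(name, other.name);
    }

    // Must agree with equals(): equal events produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(id, name, price);
    }
}

// Hypothetical subtype, as used by subtype(SubEvent.class) in the examples.
class SubEvent extends Event {
    private double volume;

    public SubEvent() {}

    public SubEvent(int id, String name, double price, double volume) {
        super(id, name, price);
        this.volume = volume;
    }

    public double getVolume() { return volume; }
    public void setVolume(double volume) { this.volume = volume; }

    @Override
    public boolean equals(Object o) {
        // super.equals() has already checked that o has exactly this class.
        return super.equals(o) && Double.compare(volume, ((SubEvent) o).volume) == 0;
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + Double.hashCode(volume);
    }
}
```

Comparing getClass() rather than using instanceof keeps equals() symmetric between an Event and a SubEvent that happen to share field values.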

First demo (Java):

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        })
    .next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        })
    .followedBy("end").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
        });

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// Alert and createAlertFrom() are user-defined, not part of FlinkCEP
DataStream<Alert> result = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) throws Exception {
            return createAlertFrom(pattern);
        }
    });

Scala version:

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

2. Pattern API

The Pattern API lets you define complex pattern sequences to extract from an input stream. A complex sequence consists of multiple simple patterns that match individual events sharing certain attributes.

Each pattern must have a unique name, and pattern names cannot contain the character “:”. The following sections explain how to define single patterns and then combine them into complex sequences.

2.1 Single Pattern

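A pattern can be either a singleton pattern, which accepts a single event, or a looping pattern, which can accept several. Consider the sequence "a b+ c? d": an "a", followed by one or more "b"s, optionally followed by a "c", followed by a "d". Here a, c?, and d are singleton patterns, and b+ is a looping one. Patterns are singletons by default; quantifiers turn them into looping patterns.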

2.1.1 Quantifiers

FlinkCEP provides these methods to specify looping behavior:

oneOrMore() – expects one or more occurrences.
times(n) – expects exactly n occurrences.
times(min, max) – expects between min and max occurrences (inclusive).
timesOrMore(n) – expects at least n occurrences.
optional() – makes the pattern optional, so zero occurrences are also accepted.
greedy() – repeats as many times as possible.
consecutive() – enforces strict contiguity between the events matched by a looping pattern.
allowCombinations() – enforces nondeterministic relaxed contiguity between the events matched by a looping pattern.

Examples for a pattern named start:

// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 2, 3, or 4 occurrences
start.times(2, 4);
// expecting 2, 3, or 4 occurrences, repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3, or 4 occurrences
start.times(2, 4).optional();
// expecting 1 or more occurrences, repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences, repeating as many as possible
start.timesOrMore(2).greedy();
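To make the accepted counts concrete, the following plain-Java model captures how times(), timesOrMore(), oneOrMore(), and optional() bound the number of events a looping pattern may accept. It is not part of FlinkCEP, and the class name OccurrenceRange is hypothetical. The model deliberately ignores greedy(), consecutive(), and allowCombinations(). Those affect which events and matches are produced, not the allowed range of counts.

```java
// Hypothetical model of quantifier bounds; not FlinkCEP code.
// It answers one question: may a pattern match exactly `count` events?
final class OccurrenceRange {
    private final int min;
    private final int max;          // Integer.MAX_VALUE means unbounded
    private final boolean optional; // optional() additionally allows zero

    private OccurrenceRange(int min, int max, boolean optional) {
        this.min = min;
        this.max = max;
        this.optional = optional;
    }

    static OccurrenceRange times(int n) { return new OccurrenceRange(n, n, false); }
    static OccurrenceRange times(int min, int max) { return new OccurrenceRange(min, max, false); }
    static OccurrenceRange timesOrMore(int n) { return new OccurrenceRange(n, Integer.MAX_VALUE, false); }
    static OccurrenceRange oneOrMore() { return timesOrMore(1); }

    // Same bounds, but zero occurrences become acceptable as well.
    OccurrenceRange optional() { return new OccurrenceRange(min, max, true); }

    boolean accepts(int count) {
        if (count == 0 && optional) {
            return true;
        }
        return count >= min && count <= max;
    }
}
```

For example, times(2, 4).optional() accepts 0, 2, 3, or 4 events, but not 1.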

2.1.2 Conditions

A condition decides whether an incoming event is accepted into a pattern. Conditions are attached with where(), or(), and until(). They come in two kinds: iterative conditions and simple conditions.

Iterative Conditions

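Iterative conditions are the most general kind. Through the Context object they can inspect the events already accepted for a potential match, for example to compute a running sum. ctx.getEventsForPattern(name) returns those events for the given pattern name. Its cost can vary, so call it sparingly. The example below accepts an event into middle only if its name starts with "foo" and its price, plus the prices of the events already accepted into middle, stays below 5.0.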

middle.oneOrMore().subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
        if (!value.getName().startsWith("foo")) {
            return false;
        }
        double sum = value.getPrice();
        for (Event e : ctx.getEventsForPattern("middle")) {
            sum += e.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }
});

Scala equivalent:

middle.oneOrMore.subtype(classOf[SubEvent]).where((value, ctx) => {
  lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
  value.getName.startsWith("foo") && sum + value.getPrice < 5.0
})
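The Context lookup is the only thing that separates this from a simple condition. As a sketch, the same decision can be written as a plain-Java function that receives the prices of the events already accepted into middle as an explicit list. That list stands in for ctx.getEventsForPattern("middle"). The class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical stand-alone version of the iterative condition above.
// `acceptedPrices` plays the role of ctx.getEventsForPattern("middle").
final class CumulativePriceCondition {
    static boolean accepts(String name, double price, List<Double> acceptedPrices) {
        if (!name.startsWith("foo")) {
            return false;               // same early exit as the Java example
        }
        double sum = price;
        for (double p : acceptedPrices) {
            sum += p;                   // add prices of events already in "middle"
        }
        return Double.compare(sum, 5.0) < 0;
    }
}
```

Suppose prices 1.0 and 2.0 have already been accepted. An event named foo1 with price 1.5 is then accepted (sum 4.5), while one priced 2.0 is rejected because the sum reaches 5.0.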

Simple Conditions

Simple conditions only examine the current event:

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});

2.1.3 Subtype Constraint

You can restrict a pattern to a subclass of the base event type:

start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        // custom condition
        return true;
    }
});

2.1.4 API Overview

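Key Pattern methods:

where(condition) – adds a condition that must hold, combined with any existing condition by AND.
or(condition) – adds an alternative condition, combined with the existing one by OR.
until(condition) – defines a stop condition for a looping pattern: once an event satisfies it, no more events are accepted into the pattern (applicable only with oneOrMore()).
subtype(class) – restricts the pattern to events of the given subtype.
oneOrMore(), times(n), times(min, max), timesOrMore(n) – define repetition.
optional(), greedy(), consecutive(), allowCombinations() – modify looping behavior.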
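Chained where() and or() calls combine conditions from left to right. Each where() ANDs its condition with everything before it, and each or() ORs its condition with everything before it. The sketch below mirrors that with java.util.function.Predicate, whose and() and or() compose the same way. It is an analogy in plain Java, not FlinkCEP's condition classes, and the sample conditions are hypothetical.

```java
import java.util.function.Predicate;

// Plain-Java analogy for chaining conditions on a pattern:
//   start.where(startsWithFoo).or(isBar).where(longerThan3)
// folds left to right into ((startsWithFoo OR isBar) AND longerThan3).
final class ConditionChain {
    static final Predicate<String> STARTS_WITH_FOO = name -> name.startsWith("foo");
    static final Predicate<String> IS_BAR = name -> name.equals("bar");
    static final Predicate<String> LONGER_THAN_3 = name -> name.length() > 3;

    static Predicate<String> chained() {
        return STARTS_WITH_FOO.or(IS_BAR).and(LONGER_THAN_3);
    }
}
```

Here "bar" passes the or() branch but still fails, because the trailing where() applies to the combined condition.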

2.2 Composite Patterns

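After defining single patterns, you can combine them using these contiguity operators:

next() – strict contiguity: the matching event must directly follow the previous one.
followedBy() – relaxed contiguity: non-matching events in between are ignored.
followedByAny() – nondeterministic relaxed contiguity: in addition, alternative matches are produced that skip over matching events.
notNext() – strict negative pattern: the event directly after the previous one must not match.
notFollowedBy() – relaxed negative pattern: no matching event may occur between the two surrounding patterns; a sequence cannot end with notFollowedBy().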

Example of strict, relaxed, and nondeterministic adjacency:

// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// nondeterministic relaxed contiguity
Pattern<Event, ?> nonDet = start.followedByAny("middle").where(...);
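The three modes differ only in which later events may complete a partial match. The plain-Java model below enumerates the matches of a two-step pattern "a b" over a list of event names. It is not FlinkCEP code, and its names are hypothetical. Any name starting with "a" or "b" is treated as matching that step. On the input a, c, b1, b2, the model yields:

- no match for next(),
- {a b1} for followedBy(),
- {a b1} and {a b2} for followedByAny().

This mirrors the behavior described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of next(), followedBy(), and followedByAny() for the
// two-step pattern "a b". Each match is returned as "first second".
final class ContiguityModel {
    private static boolean isA(String e) { return e.startsWith("a"); }
    private static boolean isB(String e) { return e.startsWith("b"); }

    // next(): the event immediately after "a" must match "b".
    static List<String> strict(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (isA(events.get(i)) && isB(events.get(i + 1))) {
                matches.add(events.get(i) + " " + events.get(i + 1));
            }
        }
        return matches;
    }

    // followedBy(): skip non-matching events, take the first "b" after "a".
    static List<String> relaxed(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                    break;
                }
            }
        }
        return matches;
    }

    // followedByAny(): every later "b" completes its own match.
    static List<String> nondeterministic(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
            }
        }
        return matches;
    }
}
```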

A time constraint is added with within(Time.seconds(10)). A pattern sequence has at most one effective time constraint: if within() is called on several patterns in the sequence, the smallest window applies.
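As a sketch of this rule, the plain-Java helper below does two things. It picks the effective window when several within() values are declared, and it checks whether a match's first and last event timestamps fit in that window. The helper is hypothetical, not FlinkCEP code. It assumes a match fits only if the span from its first to its last event is strictly shorter than the window; treat that boundary as an assumption rather than documented behavior.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating pattern-level time constraints.
final class WithinModel {
    // Only one constraint applies per sequence: the smallest declared window.
    static long effectiveWindowMillis(List<Long> declaredWindows) {
        return Collections.min(declaredWindows);
    }

    // Assumed boundary: the span must be strictly shorter than the window.
    static boolean fits(long firstEventTs, long lastEventTs, long windowMillis) {
        return lastEventTs - firstEventTs < windowMillis;
    }
}
```

For instance, if one pattern declares within(Time.seconds(10)) and another within(Time.seconds(5)), the whole sequence must complete within 5 seconds.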

2.3 After‑Match Skip Strategies

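Four after-match skip strategies control which overlapping matches are emitted:

NO_SKIP (AfterMatchSkipStrategy.noSkip()) – every possible match is emitted.
SKIP_PAST_LAST_EVENT (skipPastLastEvent()) – discards every partial match that contains an event of the emitted match.
SKIP_TO_FIRST (skipToFirst(patternName)) – discards every partial match that contains an event of the emitted match preceding the first event accepted by patternName.
SKIP_TO_LAST (skipToLast(patternName)) – discards every partial match that contains an event of the emitted match preceding the last event accepted by patternName.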

Pass the strategy as the second argument of Pattern.begin():

Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent());
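Consider the pattern b+ c on the input b1 b2 b3 c. With NO_SKIP, the candidate matches are b1 b2 b3 c, b2 b3 c, and b3 c. The plain-Java model below emits those candidates in order and, after each emitted match, prunes the remaining candidates according to the strategy. It is hypothetical, not FlinkCEP's implementation. It assumes every event whose name starts with "b" belongs to the b pattern.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of after-match skip strategies for the pattern "b+ c".
// Events named "b..." belong to pattern b; strategy names mirror FlinkCEP's.
final class SkipStrategyModel {
    enum Strategy { NO_SKIP, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST_B, SKIP_TO_LAST_B }

    static List<List<String>> emit(List<List<String>> candidates, Strategy strategy) {
        List<List<String>> pending = new ArrayList<>(candidates);
        List<List<String>> emitted = new ArrayList<>();
        while (!pending.isEmpty()) {
            List<String> match = pending.remove(0);
            emitted.add(match);
            // Remaining candidates containing any of these events are discarded.
            List<String> forbidden = forbiddenEvents(match, strategy);
            pending.removeIf(other -> other.stream().anyMatch(forbidden::contains));
        }
        return emitted;
    }

    private static List<String> forbiddenEvents(List<String> match, Strategy strategy) {
        switch (strategy) {
            case SKIP_PAST_LAST_EVENT:
                return match;                              // every event of the match
            case SKIP_TO_FIRST_B:
                return match.subList(0, firstB(match));    // events before the first b
            case SKIP_TO_LAST_B:
                return match.subList(0, lastB(match));     // events before the last b
            default:
                return new ArrayList<>();                  // NO_SKIP prunes nothing
        }
    }

    private static int firstB(List<String> match) {
        for (int i = 0; i < match.size(); i++) {
            if (match.get(i).startsWith("b")) return i;
        }
        return 0;
    }

    private static int lastB(List<String> match) {
        for (int i = match.size() - 1; i >= 0; i--) {
            if (match.get(i).startsWith("b")) return i;
        }
        return 0;
    }
}
```

On those three candidates, the model returns:

- all three for NO_SKIP and SKIP_TO_FIRST[b],
- only b1 b2 b3 c for SKIP_PAST_LAST_EVENT,
- b1 b2 b3 c and b3 c for SKIP_TO_LAST[b].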

2.4 Detecting Patterns

Apply a pattern to a keyed or non‑keyed DataStream using CEP.pattern(input, pattern). For keyed streams, the pattern is evaluated per key; for non‑keyed streams the job runs with parallelism 1.
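Keying changes which events a pattern sees. The plain-Java model below runs the strict pattern "a next b" either over the whole interleaved stream or separately per key. It is hypothetical, not FlinkCEP code. Events are written as "key:type" strings. Take two keys interleaved as k1:a, k2:a, k1:b, k2:b:

- the non-keyed run pairs k2's a with k1's b;
- the keyed run finds one match per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: events are "key:type" strings, e.g. "k1:a".
// The pattern is "a" directly followed by "b" (strict contiguity).
final class KeyedMatchingModel {
    static List<String> strictAB(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (type(events.get(i)).equals("a") && type(events.get(i + 1)).equals("b")) {
                matches.add(events.get(i) + " " + events.get(i + 1));
            }
        }
        return matches;
    }

    // Keyed evaluation: split by key (keeping per-key order), match each part.
    static List<String> strictABPerKey(List<String> events) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (String e : events) {
            byKey.computeIfAbsent(key(e), k -> new ArrayList<>()).add(e);
        }
        List<String> matches = new ArrayList<>();
        for (List<String> partition : byKey.values()) {
            matches.addAll(strictAB(partition));
        }
        return matches;
    }

    private static String key(String e) { return e.substring(0, e.indexOf(':')); }
    private static String type(String e) { return e.substring(e.indexOf(':') + 1); }
}
```

In a real job, the per-key split corresponds to calling keyBy() before CEP.pattern(), which also allows the pattern to be evaluated in parallel across keys.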

2.5 Selecting from Patterns

After a PatternStream is created, use select() or flatSelect() to extract results. PatternSelectFunction returns a single result per match, while PatternFlatSelectFunction can emit multiple results via a Collector.
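Both functions receive the match as a Map from pattern name to the list of events accepted for that name. The values are lists because looping patterns can accept several events. The plain-Java sketch below mimics the two contracts with java.util.function types instead of Flink's interfaces. The alert strings and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins for the two selection contracts.
final class SelectModel {
    // Like PatternSelectFunction: exactly one result per match.
    static String select(Map<String, List<String>> match) {
        return "alert: " + match.get("start").get(0) + " -> " + match.get("end").get(0);
    }

    // Like PatternFlatSelectFunction: any number of results, pushed to a collector.
    static void flatSelect(Map<String, List<String>> match, Consumer<String> out) {
        for (String e : match.get("middle")) {
            out.accept("alert for " + e);   // one result per event in the looping step
        }
    }

    static List<String> collectFlat(Map<String, List<String>> match) {
        List<String> results = new ArrayList<>();
        flatSelect(match, results::add);
        return results;
    }
}
```

In Flink, the collector role is played by org.apache.flink.util.Collector, whose collect() method a flatSelect implementation calls once per result.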

2.6 Timeout Handling

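If a pattern has a within() time window, partial matches that exceed the window are considered timed out. To capture them, pass three things to select(): an OutputTag, a PatternTimeoutFunction, and the regular select function. For flatSelect(), pass a PatternFlatTimeoutFunction instead. The timed-out results are then available through getSideOutput(outputTag) on the returned stream.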
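As a sketch of the timeout side, the plain-Java model below keeps pending partial matches together with their start timestamps. When time advances, it moves every partial match whose window has elapsed to a separate timed-out list, mirroring the side output selected by the OutputTag. The model is hypothetical, not FlinkCEP code. It assumes a partial match expires once the current time reaches start + window, and reports that instant as the timeout timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of routing expired partial matches to a "side output".
final class TimeoutModel {
    static final class Partial {
        final String events;
        final long startTs;
        Partial(String events, long startTs) { this.events = events; this.startTs = startTs; }
    }

    private final long windowMillis;
    private final List<Partial> pending = new ArrayList<>();
    final List<String> timedOut = new ArrayList<>();   // plays the role of the side output

    TimeoutModel(long windowMillis) { this.windowMillis = windowMillis; }

    void addPartial(String events, long startTs) { pending.add(new Partial(events, startTs)); }

    int pendingCount() { return pending.size(); }

    // Assumed rule: a partial match expires once currentTs >= startTs + window.
    void advanceTo(long currentTs) {
        Iterator<Partial> it = pending.iterator();
        while (it.hasNext()) {
            Partial p = it.next();
            long timeoutTs = p.startTs + windowMillis;
            if (currentTs >= timeoutTs) {
                timedOut.add(p.events + " @" + timeoutTs);  // like timeout(match, timeoutTimestamp)
                it.remove();
            }
        }
    }
}
```

With a 10-second window, a partial match started at 0 ms times out (reported at 10000) when time reaches 12000 ms. One started at 5000 ms stays pending until 15000 ms.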

2.7 Event‑Time Processing

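Because the order in which events are processed matters, CEP in event time first buffers incoming events sorted by timestamp. It feeds them to the pattern only when a watermark passes their timestamps. Events with timestamps older than the last seen watermark are treated as late and discarded.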
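The buffering rule can be sketched in plain Java; the model below is hypothetical, not FlinkCEP's operator:

- events go into a priority queue ordered by timestamp;
- a watermark releases all buffered events up to its value, in timestamp order;
- events arriving at or behind the last watermark are dropped as late.

The exact boundary, that is, whether an event stamped exactly at the watermark counts as late, is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of CEP's event-time buffering. Events are bare timestamps.
final class EventTimeBuffer {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long lastWatermark = Long.MIN_VALUE;
    final List<Long> dropped = new ArrayList<>();

    void onEvent(long ts) {
        if (ts <= lastWatermark) {
            dropped.add(ts);          // late: events up to this time were already processed
        } else {
            buffer.add(ts);
        }
    }

    // Returns the events handed to pattern matching, in timestamp order.
    List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            released.add(buffer.poll());
        }
        lastWatermark = watermark;
        return released;
    }
}
```

Suppose events arrive out of order as 30, 10, 20. They are handed to matching as 10, 20 once the watermark reaches 25. A later event stamped 5 never reaches the pattern.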

2.8 Full Example

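The following example uses event time to detect, separately for each event ID, the sequence start → middle (name "error") → end (name "critical"). The start event can be any event. The "error" event must directly follow it, and a "critical" event must come later, all within 10 seconds: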

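import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// input must carry event-time timestamps and watermarks
DataStream<Event> input = ...;
KeyedStream<Event, Integer> keyed = input.keyBy(event -> event.getId());

// "start" has no condition, so any event can begin a match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) { return e.getName().equals("error"); }
    })
    .followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) { return e.getName().equals("critical"); }
    })
    .within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(keyed, pattern);

// Alert and createAlert() are user-defined, not part of FlinkCEP
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) throws Exception {
        return createAlert(pattern);
    }
});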
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
