Practical Experience with State Management in Flink Real‑Time Stream Processing
This article shares practical experience with using state in Apache Flink for real‑time stream processing: managed versus raw state, code examples in Scala and Java, handling late data, dimension‑table joins, distinct semantics, and best‑practice recommendations. It assumes readers already have a basic understanding of Flink.
1. Types of State
1.1 State categories from a data perspective
KeyedState : In a keyed DataStream each key has its own state, meaning the state can access all data belonging to that key. Consequently, KeyedState can only be used on a KeyedStream.
OperatorState : OperatorState holds data received by a specific operator instance; there is one OperatorState per parallel operator instance.
1.2 State categories from Flink runtime perspective
The runtime supports two kinds of state handling:
Managed State : The Flink runtime knows the internal data structure, can optimize storage, checkpointing, parallelism changes and memory management. All DataStream functions (map, filter, apply, etc.) support managed state. It is the recommended approach.
Raw State : Users define the internal data structure, offering higher flexibility, but the runtime cannot optimize it because it does not know the structure.
The concrete state interfaces in each category:

|               | Managed State                                                                              | Raw State   |
|---------------|--------------------------------------------------------------------------------------------|-------------|
| KeyedState    | ValueState<T>, ListState<T>, MapState<UK, UV>, ReducingState<T>, AggregatingState<IN, OUT> | byte arrays |
| OperatorState | CheckpointedFunction, ListCheckpointed<T extends Serializable>                             | byte arrays |
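As a minimal illustration of managed keyed state (not from the original article; the class, state name, and field names here are assumptions), the following sketch keeps a running sales total per key in a ValueState declared inside a rich function:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Running total of sales per key, kept in managed keyed ValueState.
// Apply after keyBy(...); Flink scopes the state to the current key automatically.
public class RunningSaleTotal extends RichFlatMapFunction<Double, Double> {
    private transient ValueState<Double> totalState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // The descriptor names the state so Flink can checkpoint and restore it.
        totalState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sale-total", Double.class));
    }

    @Override
    public void flatMap(Double sale, Collector<Double> out) throws Exception {
        double total = accumulate(totalState.value(), sale);
        totalState.update(total);
        out.collect(total);
    }

    // Pure helper: treat absent state (null) as zero.
    static double accumulate(Double current, double sale) {
        return (current == null ? 0.0 : current) + sale;
    }
}
```

Because the state is managed, changing the backend or rescaling the job does not require touching this code.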
1.3 Example – Ranking cumulative sales per store
1.3.1 Scala version
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object StateExample {
case class Order(finishTime: Long, memberId: Long, productId: Long, sale: Double)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Choose a state backend (MemoryStateBackend, FsStateBackend, RocksDBStateBackend)
env.setStateBackend(new RocksDBStateBackend("oss://bigdata/xxx/order-state"))
val dataStream = env.fromCollection((1 to 25).map(i => Order(i, i % 7, i % 3, i + 0.1)))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.milliseconds(1)) {
override def extractTimestamp(element: Order): Long = element.finishTime
})
// Real‑time ranking per store
dataStream.keyBy("memberId")
.mapWithState[List[Order], List[Order]] {
case (order: Order, None) => (order +: Nil, Some(List(order)))
case (order: Order, Some(orders)) => {
val l = (orders :+ order)
.groupBy(_.productId)
.mapValues {
case List(o) => o
case lst: List[Order] => lst.reduce((a, b) => Order(
if (a.finishTime > b.finishTime) a.finishTime else b.finishTime,
a.memberId, a.productId, a.sale + b.sale))
}
.values
.toList
.sortWith(_.sale > _.sale)
(l, Some(l))
}
}
.print()
env.execute("example")
}
}

1.3.2 Java version
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class StateExampleJ {
static final SimpleDateFormat YYYY_MM_DD_HH = new SimpleDateFormat("yyyyMMdd HH");
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
List<Order> data = new LinkedList<>();
for (long i = 1; i <= 25; i++) {
data.add(new Order(i, i % 7, i % 3, i + 0.1));
}
DataStream<Order> dataStream = env.fromCollection(data).setParallelism(1)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.milliseconds(1)) {
@Override
public long extractTimestamp(Order element) { return element.finishTime; }
});
dataStream.keyBy(o -> o.memberId).map(new RichMapFunction<Order, List<Order>>() {
MapState<Long, Order> mapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor<Long, Order> productRank = new MapStateDescriptor<>("productRank", Long.class, Order.class);
mapState = getRuntimeContext().getMapState(productRank);
}
@Override
public List<Order> map(Order value) throws Exception {
if (mapState.contains(value.productId)) {
Order acc = mapState.get(value.productId);
value.sale += acc.sale;
}
mapState.put(value.productId, value);
return IteratorUtils.toList(mapState.values().iterator());
}
}).print();
env.execute("example");
}
public static class Order {
public long finishTime;
public long memberId;
public long productId;
public double sale;
public Order() {}
public Order(Long finishTime, Long memberId, Long productId, Double sale) {
this.finishTime = finishTime;
this.memberId = memberId;
this.productId = productId;
this.sale = sale;
}
}
}

2. Optimizing State for Late Data
Late events (out‑of‑order data) are a common challenge. Flink provides two mechanisms:
Watermarks with a bounded out‑of‑orderness extractor:

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.milliseconds(1)) { ... })

Window lateness plus a side output for events that arrive after even the extended deadline:

.timeWindow(Time.days(1)).allowedLateness(Time.seconds(1)).sideOutputLateData(outputTag)

Choosing a suitable lateness bound is a trade‑off between accuracy and performance. The article proposes a custom solution (code omitted) that dynamically adjusts lateness based on observed data.
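The two mechanisms can be combined in one pipeline. The following self‑contained sketch (class and field names are illustrative; this is not the article's dynamic‑lateness solution) aggregates sales in one‑second event‑time windows, tolerates one second of lateness, and routes anything later to a side output:

```java
import java.util.Arrays;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LatenessExample {
    public static class Sale {
        public long ts; public long key; public double amount;
        public Sale() {}
        public Sale(long ts, long key, double amount) { this.ts = ts; this.key = key; this.amount = amount; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Anonymous subclass so Flink can capture the element type of the side output.
        OutputTag<Sale> lateTag = new OutputTag<Sale>("late-sales") {};
        SingleOutputStreamOperator<Sale> windowed = env
            .fromCollection(Arrays.asList(new Sale(1, 1, 1.0), new Sale(5000, 1, 2.0), new Sale(2, 1, 3.0)))
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Sale>(Time.milliseconds(1)) {
                @Override public long extractTimestamp(Sale s) { return s.ts; }
            })
            .keyBy(s -> s.key)
            .window(TumblingEventTimeWindows.of(Time.seconds(1)))
            .allowedLateness(Time.seconds(1))   // keep window state alive 1 s past the watermark
            .sideOutputLateData(lateTag)        // anything later still lands in the side output
            .reduce((a, b) -> new Sale(Math.max(a.ts, b.ts), a.key, a.amount + b.amount));
        windowed.print();
        windowed.getSideOutput(lateTag).print("LATE");  // late events can be reconciled separately
        env.execute("lateness-example");
    }
}
```

A larger allowedLateness improves accuracy at the cost of holding window state longer; the side output lets truly late events be handled without blocking the main result.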
3. Dimension‑Table Joins Based on State
Flink offers several ways to join a dimension table:
AsyncDataStream.unorderedWait()
Join
BroadcastStream
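As an illustration of the first option, a hedged sketch of AsyncDataStream.unorderedWait with a hypothetical member lookup (the lookup function, names, and usage parameters are assumptions, not the author's code):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical async lookup that enriches a member id with its member name.
// lookupMember stands in for a real async client (HTTP, HBase, etc.).
public class AsyncMemberLookup extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String memberId, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> lookupMember(memberId))
            .thenAccept(name -> resultFuture.complete(Collections.singleton(memberId + ":" + name)));
    }

    static String lookupMember(String memberId) {
        return "member-" + memberId;  // placeholder for the external call
    }
}

// Usage (sketch): up to 100 in-flight lookups, 5 s timeout, results emitted as they finish.
// DataStream<String> enriched =
//     AsyncDataStream.unorderedWait(ids, new AsyncMemberLookup(), 5, TimeUnit.SECONDS, 100);
```

unorderedWait trades output order for throughput; orderedWait preserves the input order at the cost of some latency.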
The author demonstrates caching the dimension table in state with a TTL to keep it refreshed, using the following Java implementation:
import com.fulu.stream.source.http.SyncHttpClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.configuration.Configuration;
public class MainOrderHttpMap extends RichMapFunction<SimpleOrder, SimpleOrder> {
transient MapState<String, Member> member;
transient SyncHttpClient client;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// The original omits initializing the HTTP client; a no-arg constructor for the
// author's SyncHttpClient is assumed here.
client = new SyncHttpClient();
StateTtlConfig updateTtl = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.days(1))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
.build();
MapStateDescriptor<String, Member> memberDesc = new MapStateDescriptor<>("member-map", String.class, Member.class);
memberDesc.enableTimeToLive(updateTtl);
member = getRuntimeContext().getMapState(memberDesc);
}
@Override
public SimpleOrder map(SimpleOrder value) throws Exception {
value.profitCenterName = getProfitCenter(value.memberId);
return value;
}
private String getProfitCenter(String id) throws Exception {
String name = null;
int retry = 1;
while (name == null && retry <= 3) {
if (member.contains(id)) {
name = member.get(id).profitCenterName;
} else {
Member m = client.queryMember(id);
if (m != null) {
member.put(id, m);
name = m.profitCenterName;
}
}
retry++;
}
return name;
}
@Override
public void close() throws Exception {
super.close();
client.close();
}
}

This approach caches the dimension data in state and expires entries after a configurable period, ensuring periodic refreshes.
4. Distinct Semantics
DataStream does not provide a built‑in distinct operation. When the source may contain duplicate events, the author suggests using state to cache event IDs and a filter to drop duplicates. The implementation is omitted for brevity.
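A minimal sketch of that pattern (not the author's omitted code; class, state, and helper names are illustrative), assuming events are keyed by their id and using a TTL so the seen-id state does not grow without bound:

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

// Drops duplicate events by caching each event id in keyed state.
// Apply after keyBy(event id); the TTL bounds state growth for unbounded id spaces.
public class DistinctFilter extends RichFilterFunction<String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24)).build();
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Boolean.class);
        desc.enableTimeToLive(ttl);
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(String eventId) throws Exception {
        if (!isFirst(seen.value())) {
            return false;           // duplicate within the TTL window
        }
        seen.update(Boolean.TRUE);  // first occurrence passes through
        return true;
    }

    // Pure helper: an event is new if nothing is recorded in state yet.
    static boolean isFirst(Boolean seenValue) {
        return seenValue == null;
    }
}

// Usage (sketch): events.keyBy(e -> e).filter(new DistinctFilter())
```

Duplicates arriving after the TTL expires will pass through again, so the TTL should be chosen to cover the realistic duplication window of the source.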
5. Conclusion
The author, originally experienced with Spark Streaming, found Flink's state handling the steepest learning curve and shares the above practices to help others master stateful stream processing in Flink.
References:
Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html
https://www.jianshu.com/p/ac0fff780d40?from=singlemessage
https://zhuanlan.zhihu.com/p/136722111
Fulu Network R&D Team