Understanding Flink StreamingFileSink: File States, Rolling Policies, and Example Code
This article explains Flink's StreamingFileSink as of version 1.10.0. It covers how part files move through the In-progress, Pending, and Finished states, how bucket assignment and rolling policies work, and walks through a complete Java example that writes string data to files.
Flink's streaming computation moves data from a Source through Operators to a Sink. This article focuses on StreamingFileSink, which in Flink 1.10.0 replaces the deprecated BucketingSink, and explains its file-state management, bucket assignment policies, and rolling strategies.
1. File States
When writing files, Flink organizes them into buckets, and each file can be in one of three states:
In‑progress : the file is currently being written.
Pending : an In‑progress file that has been closed moves to this state.
Finished : after a successful checkpoint, a Pending file becomes Finished.
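To make these transitions concrete, here is a toy model of a single part file's lifecycle. It is a hypothetical sketch, not Flink code: the class and method names (PartFileLifecycle, close, onCheckpointComplete) are illustrative, and the model ignores details such as which checkpoint a Pending file is tied to.

```java
/**
 * Hypothetical, simplified model of the part-file lifecycle (not Flink source).
 * It only mirrors the three states and the two events that move a file between them.
 */
public class PartFileLifecycle {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS; // a newly opened part file is being written

        PartFile(String name) { this.name = name; }

        // Closing (rolling) an In-progress file makes it Pending.
        void close() {
            if (state != State.IN_PROGRESS) {
                throw new IllegalStateException(name + " is already " + state);
            }
            state = State.PENDING;
        }

        // A successful checkpoint turns a Pending file into a Finished one;
        // it has no effect on a file that is still being written.
        void onCheckpointComplete() {
            if (state == State.PENDING) {
                state = State.FINISHED;
            }
        }
    }

    public static void main(String[] args) {
        PartFile file = new PartFile("part-0-0");
        System.out.println(file.name + ": " + file.state); // part-0-0: IN_PROGRESS
        file.onCheckpointComplete();
        System.out.println(file.name + ": " + file.state); // part-0-0: IN_PROGRESS
        file.close();
        System.out.println(file.name + ": " + file.state); // part-0-0: PENDING
        file.onCheckpointComplete();
        System.out.println(file.name + ": " + file.state); // part-0-0: FINISHED
    }
}
```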
2. Simple String Write Example
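```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class StreamingFileSinkExample {
    public static void main(String[] args) throws Exception {
        // Output directory: pass it as the first argument (the default is only a placeholder)
        String outputPath = args.length > 0 ? args[0] : "/tmp/flink-streaming-file-sink";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing: Pending files only become Finished after a checkpoint completes
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));

        // One line of text per record, read from a socket (for example one opened with `nc -lk 8888`)
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                /*
                 * Bucket assignment policy:
                 * DateTimeBucketAssigner: the default, time-based assigner; it creates one bucket
                 *   per hour, named with the format yyyy-MM-dd--HH
                 * BasePathBucketAssigner: stores all part files in a single global bucket under the base path
                 */
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                /*
                 * There are three rolling policies:
                 * CheckpointRollingPolicy, DefaultRollingPolicy, OnCheckpointRollingPolicy
                 *
                 * The rolling policy drives the state transitions of the output files:
                 * 1. In-progress: the file is currently being written
                 * 2. Pending: an In-progress file becomes Pending once it is closed
                 * 3. Finished: a Pending file becomes Finished after a successful checkpoint
                 *
                 * Observations:
                 * 1. Bucket directories are created based on the local time and time zone
                 * 2. Part files are named <prefix>-<subtaskIndex>-<partFileIndex><suffix>; the default
                 *    prefix is "part", so with the config above the names are prefix-0-0.txt, prefix-0-1.txt, ...
                 * 3. In-progress and Pending files are hidden (their names start with "."), and macOS
                 *    does not show hidden files by default, so enable that to see them
                 */
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))   // roll once a file has been open this long
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) // roll after this long without new data
                                .withMaxPartSize(1024 * 1024 * 1024)                   // roll once a file exceeds 1 GB
                                .build())
                .withOutputFileConfig(config)
                .build();

        lines.addSink(sink).setParallelism(1);
        env.execute("StreamingFileSink example");
    }
}
```

3. File Rolling Policy Details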
The rolling policy determines when the current part file is closed and a new one is started. When a record arrives, the bucket rolls if it has no In-progress part file yet, or if DefaultRollingPolicy.shouldRollOnEvent returns true, which happens when the current file size exceeds the configured maximum part size. Rolling closes the current part file (it becomes Pending), opens a new one, and increments the part counter.
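The per-record roll decision can be sketched as a single bucket that tracks its current part file. This is a hypothetical model, not Flink's Bucket class: the class RollOnEventSketch, its fields, and the tiny 10-byte limit are made up for illustration, and the file names follow the <prefix>-<subtaskIndex>-<partFileIndex> pattern noted in the example's comments, using its "prefix" and ".txt" settings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the per-record roll decision (not Flink source).
 * Rolls when there is no in-progress file or when its size exceeds maxPartSize.
 */
public class RollOnEventSketch {
    private final String prefix;
    private final String suffix;
    private final int subtaskIndex;
    private final long maxPartSize;

    private long partCounter = 0;
    private String inProgress = null; // null means the bucket has no in-progress part file
    private long inProgressSize = 0;
    final List<String> pending = new ArrayList<>();

    RollOnEventSketch(String prefix, String suffix, int subtaskIndex, long maxPartSize) {
        this.prefix = prefix;
        this.suffix = suffix;
        this.subtaskIndex = subtaskIndex;
        this.maxPartSize = maxPartSize;
    }

    void write(String record) {
        // The check runs before the record is written, so a file may overshoot the limit by one record.
        if (inProgress == null || inProgressSize > maxPartSize) {
            rollPartFile();
        }
        // Like SimpleStringEncoder: the record's bytes followed by a newline.
        inProgressSize += record.getBytes(StandardCharsets.UTF_8).length + 1;
    }

    private void rollPartFile() {
        if (inProgress != null) {
            pending.add(inProgress); // close: In-progress -> Pending
        }
        inProgress = prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
        partCounter++;
        inProgressSize = 0;
    }

    String currentFile() { return inProgress; }

    public static void main(String[] args) {
        RollOnEventSketch bucket = new RollOnEventSketch("prefix", ".txt", 0, 10);
        for (int i = 0; i < 4; i++) {
            bucket.write("hello"); // 6 bytes per record including the newline
        }
        System.out.println(bucket.pending);       // [prefix-0-0.txt]
        System.out.println(bucket.currentFile()); // prefix-0-1.txt
    }
}
```

In this model the first file reaches 12 bytes before the size check fires, which illustrates why a part file can end up slightly larger than maxPartSize.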
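StreamingFileSink extends RichSinkFunction and also implements ProcessingTimeCallback: it registers a processing-time timer that fires at currentProcessingTime + bucketCheckInterval and registers the next one each time it fires. The default bucketCheckInterval is 60,000 ms (one minute), so the onProcessingTime callback runs once a minute. It closes an In-progress part file when DefaultRollingPolicy.shouldRollOnProcessingTime returns true, that is, when the file has been open for at least the rollover interval or has received no data for at least the inactivity interval. The file is only closed, not rolled: a new part file is opened when the next record arrives.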
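The two time-based conditions, and the effect of evaluating them only when the timer fires, can be sketched as follows. The helper names (a static shouldRollOnProcessingTime, firstClosingCheck) are hypothetical; the sketch assumes the condition described above and that timers fire at exact multiples of the check interval.

```java
/** Hypothetical sketch of time-based closing (not Flink source). */
public class ProcessingTimeRollSketch {

    // Open for at least rolloverInterval, or idle for at least inactivityInterval.
    static boolean shouldRollOnProcessingTime(long creationTime, long lastUpdateTime, long now,
                                              long rolloverInterval, long inactivityInterval) {
        return now - creationTime >= rolloverInterval
                || now - lastUpdateTime >= inactivityInterval;
    }

    // First timer firing (timerStart + k * checkInterval, k >= 1) at which the file gets closed.
    static long firstClosingCheck(long timerStart, long checkInterval, long creationTime,
                                  long lastUpdateTime, long rolloverInterval, long inactivityInterval) {
        long t = timerStart + checkInterval;
        while (!shouldRollOnProcessingTime(creationTime, lastUpdateTime, t,
                rolloverInterval, inactivityInterval)) {
            t += checkInterval;
        }
        return t;
    }

    public static void main(String[] args) {
        long rollover = 2_000L;   // the example's rollover interval (ms)
        long inactivity = 1_000L; // the example's inactivity interval (ms)
        // File created and last written at t = 500 ms; the timer was first registered at t = 0.
        System.out.println(firstClosingCheck(0L, 60_000L, 500L, 500L, rollover, inactivity)); // 60000
        System.out.println(firstClosingCheck(0L, 1_000L, 500L, 500L, rollover, inactivity));  // 2000
    }
}
```

With the example's 2 s rollover and 1 s inactivity settings and the default 60 s check interval, a file is therefore closed up to a minute later than the configured intervals suggest. The row-format builder also has a withBucketCheckInterval(long) setter that can shorten the check interval when that matters.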
The methods from CheckpointedFunction, snapshotState and initializeState, handle checkpoint snapshots and recovery. During a snapshot, buckets.snapshotState() closes an In-progress part file if DefaultRollingPolicy.shouldRollOnCheckpoint returns true (again, when the file size exceeds the maximum part size). The closed file becomes Pending and is recorded against the current checkpoint.
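A minimal sketch of what happens to one bucket during a snapshot, assuming shouldRollOnCheckpoint is the size check described above. SnapshotSketch and its fields are hypothetical; in Flink the saved state holds recoverable writer handles rather than plain file names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch of one bucket's work during snapshotState (not Flink source). */
public class SnapshotSketch {
    final long maxPartSize;
    String inProgress;          // current in-progress part file, or null
    long inProgressSize;
    String persistedInProgress; // in-progress file whose write position would be saved
    final List<String> pendingForCurrentCheckpoint = new ArrayList<>();
    final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();

    SnapshotSketch(long maxPartSize) { this.maxPartSize = maxPartSize; }

    void snapshot(long checkpointId) {
        // Modeled shouldRollOnCheckpoint: size over the limit closes the file.
        if (inProgress != null && inProgressSize > maxPartSize) {
            pendingForCurrentCheckpoint.add(inProgress); // close: In-progress -> Pending
            inProgress = null;
            inProgressSize = 0;
        }
        if (!pendingForCurrentCheckpoint.isEmpty()) {
            // These files may be committed once this checkpoint completes.
            pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingForCurrentCheckpoint));
            pendingForCurrentCheckpoint.clear();
        }
        // A file that was not closed keeps being written after the checkpoint.
        persistedInProgress = inProgress;
    }

    public static void main(String[] args) {
        SnapshotSketch bucket = new SnapshotSketch(10);
        bucket.inProgress = "prefix-0-0.txt";
        bucket.inProgressSize = 12; // over the limit: closed on checkpoint 1
        bucket.snapshot(1);
        bucket.inProgress = "prefix-0-1.txt";
        bucket.inProgressSize = 3;  // under the limit: kept open on checkpoint 2
        bucket.snapshot(2);
        System.out.println(bucket.pendingPerCheckpoint); // {1=[prefix-0-0.txt]}
        System.out.println(bucket.persistedInProgress);  // prefix-0-1.txt
    }
}
```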
The notifyCheckpointComplete method, from CheckpointListener, commits the Pending files recorded for the completed checkpoint (and for any earlier ones) by renaming them to their final names, at which point they are Finished.
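The commit step can be modeled as a sorted map from checkpoint ID to the Pending files recorded for it: when checkpoint N completes, every entry with an ID up to and including N is committed and removed. CommitSketch is hypothetical, the rename is simulated by computing the final name, and the hidden-file naming (.<name>.inprogress.<id>) is an assumption based on the hidden-file observation in the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of the commit in notifyCheckpointComplete (not Flink source). */
public class CommitSketch {
    final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> finished = new ArrayList<>();

    void notifyCheckpointComplete(long checkpointId) {
        // All groups with ID <= checkpointId, in ascending order.
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (String pendingFile : it.next().getValue()) {
                finished.add(finalName(pendingFile)); // simulated rename: Pending -> Finished
            }
            it.remove(); // also removes the entry from the backing map
        }
    }

    // Assumed naming: ".prefix-0-0.txt.inprogress.abc" -> "prefix-0-0.txt"
    static String finalName(String pendingFile) {
        String name = pendingFile.startsWith(".") ? pendingFile.substring(1) : pendingFile;
        int idx = name.indexOf(".inprogress.");
        return idx >= 0 ? name.substring(0, idx) : name;
    }

    public static void main(String[] args) {
        CommitSketch sink = new CommitSketch();
        sink.pendingPerCheckpoint.put(1L, Collections.singletonList(".prefix-0-0.txt.inprogress.a1"));
        sink.pendingPerCheckpoint.put(2L, Collections.singletonList(".prefix-0-1.txt.inprogress.b2"));
        sink.pendingPerCheckpoint.put(3L, Collections.singletonList(".prefix-0-2.txt.inprogress.c3"));
        sink.notifyCheckpointComplete(2);
        System.out.println(sink.finished);                      // [prefix-0-0.txt, prefix-0-1.txt]
        System.out.println(sink.pendingPerCheckpoint.keySet()); // [3]
    }
}
```

Committing every ID up to N, rather than only N, covers the case where the notification for an earlier checkpoint never arrives because a later checkpoint completed first.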
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
