Implementation and Usage of Flink FileSystem, JDBC, and Kafka Connectors
The article provides a comprehensive technical guide on Flink's FileSystem, JDBC, and Kafka connectors, detailing their source and sink implementations, core code logic, checkpoint handling, partition commit strategies, and complete SQL usage examples for streaming applications.
This article explains the source and sink implementations of Flink's three major connectors—FileSystem, JDBC, and Kafka—offering detailed code analysis and practical usage examples for streaming data pipelines.
FileSystem Connector
Sink
The FileSystemTableSink is created by constructing a FileSystemTableSink object with configuration parameters, then converting a DataStream into a DataStreamSink via consumeDataStream. The method builds a RowDataPartitionComputer, an empty metastore, a UUID prefix, and a FileSystemFactory, then follows different branches for bounded and unbounded streams.
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
Configuration conf = new Configuration();
context.getTable().getOptions().forEach(conf::setString);
return new FileSystemTableSink(
context.getObjectIdentifier(),
context.isBounded(),
context.getTable().getSchema(),
getPath(conf),
context.getTable().getPartitionKeys(),
conf.get(PARTITION_DEFAULT_NAME),
context.getTable().getOptions());
}The consumeDataStream method creates the partition computer, metastore, output file config, and file system factory, then either builds a FileSystemOutputFormat for bounded streams or a StreamingFileSink (row or bulk format) for unbounded streams.
public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
RowDataPartitionComputer computer = new RowDataPartitionComputer(
defaultPartName,
schema.getFieldNames(),
schema.getFieldDataTypes(),
partitionKeys.toArray(new String[0]));
// ... omitted for brevity ...
if (isBounded) {
// build FileSystemOutputFormat and write
} else {
// build StreamingFileSink with bucket assigner, rolling policy, etc.
}
}For streaming jobs, the else‑branch is used: a writer is created based on the output format (e.g., Parquet), a TableBucketAssigner wraps the partition computer, and a TableRollingPolicy determines file rotation. The BucketsBuilder creates Buckets that are wrapped into a StreamingFileWriter.
createStreamingSink
This method wraps the BucketsBuilder into a StreamingFileWriter, adds a StreamingFileCommitter when partition‑commit policies are configured, and finally adds a DiscardingSink because the data has already been processed by the preceding operators.
public static DataStreamSink<RowData> createStreamingSink(
Configuration conf,
Path path,
List<String> partitionKeys,
ObjectIdentifier tableIdentifier,
boolean overwrite,
DataStream<RowData> inputStream,
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
TableMetaStoreFactory msFactory,
FileSystemFactory fsFactory,
long rollingCheckInterval) {
// ... omitted for brevity ...
StreamingFileWriter fileWriter = new StreamingFileWriter(rollingCheckInterval, bucketsBuilder);
DataStream<CommitMessage> writerStream = inputStream.transform(
StreamingFileWriter.class.getSimpleName(),
TypeExtractor.createTypeInfo(CommitMessage.class),
fileWriter).setParallelism(inputStream.getParallelism());
// optional commit operator
return returnStream.addSink(new DiscardingSink()).setParallelism(1);
}Checkpoint handling is performed in snapshotState of the bucket, which serializes active bucket states, and in notifyCheckpointComplete where committed files are renamed and made visible to downstream consumers.
Partition Commit
Two strategies are supported: process‑time and partition‑time. Process‑time commits trigger after a configurable delay based on the system clock, while partition‑time commits wait until the watermark exceeds the partition timestamp plus the delay.
@Override
public void addPartition(String partition) {
if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
}
}
@Override
public List<String> committablePartitions(long checkpointId) {
List<String> needCommit = new ArrayList<>();
long currentProcTime = procTimeService.getCurrentProcessingTime();
// iterate and decide based on delay
return needCommit;
}Source
The FileSystemTableSource creates a InputFormatSourceFunction that reads files using the configured input format and emits RowData downstream.
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
Configuration conf = new Configuration();
context.getTable().getOptions().forEach(conf::setString);
return new FileSystemTableSource(
context.getTable().getSchema(),
getPath(conf),
context.getTable().getPartitionKeys(),
conf.get(PARTITION_DEFAULT_NAME),
context.getTable().getProperties());
}Example SQL demonstrates reading from a Kafka table, writing to a partitioned filesystem table, and querying with partition pruning.
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file'
);
INSERT INTO fs_table SELECT user_id, order_amount,
DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH') FROM kafka_table;JDBC Connector
Source
The factory creates a JdbcDynamicTableSource. Two use‑cases exist: a batch scan source for full table reads and a lookup source for dimension table joins. The scan uses JdbcRowDataInputFormat, supporting column pruning, limit push‑down, and parallel reads via numeric partitioning.
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL())
.setUsername(options.getUsername().orElse(null))
.setPassword(options.getPassword().orElse(null))
.setAutoCommit(readOptions.getAutoCommit());
// build query, add partition predicates, limit, etc.
return InputFormatProvider.of(builder.build());
}The lookup path uses JdbcRowDataLookupFunction, which caches results with Guava and retries on failures.
public void eval(Object... keys) {
RowData keyRow = GenericRowData.of(keys);
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (RowData cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
// retry logic and query execution
}Sink
The sink supports append‑only and upsert modes, determined by the presence of a primary key in the DDL. It batches writes via JdbcBatchingOutputFormat and, for upserts, checks existence before deciding between INSERT and UPDATE.
@Override
public final synchronized void writeRecord(In record) throws IOException {
checkFlushException();
addToBatch(record, jdbcRecordExtractor.apply(record));
batchCount++;
if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) {
flush();
}
}Example DDL creates a MySQL table, inserts data, queries it, and performs a temporal join.
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
INSERT INTO MyUserTable SELECT id, name, age, status FROM T;
SELECT id, name, age, status FROM MyUserTable;
SELECT * FROM myTopic LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime ON myTopic.key = MyUserTable.id;Kafka Connector
Source
The KafkaDynamicTableFactory builds a KafkaDynamicSource. It extracts schema and format information, creates deserialization schemas for key/value, and constructs a FlinkKafkaConsumer wrapped in a SourceFunctionProvider.
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization = createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
final DeserializationSchema<RowData> valueDeserialization = createDeserialization(context, valueDecodingFormat, valueProjection, null);
final FlinkKafkaConsumer<RowData> kafkaConsumer = createKafkaConsumer(keyDeserialization, valueDeserialization, context.createTypeInformation(producedDataType));
return SourceFunctionProvider.of(kafkaConsumer, false);
}The consumer handles dynamic partition discovery, checkpointed offset state, and exactly‑once semantics via two‑phase commit.
Sink
The KafkaDynamicSink creates a FlinkKafkaProducer wrapped in a SinkFunctionProvider. The producer extends TwoPhaseCommitSinkFunction to use Kafka transactions, guaranteeing exactly‑once delivery.
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final SerializationSchema<RowData> keySerialization = createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
final SerializationSchema<RowData> valueSerialization = createSerialization(context, valueEncodingFormat, valueProjection, null);
final FlinkKafkaProducer<RowData> kafkaProducer = createKafkaProducer(keySerialization, valueSerialization);
return SinkFunctionProvider.of(kafkaProducer, parallelism);
}Key lifecycle methods include beginTransaction, preCommit, commit, snapshotState, and notifyCheckpointComplete, which together manage transaction boundaries across checkpoints.
Example DDL registers a Kafka table and shows how to read from it.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);Overall, the article provides a deep dive into the internal mechanics of Flink connectors, covering object construction, runtime behavior, checkpoint integration, and practical SQL usage.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
