Implementing Custom Flink Sources and Sinks for RocketMQ and HBase Streaming
This article explains how to create custom Flink SourceFunction and SinkFunction implementations, demonstrates a RocketMQ source and an HBase sink with full code examples, and discusses checkpointing, event‑time handling, and deployment of the streaming job on a Flink‑on‑YARN cluster.
Introduction : Recently we attempted to convert existing Spark Streaming jobs into Flink Streaming jobs, and the first major challenge was implementing custom Source and Sink components.
SourceFunction : The root interface for defining a Flink source. It declares void run(SourceContext<T> ctx) throws Exception for continuously emitting data and void cancel() to stop the emission. The nested SourceContext provides collect(), collectWithTimestamp(), and emitWatermark() methods for data emission and watermark handling.
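The run()/cancel() contract is commonly implemented with a volatile flag that run() polls and cancel() clears. The following is a minimal sketch of that pattern in plain Java with no Flink dependency. MiniSourceContext, CounterSource, and ListContext are hypothetical stand-ins invented for illustration; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SourceContext, reduced to what this sketch needs.
interface MiniSourceContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

// Hypothetical source that emits 0, 1, 2, ... until cancelled or until a limit is reached.
class CounterSource {
    // volatile so that a cancel() call from another thread is visible to the run() loop
    private volatile boolean running = true;
    private final long limit;
    private long next = 0;

    CounterSource(long limit) {
        this.limit = limit;
    }

    void run(MiniSourceContext<Long> ctx) {
        while (running && next < limit) {
            // emit the element and advance the position while holding the lock
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
                next++;
            }
        }
    }

    void cancel() {
        running = false;
    }
}

// Collects emitted elements into a list so the behaviour can be inspected.
class ListContext<T> implements MiniSourceContext<T> {
    final List<T> out = new ArrayList<>();
    private final Object lock = new Object();

    @Override
    public void collect(T element) {
        out.add(element);
    }

    @Override
    public Object getCheckpointLock() {
        return lock;
    }
}
```

Calling cancel() before or during run() makes the loop exit at its next condition check, which is the behaviour a real source is expected to provide.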
@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();

    interface SourceContext<T> {
        void collect(T element);
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);
        void emitWatermark(Watermark mark);
        @PublicEvolving
        void markAsTemporarilyIdle();
        Object getCheckpointLock();
        void close();
    }
}

RocketMQ Source : An example of a custom source that reads from RocketMQ. The class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> and implements CheckpointedFunction, CheckpointListener, and ResultTypeQueryable<OUT>. In open() it validates the consumer properties, creates a pull‑mode consumer, and prepares the offset tables. The run() method registers a PullTaskCallback that pulls messages, deserializes them, emits them with timestamps via collectWithTimestamp(), and updates offsets under the checkpoint lock; it then starts the pull service.
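To see why offsets are updated under the checkpoint lock, it helps to model the offset table on its own. The sketch below is a simplified, hypothetical model (OffsetTracker is not part of the connector): advancing an offset and taking a snapshot synchronize on the same lock, so a snapshot is always a consistent copy of the table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-queue offset tracking; not the connector's actual code.
class OffsetTracker {
    private final Object lock = new Object(); // plays the role of the checkpoint lock
    private final Map<String, Long> offsets = new HashMap<>();

    // Record that the next pull from `queue` should start at nextOffset.
    void advance(String queue, long nextOffset) {
        synchronized (lock) {
            offsets.put(queue, nextOffset);
        }
    }

    // Offset to pull from next; 0 for a queue that has not been seen yet (an assumption of this sketch).
    long current(String queue) {
        synchronized (lock) {
            return offsets.getOrDefault(queue, 0L);
        }
    }

    // Called at checkpoint time: copy the table so later updates do not leak into the snapshot.
    Map<String, Long> snapshot() {
        synchronized (lock) {
            return new HashMap<>(offsets);
        }
    }

    // Called on recovery: replace the table with the checkpointed offsets.
    void restore(Map<String, Long> snapshot) {
        synchronized (lock) {
            offsets.clear();
            offsets.putAll(snapshot);
        }
    }
}
```

If the job fails after the snapshot was taken, restoring it rewinds each queue to the checkpointed offset, and any messages pulled after that point are read again. That replay is what at‑least‑once delivery means in practice.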
public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    // ... fields omitted for brevity ...

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        // initialize topic, group, checkpoint flag, offset tables, etc.
    }

    @Override
    public void run(SourceContext<OUT> context) throws Exception {
        LOG.debug("source run....");
        // all emission and offset updates synchronize on this lock
        final Object lock = context.getCheckpointLock();
        // configure pull parameters
        pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
                try {
                    long offset = getMessageQueueOffset(mq);
                    if (offset < 0) return;
                    PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                    boolean found = false;
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            for (MessageExt msg : pullResult.getMsgFoundList()) {
                                byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                byte[] value = msg.getBody();
                                OUT data = schema.deserializeKeyAndValue(key, value);
                                // emit with the message's born timestamp as the element timestamp
                                synchronized (lock) {
                                    context.collectWithTimestamp(data, msg.getBornTimestamp());
                                }
                            }
                            found = true;
                            break;
                        // other cases omitted
                    }
                    // remember where the next pull for this queue should start
                    synchronized (lock) {
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    }
                    // pull again immediately if messages were found, otherwise back off
                    if (found) {
                        pullTaskContext.setPullNextDelayTimeMillis(0);
                    } else {
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        pullConsumerScheduleService.start();
        runningChecker.setRunning(true);
        awaitTermination();
    }
    // snapshotState, initializeState, notifyCheckpointComplete, etc.
}

SinkFunction : The base interface for custom sinks. Its core method is invoke(IN value, Context context), which is called once for each incoming element; the single‑argument invoke(IN value) overload is deprecated. The context object provides access to the current processing time, the current watermark, and the element's timestamp, which may be absent.
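Because the element timestamp is optional, a sink that relies on it should have a fallback. The following plain‑Java sketch illustrates one such fallback, using processing time when no timestamp is attached. MiniSinkContext and TimestampTaggingSink are hypothetical stand-ins written for this illustration and only mirror the shape of the Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the three accessors of SinkFunction.Context.
interface MiniSinkContext {
    long currentProcessingTime();
    long currentWatermark();
    Long timestamp(); // null when the element carries no timestamp
}

// Hypothetical sink that tags each value with its element timestamp,
// falling back to processing time when none is attached.
class TimestampTaggingSink {
    final List<String> written = new ArrayList<>();

    void invoke(String value, MiniSinkContext context) {
        Long ts = context.timestamp();
        long effective = (ts != null) ? ts : context.currentProcessingTime();
        written.add(effective + ":" + value);
    }
}
```

Whether processing time is an acceptable substitute depends on the job; dropping or rejecting untimestamped records is an equally valid choice.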
public interface SinkFunction<IN> extends Function, Serializable {
    @Deprecated
    default void invoke(IN value) throws Exception {}

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    @Public
    interface Context<T> {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

Flink also provides a simple DiscardingSink that drops all received records:
@Public
public class DiscardingSink<T> implements SinkFunction<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(T value) {}
}

HBase Sink : A concrete implementation that writes JSON records into HBase. It extends RichSinkFunction<JSONObject>, opens a shared HBase connection in open(), and in invoke() builds a Put using a composite row key (UID + date) and writes the data with the upload timestamp as the cell version.
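The row key is described as a composite of UID and date, but the RowKeyUtil helper itself is not shown. As an illustration only, the sketch below shows one plausible scheme. The reversed-UID layout, the underscore separator, and the assumption that the upload time is in epoch seconds are all guesses made for this example, not the article's actual implementation.

```java
// Hypothetical row-key helper; the scheme below is an assumption made for illustration.
class RowKeySketch {
    // Reverse the decimal UID so that sequential UIDs spread across key ranges,
    // then append the date (assumed to be an integer such as 20190315).
    // Assumes uid is non-negative.
    static String forCalendar(long uid, int date) {
        String reversedUid = new StringBuilder(Long.toString(uid)).reverse().toString();
        return reversedUid + "_" + date;
    }

    // HBase cell timestamps are conventionally epoch milliseconds,
    // so an epoch-seconds value is scaled by 1000.
    static long toCellTimestamp(long epochSeconds) {
        return epochSeconds * 1000L;
    }
}
```

Using the upload time as the cell version means a replayed record carries the same version it had the first time, so rewriting it after a failure overwrites the same cell instead of creating a new version.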
public class CalendarHBaseSink extends RichSinkFunction<JSONObject> {
    private static final long serialVersionUID = -6140896663267559061L;
    private static final Logger LOGGER = LoggerFactory.getLogger(CalendarHBaseSink.class);
    private static final byte[] CF_BYTES = Bytes.toBytes("f");
    private static final TableName TABLE_NAME = TableName.valueOf("dw_hbase:calendar_record");
    private Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // obtain the shared connection once per sink instance
        connection = HBaseConnection.get();
        LOGGER.info("Opened CalendarHBaseSink");
    }

    @Override
    public void close() throws Exception {
        super.close();
        HBaseConnection.close();
        LOGGER.info("Closed CalendarHBaseSink");
    }

    @Override
    public void invoke(JSONObject record, Context context) throws Exception {
        Integer eventType = record.getInteger("eventtype");
        String qualifier = EventType.getColumnQualifier(eventType);
        // skip event types that have no column mapping
        if (qualifier == null) return;
        Long uid = record.getLong("uid");
        Integer recordDate = record.getInteger("dateline");
        String data = record.getString("data");
        Long uploadTime = record.getLong("updatetime");
        try (Table table = connection.getTable(TABLE_NAME)) {
            // row key: composite of UID and date
            Put put = new Put(Bytes.toBytes(RowKeyUtil.getForCalendar(uid, recordDate)));
            // the upload time, multiplied by 1000, is written as the cell version
            put.addColumn(CF_BYTES, Bytes.toBytes(qualifier), uploadTime * 1000, Bytes.toBytes(data));
            table.put(put);
        }
    }
}

Streaming Main Program : The job wires the RocketMQ source and HBase sink together, enables checkpointing (at‑least‑once), sets a memory state backend, and submits the pipeline to a Flink‑on‑YARN cluster.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // checkpoint every 300000 ms (5 minutes); a checkpoint times out after 60000 ms (1 minute)
    env.enableCheckpointing(300000);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(new MemoryStateBackend(true));

    Properties consumerProps = new Properties();
    consumerProps.setProperty(NAME_SERVER_ADDR, RocketMQConst.NAME_SERVER_TEST);
    consumerProps.setProperty(CONSUMER_OFFSET_RESET_TO, "latest");
    consumerProps.setProperty(CONSUMER_TOPIC, "calendar");
    consumerProps.setProperty(CONSUMER_TAG, "*");
    consumerProps.setProperty(CONSUMER_GROUP, "FLINK_STREAM_CALENDAR_TEST_1");

    env.addSource(new RocketMQSource<>(new JSONDeserializationSchema(), consumerProps))
        .name("calendar-rocketmq-source")
        .addSink(new CalendarHBaseSink())
        .name("calendar-hbase-sink");
    env.execute();
}

Event‑Time & Watermark Discussion : The example leaves the time characteristic at its default, processing time, because a Flink watermark applies to an operator as a whole rather than to individual keys. Handling event time per key would require a keyed stream with per‑key state, or external storage (e.g., HBase cell timestamps), so that one key's timestamps do not interfere with another's.
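The per‑key idea can be sketched in plain Java by tracking the latest event time seen for each key separately. PerKeyLatest below is a hypothetical illustration: the HashMap stands in for the keyed state (for example a ValueState after keyBy) that a real Flink job would use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key tracker of the latest event time; the map stands in for keyed state.
class PerKeyLatest {
    private final Map<Long, Long> latestByKey = new HashMap<>();

    // Returns true if the record is at least as new as anything already seen for this key.
    // Other keys are never consulted, so a key with old timestamps is unaffected
    // by a different key that is far ahead in event time.
    boolean accept(long key, long eventTime) {
        Long seen = latestByKey.get(key);
        if (seen != null && eventTime < seen) {
            return false; // stale for this key only
        }
        latestByKey.put(key, eventTime);
        return true;
    }
}
```

With a single operator‑wide watermark, a record at time 10 for one key could be treated as late simply because another key had already reached time 100. Tracking progress per key avoids that.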
Overall, the article provides a step‑by‑step guide, complete source code, and practical tips for building reliable Flink streaming pipelines that consume from RocketMQ and persist to HBase.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
