Implementing Custom Source and Sink in Flink Streaming with RocketMQ and HBase

This article details how to migrate Spark Streaming jobs to Flink Streaming by creating custom SourceFunction and SinkFunction implementations, including a RocketMQ source connector and an HBase sink, with code examples, configuration tips, and discussion of checkpointing and watermark handling.

Big Data Technology & Architecture

We recently attempted to convert existing Spark Streaming jobs into Flink Streaming jobs, and the first major challenge was implementing custom Source and Sink components. The following sections record the solution.

SourceFunction

SourceFunction is the root interface for defining a Flink source. Its definition is shown below.

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;

    void cancel();

    interface SourceContext<T> {
        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        void emitWatermark(Watermark mark);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

The run() method emits data continuously, usually in a loop controlled by a volatile flag; cancel() clears that flag so the loop exits. The nested SourceContext provides the methods for emitting elements, timestamps, and watermarks:

collect(): emits an element without a custom timestamp.

collectWithTimestamp(): emits an element together with a user-provided timestamp, which is how a source supplies event time directly.

emitWatermark(): emits a watermark; this is only meaningful for event-time streams.

getCheckpointLock(): returns the lock that must be held while emitting records and updating the source's own state, so that a checkpoint never captures a half-updated state.
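The run()/cancel() contract and the role of the checkpoint lock can be sketched without any Flink dependency. In the minimal example below, MiniSourceContext, CounterSource and CounterSourceDemo are hypothetical stand-ins, not Flink classes: the source loops on a volatile flag, emits and advances its position while holding the checkpoint lock, and stops once cancel() clears the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's SourceContext (only two methods)
interface MiniSourceContext<T> {
    void collectWithTimestamp(T element, long timestamp);

    Object getCheckpointLock();
}

// Emits an increasing counter until cancel() is called,
// following the run()/cancel() contract of a Flink SourceFunction
final class CounterSource {
    // volatile: in Flink, cancel() is called from a different thread than run()
    private volatile boolean running = true;
    // The source's own state, which a checkpoint would capture
    private long next = 0;

    void run(MiniSourceContext<Long> ctx) {
        while (running) {
            // Emit and advance state atomically with respect to checkpoints
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(next, 1_000L * next);
                next++;
            }
        }
    }

    void cancel() {
        running = false;
    }
}

final class CounterSourceDemo {
    // Runs the source until `limit` elements were collected, then cancels it
    static List<Long> collect(int limit) {
        final CounterSource source = new CounterSource();
        final List<Long> out = new ArrayList<>();
        final Object lock = new Object();
        source.run(new MiniSourceContext<Long>() {
            @Override
            public void collectWithTimestamp(Long element, long timestamp) {
                out.add(element);
                if (out.size() == limit) {
                    // Cancelling from the callback keeps this demo single-threaded
                    source.cancel();
                }
            }

            @Override
            public Object getCheckpointLock() {
                return lock;
            }
        });
        return out;
    }
}
```

CounterSourceDemo.collect(3) returns [0, 1, 2]. Cancelling from inside the collector only keeps the demo single-threaded; in Flink the task calls cancel() from another thread, which is why the flag is volatile.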

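Related types include the ParallelSourceFunction interface and the abstract classes RichSourceFunction and RichParallelSourceFunction. A plain SourceFunction always runs with parallelism 1, while the parallel variants can run as several instances; the Rich variants add the open()/close() lifecycle and access to the RuntimeContext.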
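With a parallel source, each instance usually needs a deterministic share of the input. One common approach, sketched here under the assumption that the input is a fixed list of numbered splits, is a modulo assignment based on the subtask index. SplitAssigner is a hypothetical helper; inside a RichParallelSourceFunction the two parameters would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(). The RocketMQ connector below does not split queues this way; it appears to leave queue allocation to the RocketMQ consumer group's rebalancing.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic split assignment for parallel source instances (hypothetical helper)
final class SplitAssigner {
    private SplitAssigner() {}

    static List<Integer> assign(int numSplits, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<Integer> mine = new ArrayList<>();
        for (int split = 0; split < numSplits; split++) {
            // Each split belongs to exactly one subtask, and every subtask
            // computes its own share without coordinating with the others
            if (split % parallelism == subtaskIndex) {
                mine.add(split);
            }
        }
        return mine;
    }
}
```

With 8 splits and parallelism 3, subtask 1 reads splits 1, 4 and 7.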

RocketMQ Source

Our legacy business already uses RocketMQ, and we found an existing Flink connector in the rocketmq-externals project, so we did not have to implement a source from scratch. The connector's main class is shown below.

public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
    implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);

    private transient MQPullConsumerScheduleService pullConsumerScheduleService;
    private DefaultMQPullConsumer consumer;
    private KeyValueDeserializationSchema<OUT> schema;

    private RunningChecker runningChecker;

    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    private Map<MessageQueue, Long> offsetTable;
    private Map<MessageQueue, Long> restoredOffsets;
    private LinkedMap pendingOffsetsToCommit;

    private Properties props;
    private String topic;
    private String group;

    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    private transient volatile boolean restored;
    private transient boolean enableCheckpoint;

    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");

        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }

        runningChecker = new RunningChecker();
        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
        RocketMQConfig.buildConsumerConfigs(props, consumer);
    }

    @Override
    public void run(SourceContext<OUT> context) throws Exception {
        LOG.debug("source run....");
        final Object lock = context.getCheckpointLock();
        int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
        int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);

        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
        pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
                try {
                    long offset = getMessageQueueOffset(mq);
                    if (offset < 0) {
                        return;
                    }
                    PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                    boolean found = false;
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messages = pullResult.getMsgFoundList();
                            for (MessageExt msg : messages) {
                                byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                byte[] value = msg.getBody();
                                OUT data = schema.deserializeKeyAndValue(key, value);
                                synchronized (lock) {
                                    context.collectWithTimestamp(data, msg.getBornTimestamp());
                                }
                            }
                            found = true;
                            break;
                        case NO_MATCHED_MSG:
                            LOG.debug("No matched message after offset {} for queue {}", offset, mq);
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            LOG.warn("Offset {} is illegal for queue {}", offset, mq);
                            break;
                        default:
                            break;
                    }
                    synchronized (lock) {
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    }
                    if (found) {
                        pullTaskContext.setPullNextDelayTimeMillis(0);
                    } else {
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        try {
            pullConsumerScheduleService.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
        runningChecker.setRunning(true);
        awaitTermination();
    }

    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }

    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
        Long offset = offsetTable.get(mq);
        if (restored && offset == null) {
            offset = restoredOffsets.get(mq);
        }
        if (offset == null) {
            offset = consumer.fetchConsumeOffset(mq, false);
            if (offset < 0) {
                String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                switch (initialOffset) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq, getLong(props,
                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        offsetTable.put(mq, offset);
        return offsetTable.get(mq);
    }

    private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        if (!enableCheckpoint) {
            consumer.updateConsumeOffset(mq, offset);
        }
    }

    @Override
    public void cancel() {
        LOG.debug("cancel ...");
        runningChecker.setRunning(false);
        if (pullConsumerScheduleService != null) {
            pullConsumerScheduleService.shutdown();
        }
        offsetTable.clear();
        restoredOffsets.clear();
        pendingOffsetsToCommit.clear();
    }

    @Override
    public void close() throws Exception {
        LOG.debug("close ...");
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.debug("snapshotState() called on closed source; skipping snapshot.");
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
        }
        unionOffsetStates.clear();
        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));
        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }
        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        LOG.debug("initialize State ...");
        this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>(OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {})));
        this.restored = context.isRestored();
        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                }
            }
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
        } else {
            LOG.info("No restore state for the consumer.");
        }
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.debug("notifyCheckpointComplete() called on closed source; skipping offset commit.");
            return;
        }
        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
            return;
        }
        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }
        if (offsets == null || offsets.size() == 0) {
            LOG.debug("Checkpoint state was empty.");
            return;
        }
        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
        }
    }
}

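The open() method validates the configuration and creates a pull-mode consumer: an MQPullConsumerScheduleService and its underlying DefaultMQPullConsumer, whose instance name combines the subtask index with a random UUID. The run() method registers a pull callback for the topic, starts the service's pull thread pool, and then blocks until cancel() is called. Each callback pulls a batch from one message queue, emits the messages with their born timestamps via collectWithTimestamp(), and records the next offset, both under the checkpoint lock. When checkpointing is enabled, offsets are saved to union list state in snapshotState() and passed to consumer.updateConsumeOffset() only in notifyCheckpointComplete(); without checkpointing, they are passed after every pull.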
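The offset bookkeeping in snapshotState(), notifyCheckpointComplete() and initializeState() reduces to a few map operations. The sketch below is a simplified model, not connector code: OffsetBookkeeper is hypothetical, String stands in for MessageQueue, the committed map stands in for consumer.updateConsumeOffset(), and a TreeMap keyed by checkpoint id replaces the connector's LinkedMap (equivalent here because checkpoint ids only increase).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the source's offset bookkeeping (hypothetical class)
final class OffsetBookkeeper {
    final Map<String, Long> current = new HashMap<>();                // like offsetTable
    final TreeMap<Long, Map<String, Long>> pending = new TreeMap<>(); // like pendingOffsetsToCommit
    final Map<String, Long> committed = new HashMap<>();              // what the broker has seen

    // After a pull: remember where to read next
    void advance(String queue, long nextOffset) {
        current.put(queue, nextOffset);
    }

    // snapshotState(): freeze a copy of the current offsets under this checkpoint id
    void snapshot(long checkpointId) {
        pending.put(checkpointId, new HashMap<>(current));
    }

    // notifyCheckpointComplete(): commit that snapshot, then drop it and all older ones
    void complete(long checkpointId) {
        Map<String, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already superseded checkpoint
        }
        pending.headMap(checkpointId, true).clear();
        committed.putAll(offsets);
    }

    // initializeState(): union list state hands every subtask all entries,
    // so keep the highest offset seen for each queue
    static Map<String, Long> mergeRestored(List<? extends Map.Entry<String, Long>> entries) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : entries) {
            restored.merge(e.getKey(), e.getValue(), Long::max);
        }
        return restored;
    }
}
```

Because offsets are committed only after Flink confirms a checkpoint, records pulled after the last completed checkpoint are read again after a failure, which is the at-least-once behaviour this source provides.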

SinkFunction

SinkFunction is the root interface for custom sinks. Its definition is straightforward:

public interface SinkFunction<IN> extends Function, Serializable {
    @Deprecated
    default void invoke(IN value) throws Exception {}

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    @Public
    interface Context<T> {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

Flink provides a simple implementation called DiscardingSink that drops all incoming records.

@Public
public class DiscardingSink<T> implements SinkFunction<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(T value) {}
}
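Because the two-argument invoke() delegates to the deprecated one-argument version by default, a sink such as DiscardingSink can override only invoke(T) and still receive every record the runtime passes to invoke(value, context). The sketch below reproduces that default-method pattern with hypothetical stand-in types (MiniSink, CountingSink, SinkRuntimeDemo), not the real Flink interfaces.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in with the same shape as Flink's SinkFunction
interface MiniSink<IN> {
    interface Context {
        long currentProcessingTime();
    }

    // Old-style hook: does nothing unless overridden
    default void invoke(IN value) throws Exception {}

    // What the runtime calls; by default it delegates to the old-style hook
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }
}

// Overrides only the one-argument method, like DiscardingSink does
final class CountingSink<T> implements MiniSink<T> {
    final AtomicLong count = new AtomicLong();

    @Override
    public void invoke(T value) {
        count.incrementAndGet();
    }
}

final class SinkRuntimeDemo {
    // Plays the role of the sink operator: calls the two-argument invoke per record
    static <T> void feed(MiniSink<T> sink, Iterable<T> records) {
        MiniSink.Context ctx = System::currentTimeMillis;
        for (T record : records) {
            try {
                sink.invoke(record, ctx);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

Feeding three records into a CountingSink leaves its count at 3, even though the sink never overrides the two-argument method.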

HBase Sink

Below is a minimal but functional HBase sink implementation. It opens a shared HBase connection in open(), writes records in invoke(), and closes the connection in close().

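import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// HBaseConnection, EventType and RowKeyUtil are project-specific helpers (not shown)

public class CalendarHBaseSink extends RichSinkFunction<JSONObject> {
    private static final long serialVersionUID = -6140896663267559061L;

    private static final Logger LOGGER = LoggerFactory.getLogger(CalendarHBaseSink.class);
    private static final byte[] CF_BYTES = Bytes.toBytes("f");
    private static final TableName TABLE_NAME = TableName.valueOf("dw_hbase:calendar_record");
    // Connection is not serializable; it is obtained on the TaskManager in open()
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // HBase connection is wrapped as a singleton because creating it is expensive
        connection = HBaseConnection.get();
        LOGGER.info("Opened CalendarHBaseSink");
    }

    @Override
    public void close() throws Exception {
        super.close();
        // If HBaseConnection.close() closes the singleton unconditionally, other subtasks
        // in the same TaskManager JVM lose their connection as well
        HBaseConnection.close();
        LOGGER.info("Closed CalendarHBaseSink");
    }

    // Records arrive already parsed into JSONObject by the source's deserialization schema
    @Override
    public void invoke(JSONObject record, Context context) throws Exception {
        Integer eventType = record.getInteger("eventtype");
        // Map event type to a short column qualifier
        String qualifier = EventType.getColumnQualifier(eventType);
        if (qualifier == null) {
            return;
        }
        Long uid = record.getLong("uid");
        Integer recordDate = record.getInteger("dateline");
        String data = record.getString("data");
        Long uploadTime = record.getLong("updatetime");
        // A misspelled field name yields null here; log and skip the record
        // instead of failing later with an NPE on unboxing
        if (uid == null || recordDate == null || data == null || uploadTime == null) {
            LOGGER.warn("Skipping record with missing fields: {}", record);
            return;
        }

        // Table is lightweight and not thread-safe: obtain one per call and close it.
        // For higher write throughput, a BufferedMutator could be used instead.
        try (Table table = connection.getTable(TABLE_NAME)) {
            // Row key combines UID and record date; designing a good row key is important
            Put put = new Put(Bytes.toBytes(RowKeyUtil.getForCalendar(uid, recordDate)));
            // Use the upload time as the cell timestamp (assumes updatetime is in epoch
            // seconds; HBase cell timestamps are milliseconds)
            put.addColumn(CF_BYTES, Bytes.toBytes(qualifier), uploadTime * 1000, Bytes.toBytes(data));
            table.put(put);
        }
    }
}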
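HBaseConnection is the authors' own helper and its code is not shown, so it is unclear whether its close() accounts for other users. If it simply closes the singleton, one subtask finishing would close the connection under other subtasks in the same TaskManager JVM. A reference-counted holder is one way to avoid that; SharedResource below is a hypothetical sketch, where each sink would call acquire() in open() and release() in close(), and the factory would create the real HBase Connection.

```java
import java.util.function.Supplier;

// Reference-counted holder for an expensive, shareable client (hypothetical helper).
// The resource is created on first acquire() and closed only by the last release().
final class SharedResource<R extends AutoCloseable> {
    private final Supplier<R> factory;
    private R resource;
    private int refCount;

    SharedResource(Supplier<R> factory) {
        this.factory = factory;
    }

    synchronized R acquire() {
        if (resource == null) {
            resource = factory.get(); // created lazily on first use
        }
        refCount++;
        return resource;
    }

    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without acquire()");
        }
        if (--refCount == 0) {
            R toClose = resource;
            resource = null; // a later acquire() creates a fresh instance
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared resource", e);
            }
        }
    }

    synchronized int users() {
        return refCount;
    }
}
```

Two acquire() calls return the same instance, and the underlying close() runs only after the second release().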

During development we encountered two issues: a mistaken JSON field name caused a NullPointerException that was hard to detect, and checkpoint timeouts were caused by an incorrect hosts entry for the HBase cluster. Both were resolved by fixing the field name and updating the hosts file.
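Since a misspelled field name only surfaces later as a NullPointerException, checking the expected names up front turns it into an explicit message. RequiredFields is a hypothetical helper that accepts a Map, which fastjson's JSONObject implements, so the sink could call it on each record before building the Put and log the names it returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Reports which expected fields are absent or null in a parsed record (hypothetical helper)
final class RequiredFields {
    private RequiredFields() {}

    static List<String> missing(Map<String, ?> record, String... names) {
        List<String> absent = new ArrayList<>();
        for (String name : names) {
            if (record.get(name) == null) {
                absent.add(name);
            }
        }
        return absent;
    }
}
```

For a record that carries "update_time" instead of "updatetime", missing(record, "uid", "dateline", "data", "updatetime") names "updatetime" directly instead of leaving an NPE to be traced.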

Streaming Main Program

The source and sink are wired together and submitted to a Flink‑on‑YARN cluster.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoint every 5 minutes; abort any checkpoint that takes longer than 1 minute
    env.enableCheckpointing(300000);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // At-least-once is sufficient: a replayed record rewrites the same HBase cell
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
    // The only state is a few queue offsets, so the heap-based MemoryStateBackend suffices
    // (true = asynchronous snapshots); offsets are also committed to RocketMQ when a checkpoint completes
    env.setStateBackend(new MemoryStateBackend(true));

    // NAME_SERVER_ADDR and the CONSUMER_* keys are static imports from the connector's RocketMQConfig;
    // RocketMQConst.NAME_SERVER_TEST is a project constant
    Properties consumerProps = new Properties();
    consumerProps.setProperty(NAME_SERVER_ADDR, RocketMQConst.NAME_SERVER_TEST);
    // Start from the newest messages when the group has no committed offset yet
    consumerProps.setProperty(CONSUMER_OFFSET_RESET_TO, "latest");
    consumerProps.setProperty(CONSUMER_TOPIC, "calendar");
    consumerProps.setProperty(CONSUMER_TAG, "*");
    consumerProps.setProperty(CONSUMER_GROUP, "FLINK_STREAM_CALENDAR_TEST_1");

    env.addSource(new RocketMQSource<>(new JSONDeserializationSchema(), consumerProps))
        .name("calendar-rocketmq-source")
        .addSink(new CalendarHBaseSink())
        .name("calendar-hbase-sink");

    env.execute();
}
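One way to see why at-least-once is likely enough here: every record maps to a deterministic HBase cell, with the row key derived from uid and dateline, the qualifier from the event type, and the timestamp from updatetime. After a failure, the records since the last completed checkpoint (taken every 300000 ms here) are replayed and rewrite the same cells with the same values. ToyColumn below is a hypothetical in-memory model of a single column, not HBase code, showing that replays and late arrivals leave the visible value unchanged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of one HBase column (hypothetical): cells per row are keyed by timestamp,
// and a read returns the newest cell, as a family with VERSIONS => 1 exposes
final class ToyColumn {
    private final Map<String, TreeMap<Long, String>> rows = new HashMap<>();

    // Writing the same (row, timestamp) again overwrites that cell
    void put(String row, long timestamp, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>()).put(timestamp, value);
    }

    // The newest cell wins, regardless of the order in which puts arrived
    String get(String row) {
        TreeMap<Long, String> cells = rows.get(row);
        return cells == null ? null : cells.lastEntry().getValue();
    }

    // Number of distinct timestamps stored for a row (before any version pruning)
    int cellCount(String row) {
        TreeMap<Long, String> cells = rows.get(row);
        return cells == null ? 0 : cells.size();
    }
}
```

This relies on each put() completing inside invoke(), as in CalendarHBaseSink; with a BufferedMutator, the buffer would also need flushing when a checkpoint is taken, or buffered writes could be lost.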

The JSON deserialization schema used by the source is equally simple:

public class JSONDeserializationSchema implements KeyValueDeserializationSchema<JSONObject> {
    private static final long serialVersionUID = -649479674253613072L;

    @Override
    public JSONObject deserializeKeyAndValue(byte[] key, byte[] value) {
        return value != null ? JSON.parseObject(new String(value, StandardCharsets.UTF_8)) : null;
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}

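We kept the default processing-time characteristic instead of event time because a Flink watermark is tracked per operator instance, not per key. With event time, one watermark covers every user handled by an instance, so a user whose records carry recent timestamps can advance the watermark past the timestamps of another user's records that arrive later, and those records are then treated as late. Two possible workarounds are: (1) key the stream and track a separate per-key watermark (for example, the maximum timestamp seen so far) yourself in keyed state, such as in a KeyedProcessFunction; or (2) rely on HBase's cell timestamps, with the column family's VERSIONS set to 1, so that reads return only the cell with the newest timestamp regardless of arrival order.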
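The cross-key interference can be reproduced with a few lines of arithmetic. WatermarkDemo below is a hypothetical model, not Flink code: it treats the watermark as the maximum timestamp seen minus an allowed out-of-orderness bound, similar in spirit to a bounded-out-of-orderness strategy, and flags an event as late when its timestamp falls below that watermark. It tracks one shared watermark and one per key side by side.

```java
import java.util.HashMap;
import java.util.Map;

// Compares a single shared watermark with per-key watermarks (hypothetical model)
final class WatermarkDemo {
    private final long bound;
    private long globalMax = Long.MIN_VALUE;
    private final Map<String, Long> perKeyMax = new HashMap<>();

    WatermarkDemo(long boundMillis) {
        this.bound = boundMillis;
    }

    // Returns {lateUnderSharedWatermark, lateUnderPerKeyWatermark}, then updates both
    boolean[] observe(String key, long timestamp) {
        boolean lateGlobal = globalMax != Long.MIN_VALUE && timestamp < globalMax - bound;
        Long keyMax = perKeyMax.get(key);
        boolean latePerKey = keyMax != null && timestamp < keyMax - bound;
        globalMax = Math.max(globalMax, timestamp);
        perKeyMax.merge(key, timestamp, Long::max);
        return new boolean[] {lateGlobal, latePerKey};
    }
}
```

With a 1000 ms bound and the arrival order userA@10000, userB@2000, userB's event is late under the shared watermark but on time under its own per-key watermark; a later userA@8000 is late under both.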

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert dedicated to sharing big data technology.
