
Understanding Flink End-to-End Latency Measurement with LatencyMarker

This article explains the background, source‑code analysis, implementation details, metric granularity, and practical considerations of Flink's LatencyMarker feature for measuring full‑link job latency in streaming applications.

Big Data Technology & Architecture

1. Background

Flink job end-to-end latency is an important metric for measuring overall performance and response time of streaming applications, especially those requiring low latency.

Most streaming engines provide full‑link latency metrics on their monitoring pages, typically displayed as histograms.

Low-latency scenarios such as login, order rule checking, and real-time prediction need a measurable metric for monitoring job latency.

2. Origin of the Source Code Analysis

The analysis is based on Flink community issue FLINK-3660 and the corresponding PR (pull-2386), with additional explanations added by the author.

The PR only covers part of the full‑link latency implementation, so the article summarizes:

Source‑to‑Sink LatencyMarker source code

LatencyMarksEmitter class that emits latency markers

LatencyStats (histogram metric) source code

3. Tencent Oceanus Monitoring Metric Reference

Screenshots of the Tencent Oceanus monitoring page (not reproduced here) show the latency metric highlighted in red.

4. Flink LatencyMarker Implementation Idea

Evolution of the solution

The first approach attached an ingestion-time timestamp to each record at the source, but this added 8 bytes to every element and imposed that overhead even on users who did not use latency monitoring.

Later the community decided to emit special events (LatencyMarker) periodically, similar to watermarks.

Implementation principle

LatencyMarker events are sent from the source at a configurable interval and forwarded by each task. The sink compares the marker’s timestamp with the current system time to compute latency.

LatencyMarkers do not add latency to the job itself, but they can be delayed by back-pressure just like regular records.
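The marker flow described in the two paragraphs above can be illustrated with a small single-channel simulation. This is a minimal sketch with hypothetical Element and BackPressureDemo classes (not Flink's API) and a simulated clock: the marker carries the source timestamp and is not processed by user code, yet it still waits behind every record queued ahead of it, so a slow consumer inflates the latency computed at the sink.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified channel element: either a data record or a latency marker.
// Illustration only; this is not Flink's LatencyMarker class.
final class Element {
    final boolean isMarker;
    final long markedTime; // for markers: source wall-clock time at emission

    private Element(boolean isMarker, long markedTime) {
        this.isMarker = isMarker;
        this.markedTime = markedTime;
    }

    static Element record() { return new Element(false, -1L); }

    static Element marker(long markedTime) { return new Element(true, markedTime); }
}

final class BackPressureDemo {
    private BackPressureDemo() {}

    /**
     * Drains a FIFO channel with a consumer that spends processingMillisPerRecord on each record,
     * starting a simulated clock at startMillis. The marker itself needs no processing, but it
     * waits behind every record queued ahead of it, so the result includes that queueing time.
     */
    static long markerLatency(Queue<Element> channel, long startMillis, long processingMillisPerRecord) {
        long clock = startMillis;
        while (!channel.isEmpty()) {
            Element e = channel.poll();
            if (e.isMarker) {
                // sink-side computation: current time minus the time the source marked
                return clock - e.markedTime;
            }
            clock += processingMillisPerRecord; // the slow consumer works on each record in turn
        }
        throw new IllegalStateException("no marker in channel");
    }
}
```

With ten queued records at 5 ms each ahead of a marker stamped at time 0, the sink sees a latency of 50 ms even though the marker itself required no processing.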

Clock drift between nodes

The current design assumes synchronized clocks on all TaskManagers; otherwise measured latency includes clock drift.

A possible improvement is to use the JobManager as a central timing service, with TaskManagers periodically querying the JM time.
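The article presents the JobManager-based time service only as a possible improvement. The sketch below illustrates one common way such a service could be used, Cristian's algorithm, under the assumption that the request and response legs take equally long. ClockOffsetEstimator and its methods are hypothetical names, not Flink APIs.

```java
// Hypothetical sketch of the proposed central time service (not part of Flink): a TaskManager
// queries a central clock and estimates its own offset, assuming symmetric network delay.
final class ClockOffsetEstimator {
    private ClockOffsetEstimator() {}

    /**
     * Cristian-style estimate of (central clock - local clock) in milliseconds.
     *
     * @param localSendMillis    local time when the request was sent
     * @param centralTimeMillis  time reported by the central service (e.g. the JobManager)
     * @param localReceiveMillis local time when the response arrived
     */
    static long estimateOffset(long localSendMillis, long centralTimeMillis, long localReceiveMillis) {
        long roundTrip = localReceiveMillis - localSendMillis;
        // assume the central timestamp was taken halfway through the round trip
        long localAtCentralReading = localSendMillis + roundTrip / 2;
        return centralTimeMillis - localAtCentralReading;
    }

    /** Maps a local timestamp onto the central time base. */
    static long toCentral(long localMillis, long offsetMillis) {
        return localMillis + offsetMillis;
    }

    /**
     * Marker latency computed in central time, so per-TaskManager clock differences are
     * compensated to the extent that the estimated offsets are accurate.
     */
    static long latency(long markedLocalAtSource, long sourceOffset, long nowLocalAtSink, long sinkOffset) {
        return toCentral(nowLocalAtSink, sinkOffset) - toCentral(markedLocalAtSource, sourceOffset);
    }
}
```

For example, a request sent at local time 1000 ms that returns a central time of 1510 ms at local time 1020 ms gives an estimated offset of +500 ms.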

5. Flink LatencyMarker Source Code

Implementation details from PR pull-2386.

Base class and marker emission

Flink introduces a new StreamElement called LatencyMarker, which sources emit at a configurable interval set via ExecutionConfig#setLatencyTrackingInterval (e.g., 2000 ms). The default is 0 ms, which disables latency tracking.
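Periodic emission at the source can be sketched as follows. As an assumption for illustration, a ScheduledExecutorService stands in for the task's timer service, and PeriodicMarkerEmitter is a hypothetical class; the behavior carried over is a fixed interval where a non-positive value disables emission (in Flink, an interval of 0 turns latency tracking off).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical stand-in for a source-side marker emitter. A ScheduledExecutorService plays the
// role of the task's timer; each tick hands the current wall-clock time to emitMarker.
final class PeriodicMarkerEmitter implements AutoCloseable {
    private final ScheduledExecutorService timer;

    /** Returns a running emitter, or null when latency tracking is disabled (interval <= 0). */
    static PeriodicMarkerEmitter startIfEnabled(long intervalMillis, LongConsumer emitMarker) {
        if (intervalMillis <= 0) {
            return null;
        }
        return new PeriodicMarkerEmitter(intervalMillis, emitMarker);
    }

    private PeriodicMarkerEmitter(long intervalMillis, LongConsumer emitMarker) {
        // daemon thread so a forgotten emitter does not keep the JVM alive
        timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "latency-marker-emitter");
            t.setDaemon(true);
            return t;
        });
        // emit immediately, then once per interval, stamping each marker with the current time
        timer.scheduleAtFixedRate(
                () -> emitMarker.accept(System.currentTimeMillis()),
                0L, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```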

Operators that are not sinks forward the marker; sinks keep the last 128 markers per source.
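A minimal sketch of that sink-side bookkeeping, assuming a plain String source key (Flink keys by source operator ID and, depending on granularity, subtask index): each source gets a bounded deque, and the oldest sample is evicted once the capacity, 128 in the text, is reached. PerSourceLatencyHistory is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keep only the most recent `capacity` latency samples per source.
final class PerSourceLatencyHistory {
    private final int capacity;
    private final Map<String, ArrayDeque<Long>> samplesBySource = new HashMap<>();

    PerSourceLatencyHistory(int capacity) {
        this.capacity = capacity;
    }

    void record(String sourceKey, long latencyMillis) {
        ArrayDeque<Long> samples = samplesBySource.computeIfAbsent(sourceKey, k -> new ArrayDeque<>());
        if (samples.size() == capacity) {
            samples.pollFirst(); // evict the oldest sample for this source only
        }
        samples.addLast(latencyMillis);
    }

    /** Snapshot of the retained samples for one source, oldest first. */
    List<Long> samples(String sourceKey) {
        return new ArrayList<>(samplesBySource.getOrDefault(sourceKey, new ArrayDeque<>()));
    }
}
```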

Multi‑output channel – random emission

Operators with multiple output channels randomly select one channel for each marker, so each marker exists only once in the system instead of being duplicated to every channel.

public class RecordWriterOutput<OUT> /* excerpt: interfaces, fields and other methods omitted */ {
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);
        try {
            // send the marker to one randomly selected output channel instead of broadcasting it
            recordWriter.randomEmit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
The above emitLatencyMarker() is called by StreamSource and AbstractStreamOperator to dispatch latency markers.
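The idea behind randomEmit can be modeled without Flink. In this hypothetical RandomMarkerRouter, channels are plain lists; each marker lands in exactly one randomly chosen channel, whereas a broadcast puts a copy in every channel and would multiply the number of markers downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of random single-channel emission versus broadcast.
final class RandomMarkerRouter<T> {
    private final List<List<T>> channels = new ArrayList<>();
    private final Random random;

    RandomMarkerRouter(int numChannels, Random random) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.random = random;
    }

    /** Writes the marker to one uniformly chosen channel and returns that channel's index. */
    int randomEmit(T marker) {
        int target = random.nextInt(channels.size());
        channels.get(target).add(marker);
        return target;
    }

    /** For contrast: a broadcast writes one copy of the element to every channel. */
    void broadcastEmit(T element) {
        for (List<T> channel : channels) {
            channel.add(element);
        }
    }

    /** Total number of elements across all channels. */
    int totalElements() {
        int total = 0;
        for (List<T> channel : channels) {
            total += channel.size();
        }
        return total;
    }
}
```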

If the operator is a sink, it keeps the latency information from the last 128 LatencyMarkers for each known source.

Metric display

Each known source’s min/max/avg/p50/p95/p99 latency is aggregated in the sink’s LatencyStats object.
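Summarizing a window of samples into those statistics can be sketched as follows. Assumption: nearest-rank percentiles are used for simplicity; Flink's DescriptiveStatisticsHistogram relies on Apache Commons Math, whose default percentile estimator interpolates between samples, so its values can differ slightly. LatencySummary is a hypothetical class.

```java
import java.util.Arrays;

// Hypothetical sketch: min/max/avg/p50/p95/p99 over a set of latency samples (milliseconds).
final class LatencySummary {
    final long min;
    final long max;
    final double mean;
    final long p50;
    final long p95;
    final long p99;

    LatencySummary(long[] samples) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samples.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        min = sorted[0];
        max = sorted[sorted.length - 1];
        long sum = 0;
        for (long s : sorted) {
            sum += s;
        }
        mean = (double) sum / sorted.length;
        p50 = nearestRank(sorted, 50);
        p95 = nearestRank(sorted, 95);
        p99 = nearestRank(sorted, 99);
    }

    /** Nearest-rank percentile: the smallest sample with at least `percent`% of samples <= it. */
    private static long nearestRank(long[] sorted, int percent) {
        int rank = (percent * sorted.length + 99) / 100; // ceil(percent * n / 100) in integer math
        return sorted[Math.max(rank, 1) - 1];
    }
}
```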

The PR only implements the collection of end-to-end latency statistics; exposing them relies on Flink's existing metrics framework.

Without synchronized system clocks, hardware clock errors will affect measurement accuracy.

6. Latency Granularity Explanation

Concept of granularity

The metrics.latency.granularity configuration option controls the granularity of these statistics. It accepts single, operator (the default), and subtask.

A) Depending on the granularity, the statistics can additionally be keyed by source ID and source subtask index.

B) The current operator and its subtask index always participate in the metric name.

Three tracking strategies and their definitions in the source code (constants of the LatencyStats.Granularity enum):

Single – tracks latency without distinguishing source or subtask:

SINGLE {
    @Override
    String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
        // only the current operator ID and subtask index are used
        return String.valueOf(operatorId) + operatorSubtaskIndex;
    }
}

Operator – distinguishes source but not source subtask:

OPERATOR {
    @Override
    String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
        // the source operator ID also participates
        return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;
    }
}

Subtask – distinguishes both source and source subtask:

SUBTASK {
    @Override
    String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
        // source operator ID and source subtask index both participate
        return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
    }
}
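To see how the three strategies change the number of histograms an operator subtask keeps, here is a self-contained stand-in. Assumptions: IDs are plain Strings, the marker is reduced to its source ID and source subtask index, and a "|" separator is inserted for readability (the Flink code above concatenates without one). LatencyGranularity and histogramKey are hypothetical names.

```java
// Hypothetical stand-in for the three Granularity strategies shown above.
enum LatencyGranularity {
    SINGLE {
        @Override
        String histogramKey(String sourceId, int sourceSubtask, String operatorId, int operatorSubtask) {
            // all sources share one histogram per operator subtask
            return operatorId + "|" + operatorSubtask;
        }
    },
    OPERATOR {
        @Override
        String histogramKey(String sourceId, int sourceSubtask, String operatorId, int operatorSubtask) {
            // one histogram per source operator
            return sourceId + "|" + operatorId + "|" + operatorSubtask;
        }
    },
    SUBTASK {
        @Override
        String histogramKey(String sourceId, int sourceSubtask, String operatorId, int operatorSubtask) {
            // one histogram per source subtask
            return sourceId + "|" + sourceSubtask + "|" + operatorId + "|" + operatorSubtask;
        }
    };

    abstract String histogramKey(String sourceId, int sourceSubtask, String operatorId, int operatorSubtask);
}
```

For one sink subtask receiving markers from two sources with three subtasks each, SINGLE keeps 1 histogram, OPERATOR 2, and SUBTASK 6, which is why finer granularity costs more memory and produces more metrics.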

These names are used as keys to store histograms in a map:

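Map<String, DescriptiveStatisticsHistogram> latencyStats = new HashMap<>();

// Simplified excerpt of LatencyStats#reportLatency; granularity, operatorId, subtaskIndex and
// historySize are fields of LatencyStats, and metric-group registration is omitted
public void reportLatency(LatencyMarker marker) {
    String uniqueName = granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
    DescriptiveStatisticsHistogram latencyHistogram = latencyStats.get(uniqueName);
    if (latencyHistogram == null) {
        // first marker for this key: create a histogram that keeps the last historySize samples
        latencyHistogram = new DescriptiveStatisticsHistogram(historySize);
        latencyStats.put(uniqueName, latencyHistogram);
    }
    // latency = local wall-clock time minus the time the source marked the marker
    latencyHistogram.update(System.currentTimeMillis() - marker.getMarkedTime());
}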

Reflection in Web Metric UI

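The metric path varies with the chosen granularity. With Subtask granularity, the latency histogram is registered under the job scope with the components <source_id>, <source_subtask_index>, <operator_id>, and <operator_subtask_index>, followed by the metric name latency. Operator granularity drops <source_subtask_index>, and Single granularity drops both source components.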

7. Summary

LatencyMarker is not part of window or MiniBatch timing; it is forwarded directly by intermediate operators.
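A practical consequence is that the reported latency excludes time spent in window or mini-batch buffers. The hypothetical BufferingOperator below models this: data records are held until the window fires (for simplicity the buffered records themselves stand in for the window result), while latency markers bypass the buffer and are forwarded at once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a windowing operator that buffers records but forwards markers directly.
final class BufferingOperator {
    private final List<String> windowBuffer = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    void processRecord(String record) {
        windowBuffer.add(record); // held until fireWindow()
    }

    void processLatencyMarker(String marker) {
        output.add(marker); // forwarded immediately, never buffered
    }

    void fireWindow() {
        output.addAll(windowBuffer); // buffered data is released only when the window fires
        windowBuffer.clear();
    }

    List<String> output() {
        return output;
    }
}
```

Because the marker overtakes the buffered records, a downstream latency measurement says nothing about how long records waited for their window to close.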

Metric path: TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency (may vary with granularity).

Each intermediate operator and sink records its latency to the source, and the UI typically shows source‑to‑sink latency.

Granular latency per task helps identify machines with high latency for troubleshooting.
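Once per-subtask latencies are available, spotting a slow machine amounts to comparing subtasks against each other. A minimal sketch, assuming p99 values have already been collected into a map keyed by a hypothetical "operatorId|subtaskIndex" string (for example from a metrics reporter): it flags subtasks whose p99 exceeds a chosen multiple of the median p99. LatencyOutliers and the factor are illustrative choices, not Flink features.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical troubleshooting helper over already-collected per-subtask p99 latencies.
final class LatencyOutliers {
    private LatencyOutliers() {}

    /** Keys whose p99 exceeds factor times the (lower) median p99 across all keys, sorted by key. */
    static List<String> flag(Map<String, Long> p99ByKey, double factor) {
        List<Long> values = new ArrayList<>(p99ByKey.values());
        if (values.isEmpty()) {
            return new ArrayList<>();
        }
        Collections.sort(values);
        long median = values.get((values.size() - 1) / 2); // lower median for even counts
        List<String> flagged = new ArrayList<>();
        for (Map.Entry<String, Long> e : p99ByKey.entrySet()) {
            if (e.getValue() > factor * median) {
                flagged.add(e.getKey());
            }
        }
        Collections.sort(flagged);
        return flagged;
    }
}
```

The flagged subtask index can then be mapped to its TaskManager host in the web UI.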

With a larger emission interval (e.g., 20 seconds), latency tracking generally has no noticeable impact on business processing performance.

Reference: https://blog.csdn.net/LS_ice/article/details/103295774
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
