
Inside Flink Metrics: Adding, Retrieving, and Exposing Metrics in TaskManager

This article walks through Flink's metric system by explaining the core interfaces such as MetricReporter and MetricRegistry, showing how metrics are added, registered, and queried during TaskManager startup, and detailing both REST and Prometheus approaches for retrieving metric values.

37 Mobile Game Tech Team

MetricReporter Interface

MetricReporter is the interface used to expose metric results. It defines the lifecycle methods open and close, and the notification methods notifyOfAddedMetric and notifyOfRemovedMetric.

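<code>import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;

public interface MetricReporter {
    // Called once when the reporter is set up, with the reporter's own configuration.
    void open(MetricConfig config);
    // Called when the registry shuts down; release connections and threads here.
    void close();
    // Called by the MetricRegistry whenever a metric is registered or unregistered.
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
</code>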
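To make the contract concrete, here is a dependency-free sketch of a reporter that follows the same four-method lifecycle. It does not use Flink: SimpleMetric and SimpleReporter are hypothetical stand-ins for Metric and MetricReporter, the metric group is reduced to a scope string, and java.util.Properties replaces MetricConfig (which extends Properties in Flink). The reporter simply keeps references to registered metrics and reads their values on demand.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.flink.metrics.Metric: something whose current value can be read.
interface SimpleMetric {
    Object value();
}

// Hypothetical stand-in mirroring MetricReporter; the metric group is reduced to a scope string,
// and Properties replaces MetricConfig.
interface SimpleReporter {
    void open(Properties config);
    void close();
    void notifyOfAddedMetric(SimpleMetric metric, String metricName, String groupScope);
    void notifyOfRemovedMetric(SimpleMetric metric, String metricName, String groupScope);
}

// Keeps a live view of registered metrics, keyed by "<scope><delimiter><name>".
class InMemoryReporter implements SimpleReporter {
    private final Map<String, SimpleMetric> metrics = new ConcurrentHashMap<>();
    private volatile String delimiter = ".";
    private volatile boolean open;

    @Override
    public void open(Properties config) {
        // Reporter-specific options arrive here; fall back to "." if no delimiter is set.
        delimiter = config.getProperty("delimiter", ".");
        open = true;
    }

    @Override
    public void close() {
        metrics.clear();
        open = false;
    }

    @Override
    public void notifyOfAddedMetric(SimpleMetric metric, String metricName, String groupScope) {
        metrics.put(groupScope + delimiter + metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(SimpleMetric metric, String metricName, String groupScope) {
        metrics.remove(groupScope + delimiter + metricName);
    }

    // Reads current values on demand, as a pull-based reporter would when scraped.
    Map<String, Object> snapshot() {
        Map<String, Object> out = new TreeMap<>();
        metrics.forEach((name, metric) -> out.put(name, metric.value()));
        return out;
    }

    boolean isOpen() {
        return open;
    }
}
```

Because the reporter stores references rather than copies, every call to snapshot() reflects the latest values without the registry having to notify the reporter again.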

MetricRegistry and Configuration

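MetricRegistry bridges MetricReporter and MetricGroup: metric groups hand newly created metrics to the registry, and the registry forwards them to the reporters. Its implementation, MetricRegistryImpl, is constructed from a MetricRegistryConfiguration and a collection of ReporterSetup objects, from which it creates the configured reporters; the metric query service is started separately through startQueryService. MetricRegistryConfiguration holds the scope formats, the scope delimiter, and a size limit for query service messages that is derived from the RPC frame size.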

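<code>public class MetricRegistryConfiguration {
    private final ScopeFormats scopeFormats;
    private final char delimiter;
    // Upper bound for a serialized metric dump sent by the MetricQueryService.
    private final long queryServiceMessageSizeLimit;

    public MetricRegistryConfiguration(ScopeFormats scopeFormats, char delimiter, long queryServiceMessageSizeLimit) {
        this.scopeFormats = scopeFormats;
        this.delimiter = delimiter;
        this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }

    public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
        ScopeFormats scopeFormats = ScopeFormats.fromConfig(configuration);
        // Only the first character of the configured scope delimiter is used.
        char delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
        long maxFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
        // Padding that leaves room for serialization overhead.
        long messageSizeLimitPadding = 256;
        return new MetricRegistryConfiguration(scopeFormats, delim, maxFrameSize - messageSizeLimitPadding);
    }
}
</code>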
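The arithmetic at the end of fromConfiguration is easy to check by hand. The sketch below assumes the frame size arrives as a plain byte string such as 10485760b (the documented default of akka.framesize) and uses a hypothetical parseBytes helper in place of AkkaRpcServiceUtils.extractMaximumFramesize, which accepts more size formats than this simplified version.

```java
// Minimal sketch of the values MetricRegistryConfiguration.fromConfiguration derives.
final class RegistryLimits {
    // Same padding constant as in fromConfiguration: headroom for serialization overhead.
    static final long MESSAGE_SIZE_LIMIT_PADDING = 256;

    // Hypothetical helper: accepts a plain byte count with an optional trailing 'b'.
    static long parseBytes(String size) {
        String s = size.trim();
        if (s.endsWith("b")) {
            s = s.substring(0, s.length() - 1);
        }
        return Long.parseLong(s);
    }

    // Only the first character of the configured scope delimiter is used.
    static char delimiter(String configuredDelimiter) {
        return configuredDelimiter.charAt(0);
    }

    // Frame size minus padding: the budget for one serialized metric dump.
    static long queryServiceMessageSizeLimit(String frameSize) {
        return parseBytes(frameSize) - MESSAGE_SIZE_LIMIT_PADDING;
    }
}
```

With the default frame size, queryServiceMessageSizeLimit("10485760b") evaluates to 10485760 - 256 = 10485504 bytes, which is the limit later handed to the MetricQueryService.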

ReporterSetup

ReporterSetup loads reporter configurations from the Flink configuration and creates reporter instances.

<code>public final class ReporterSetup {
    public static List<ReporterSetup> fromConfiguration(Configuration configuration, @Nullable PluginManager pluginManager) {
        String includedReporters = configuration.getString(MetricOptions.REPORTERS_LIST, "");
        Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReporters);
        List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
        Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager);
        return setupReporters(reporterFactories, reporterConfigurations);
    }
}
</code>
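findEnabledReportersInConfiguration is not shown above. The sketch below illustrates the idea under stated assumptions: a reporter named <name> is recognized from keys of the form metrics.reporter.<name>.<option>, and a non-empty metrics.reporters list (MetricOptions.REPORTERS_LIST) restricts which of those reporters are enabled. ReporterDiscovery and enabledReporters are hypothetical names, the configuration is modeled as a plain Map, and Flink's actual key matching may differ in detail.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class ReporterDiscovery {
    // Assumed key layout: metrics.reporter.<name>.<option>, where <name> contains no dots.
    private static final Pattern REPORTER_KEY = Pattern.compile("metrics\\.reporter\\.([^.\\s]+)\\..+");

    static Set<String> enabledReporters(Map<String, String> conf, String includedReporters) {
        // Parse the comma-separated include list; blank entries are ignored.
        Set<String> included = Arrays.stream(includedReporters.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());

        Set<String> found = new TreeSet<>();
        for (String key : conf.keySet()) {
            Matcher m = REPORTER_KEY.matcher(key);
            if (m.matches()) {
                String name = m.group(1);
                // An empty include list enables every configured reporter.
                if (included.isEmpty() || included.contains(name)) {
                    found.add(name);
                }
            }
        }
        return found;
    }
}
```

Each name returned here would then be paired with its own metrics.reporter.<name>.* sub-configuration, which is what loadReporterConfigurations produces in the real code.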

Adding Metrics in TaskManager

The TaskManager startup sequence begins in org.apache.flink.runtime.taskexecutor.TaskManagerRunner. The main method calls runTaskManagerProcessSecurely, which eventually invokes runTaskManager to create a TaskManagerRunner and start it.

<code>public static void main(String[] args) throws Exception {
    runTaskManagerProcessSecurely(args);
}

public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
    final TaskManagerRunner taskManagerRunner =
            new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
    taskManagerRunner.start();
    // Simplified: waiting for termination and mapping it to an exit code is omitted here.
    return 0;
}
</code>

The constructor of TaskManagerRunner sets up the executor, high-availability services, RPC service, resource ID, and heartbeat services, and creates the MetricRegistryImpl instance.

<code>public TaskManagerRunner(Configuration configuration, PluginManager pluginManager, TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
    this.executor = Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), new ExecutorThreadFactory("taskmanager-future"));
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    rpcService = createRpcService(configuration, highAvailabilityServices);
    resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());
    metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration), ReporterSetup.fromConfiguration(configuration, pluginManager));
    // start metric query service
    RpcService metricQueryServiceRpc = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpc, resourceId);
    // other initializations omitted for brevity
}
</code>

Metric groups forward each new metric (for example, one created through counter() or gauge()) to MetricRegistryImpl.register, which notifies every reporter via notifyOfAddedMetric and adds the metric to the MetricQueryService so that it can be retrieved later.

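<code>public void register(Metric metric, String metricName, AbstractMetricGroup group) {
    synchronized (lock) {
        for (ReporterAndSettings ras : reporters) {
            try {
                // Each reporter sees the group through a FrontMetricGroup carrying its own settings.
                FrontMetricGroup<AbstractMetricGroup<?>> front = new FrontMetricGroup<>(ras.getSettings(), group);
                ras.getReporter().notifyOfAddedMetric(metric, metricName, front);
            } catch (Exception e) {
                LOG.warn("Error while registering metric: {}.", metricName, e);
            }
        }
        // queryService stays null if startQueryService failed, so guard against that case.
        if (queryService != null) {
            queryService.addMetric(metricName, metric, group);
        }
    }
}
</code>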
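The fan-out performed by register can be modeled without Flink. In this sketch, Listener is a hypothetical stand-in for both a reporter and the MetricQueryService, metrics are plain Supplier<Object> values, and scope and name are joined with a fixed "." delimiter. All notifications happen under one lock, each reporter call is isolated so that a failing reporter does not keep the others or the query service from seeing the metric, and a missing query service is tolerated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a reporter or the query service.
interface Listener {
    void onAdded(String scopedName, Supplier<Object> metric);
}

final class MiniRegistry {
    private final Object lock = new Object();
    private final List<Listener> reporters;
    private final Listener queryService; // may be null if the query service failed to start
    private int failedNotifications;

    MiniRegistry(List<Listener> reporters, Listener queryService) {
        this.reporters = new ArrayList<>(reporters);
        this.queryService = queryService;
    }

    void register(Supplier<Object> metric, String metricName, String groupScope) {
        String scopedName = groupScope + "." + metricName;
        synchronized (lock) {
            for (Listener reporter : reporters) {
                try {
                    reporter.onAdded(scopedName, metric);
                } catch (RuntimeException e) {
                    // Isolate reporter failures so later reporters still get notified.
                    failedNotifications++;
                }
            }
            if (queryService != null) {
                queryService.onAdded(scopedName, metric);
            }
        }
    }

    int failedNotifications() {
        synchronized (lock) {
            return failedNotifications;
        }
    }
}
```

Holding the lock for the whole fan-out keeps registration and removal ordered for every reporter, at the cost of serializing metric registration across threads.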

TaskManagerRunner.start()

The start method simply starts the task executor service, which in turn triggers the TaskExecutor lifecycle.

<code>public void start() throws Exception {
    taskExecutorService.start();
}
</code>

Retrieving Metrics via REST

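The REST endpoints for metrics live in the flink-runtime module, where org.apache.flink.runtime.webmonitor.WebMonitorEndpoint configures the HTTP server and its routes. The data behind them comes from the metric query service started by MetricRegistryImpl.startQueryService; once it is running, callers can invoke its queryMetrics RPC method to obtain a serialized dump of the registered metrics. The web monitor's MetricFetcher uses this call to pull the dumps and caches them in a MetricStore, from which the REST handlers answer requests.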

<code>public void startQueryService(RpcService rpcService, ResourceID resourceID) {
    synchronized (lock) {
        Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
        try {
            metricQueryServiceRpcService = rpcService;
            queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, maximumFramesize);
            queryService.start();
        } catch (Exception e) {
            LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
        }
    }
}
</code>
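From the client side, these metrics are exposed through Flink's REST API, for example GET /taskmanagers/<taskmanagerid>/metrics?get=<names> on the JobManager's REST port (8081 by default); TaskManager IDs can be listed with GET /taskmanagers, and omitting get returns the available metric IDs. The sketch below uses only the JDK's java.net.http client (Java 11+). TaskManagerMetricsClient is a hypothetical name, and the base URL, TaskManager ID, and metric names such as Status.JVM.CPU.Load are placeholders to adapt to your cluster.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class TaskManagerMetricsClient {
    private final HttpClient http = HttpClient.newHttpClient();
    private final String restBase; // e.g. "http://localhost:8081"

    TaskManagerMetricsClient(String restBase) {
        this.restBase = restBase;
    }

    // Builds <base>/taskmanagers/<id>/metrics[?get=a,b,...]; the ID is used as returned by GET /taskmanagers.
    static String metricsUrl(String restBase, String taskManagerId, List<String> metricNames) {
        String url = restBase + "/taskmanagers/" + taskManagerId + "/metrics";
        if (metricNames.isEmpty()) {
            return url; // without "get", the endpoint lists the available metric IDs
        }
        String get = metricNames.stream()
                .map(n -> URLEncoder.encode(n, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return url + "?get=" + get;
    }

    // Returns the raw JSON body, a list of {"id": ..., "value": ...} objects.
    String fetch(String taskManagerId, List<String> metricNames) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(metricsUrl(restBase, taskManagerId, metricNames)))
                .GET()
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

Since the values are served from the MetricStore cache, two requests issued in quick succession may return the same values.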

Prometheus PushGateway Reporter

For external collection, Flink provides a Prometheus reporter. The abstract class AbstractPrometheusReporter implements MetricReporter and handles metric registration with a Prometheus CollectorRegistry. The concrete PrometheusPushGatewayReporter pushes metrics to a remote PushGateway and can optionally delete them on shutdown.

<code>import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.PushGateway;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Map;

public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    private final PushGateway pushGateway;
    private final String jobName;
    private final Map<String, String> groupingKey;
    private final boolean deleteOnShutdown;

    public PrometheusPushGatewayReporter(String host, int port, String jobName, Map<String, String> groupingKey, boolean deleteOnShutdown) {
        this.pushGateway = new PushGateway(host + ':' + port);
        this.jobName = Preconditions.checkNotNull(jobName);
        this.groupingKey = Preconditions.checkNotNull(groupingKey);
        this.deleteOnShutdown = deleteOnShutdown;
    }

    // As a Scheduled reporter, report() is invoked periodically by the metric registry.
    @Override
    public void report() {
        try {
            // Push everything in Prometheus's default CollectorRegistry under jobName/groupingKey.
            pushGateway.push(CollectorRegistry.defaultRegistry, jobName, groupingKey);
        } catch (Exception e) {
            // log is inherited from AbstractPrometheusReporter.
            log.warn("Failed to push metrics to PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
        }
    }

    @Override
    public void close() {
        // Optionally remove this reporter's metric group from the PushGateway on shutdown.
        if (deleteOnShutdown && pushGateway != null) {
            try {
                pushGateway.delete(jobName, groupingKey);
            } catch (IOException e) {
                log.warn("Failed to delete metrics from PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
            }
        }
        super.close();
    }
}
</code>
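Each push targets the PushGateway's grouping path, /metrics/job/<jobName> followed by one /<label>/<value> pair per grouping-key entry; in the Prometheus Java client, push replaces all metrics in that group (HTTP PUT), and delete, used above when deleteOnShutdown is set, removes the same group. The sketch below only builds that path. pushUrl is a hypothetical helper, not part of the Prometheus client, and it assumes every segment is made of characters that need no URL encoding (values containing "/" would need the PushGateway's base64 form, which is not handled here).

```java
import java.util.Map;
import java.util.regex.Pattern;

final class PushGatewayPaths {
    // Simplification: only segments that need no URL encoding are accepted.
    private static final Pattern SAFE_SEGMENT = Pattern.compile("[A-Za-z0-9_.:-]+");

    // Hypothetical helper: http://<host>:<port>/metrics/job/<job>{/<label>/<value>}
    static String pushUrl(String host, int port, String jobName, Map<String, String> groupingKey) {
        StringBuilder url = new StringBuilder("http://")
                .append(host).append(':').append(port)
                .append("/metrics/job/").append(checked(jobName));
        // Map iteration order decides segment order; pass a LinkedHashMap to control it.
        for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
            url.append('/').append(checked(entry.getKey()))
               .append('/').append(checked(entry.getValue()));
        }
        return url.toString();
    }

    private static String checked(String segment) {
        if (!SAFE_SEGMENT.matcher(segment).matches()) {
            throw new IllegalArgumentException("Segment needs encoding, not supported here: " + segment);
        }
        return segment;
    }
}
```

Because the grouping key is part of the path, giving each TaskManager a distinct grouping-key value keeps their pushes from overwriting one another.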

In summary, the article details how Flink creates, registers, and exposes metrics through its internal registry, how the TaskManager lifecycle integrates metric services, and how users can retrieve metrics via REST APIs or push them to Prometheus.
