
Inside Flink Metrics: Adding, Retrieving, and Exposing Metrics in TaskManager

This article walks through Flink's metric system by explaining the core interfaces such as MetricReporter and MetricRegistry, showing how metrics are added, registered, and queried during TaskManager startup, and detailing both REST and Prometheus approaches for retrieving metric values.

37 Mobile Game Tech Team

MetricReporter Interface

MetricReporter is the interface through which Flink exports metrics to external systems. It defines two lifecycle methods, open and close, and two callbacks, notifyOfAddedMetric and notifyOfRemovedMetric, which the metric registry invokes whenever a metric is registered or removed.

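import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;

public interface MetricReporter {
    // called once before use, with this reporter's metrics.reporter.<name>.* options
    void open(MetricConfig config);
    // called on shutdown; release connections and threads here
    void close();
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}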
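
To make this lifecycle concrete, here is a minimal, self-contained sketch of a reporter that only tracks which metrics are currently registered. So that it compiles without Flink on the classpath, it uses simplified stand-ins (SimpleMetric, SimpleCounter, SimpleGroup, SimpleReporter); these names are hypothetical and are not the org.apache.flink.metrics types. The identifier is built the way a real reporter would typically obtain it from group.getMetricIdentifier(metricName), here simplified to joining the scope and the name with a dot.

```java
import java.util.Map;
import java.util.TreeMap;

public class InMemoryReporterSketch {

    /** Hypothetical stand-in for org.apache.flink.metrics.Metric. */
    interface SimpleMetric {}

    /** Hypothetical stand-in for a Flink Counter. */
    static final class SimpleCounter implements SimpleMetric {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Hypothetical stand-in for MetricGroup; only identifier building is modeled. */
    static final class SimpleGroup {
        private final String scope;
        SimpleGroup(String scope) { this.scope = scope; }
        String getMetricIdentifier(String metricName) { return scope + "." + metricName; }
    }

    /** The four MetricReporter methods, expressed with the stand-in types. */
    interface SimpleReporter {
        void open(Map<String, String> config);
        void close();
        void notifyOfAddedMetric(SimpleMetric metric, String metricName, SimpleGroup group);
        void notifyOfRemovedMetric(SimpleMetric metric, String metricName, SimpleGroup group);
    }

    /** Tracks the currently registered metrics, keyed by fully qualified identifier. */
    static final class InMemoryReporter implements SimpleReporter {
        final Map<String, SimpleMetric> metrics = new TreeMap<>();

        @Override public void open(Map<String, String> config) { /* no external connection needed */ }

        @Override public void close() { metrics.clear(); }

        @Override
        public void notifyOfAddedMetric(SimpleMetric metric, String metricName, SimpleGroup group) {
            metrics.put(group.getMetricIdentifier(metricName), metric);
        }

        @Override
        public void notifyOfRemovedMetric(SimpleMetric metric, String metricName, SimpleGroup group) {
            metrics.remove(group.getMetricIdentifier(metricName));
        }
    }

    public static void main(String[] args) {
        InMemoryReporter reporter = new InMemoryReporter();
        reporter.open(Map.of());
        SimpleGroup group = new SimpleGroup("localhost.taskmanager.tm-1");
        SimpleCounter counter = new SimpleCounter();
        reporter.notifyOfAddedMetric(counter, "numRecordsIn", group);
        counter.inc();
        System.out.println(reporter.metrics.keySet()); // [localhost.taskmanager.tm-1.numRecordsIn]
        // the reporter holds a reference, so it always sees the current value
        System.out.println(((SimpleCounter) reporter.metrics.get("localhost.taskmanager.tm-1.numRecordsIn")).getCount()); // 1
        reporter.notifyOfRemovedMetric(counter, "numRecordsIn", group);
        System.out.println(reporter.metrics.isEmpty()); // true
        reporter.close();
    }
}
```

Because reporters receive the Metric object itself rather than a snapshot, a periodic reporter can read live values each time it reports.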

MetricRegistry and Configuration

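MetricRegistry is the bridge between MetricGroup and MetricReporter: metric groups register their metrics with the registry, and the registry forwards each registration to every configured reporter. The implementation class, MetricRegistryImpl, is constructed from a MetricRegistryConfiguration and a list of ReporterSetup objects; its constructor takes over the configured reporters and schedules periodic reporting for those that implement Scheduled. The metric query service is started separately, through startQueryService.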

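import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;

public class MetricRegistryConfiguration {
    private final ScopeFormats scopeFormats;          // scope formats used to build metric identifiers
    private final char delimiter;                     // separator between scope components
    private final long queryServiceMessageSizeLimit;  // upper bound for a serialized metric dump
    public MetricRegistryConfiguration(ScopeFormats scopeFormats, char delimiter, long queryServiceMessageSizeLimit) {
        this.scopeFormats = scopeFormats;
        this.delimiter = delimiter;
        this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }
    public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
        ScopeFormats scopeFormats = ScopeFormats.fromConfig(configuration);  // local variable, not the field
        char delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
        long maxFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
        long messageSizeLimitPadding = 256;  // headroom for RPC message overhead
        return new MetricRegistryConfiguration(scopeFormats, delim, maxFrameSize - messageSizeLimitPadding);
    }
}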
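
The arithmetic in fromConfiguration is easy to check by hand. Assuming the default akka.framesize of 10485760b (10 MiB) and the default metrics.scope.delimiter of ".", a single metric dump from the query service may be at most 10485760 - 256 = 10485504 bytes. The sketch below reproduces only that calculation; it uses a plain Map in place of Flink's Configuration, and both the class name and the simplified frame-size parsing (plain byte counts with a trailing "b") are assumptions, not Flink code.

```java
import java.util.Map;

public class QueryServiceLimitSketch {

    static final String FRAMESIZE_KEY = "akka.framesize";           // Flink option key
    static final String DELIMITER_KEY = "metrics.scope.delimiter";  // Flink option key
    static final long MESSAGE_SIZE_LIMIT_PADDING = 256;              // same padding as fromConfiguration

    /** Simplified: only understands plain byte counts such as "10485760b". */
    static long parseFrameSize(String value) {
        String digits = value.endsWith("b") ? value.substring(0, value.length() - 1) : value;
        return Long.parseLong(digits.trim());
    }

    /** First character of the configured delimiter, "." if unset. */
    static char delimiter(Map<String, String> conf) {
        return conf.getOrDefault(DELIMITER_KEY, ".").charAt(0);
    }

    /** Maximum frame size minus the fixed padding. */
    static long queryServiceMessageSizeLimit(Map<String, String> conf) {
        long maxFrameSize = parseFrameSize(conf.getOrDefault(FRAMESIZE_KEY, "10485760b"));
        return maxFrameSize - MESSAGE_SIZE_LIMIT_PADDING;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of();
        System.out.println(delimiter(defaults));                     // .
        System.out.println(queryServiceMessageSizeLimit(defaults));  // 10485504
        Map<String, String> tuned = Map.of(FRAMESIZE_KEY, "20971520b", DELIMITER_KEY, "_");
        System.out.println(delimiter(tuned));                        // _
        System.out.println(queryServiceMessageSizeLimit(tuned));     // 20971264
    }
}
```

Tying the limit to the RPC frame size keeps a large metric dump from being rejected by the RPC layer when the web monitor fetches it.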

ReporterSetup

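ReporterSetup reads the reporter definitions from the Flink configuration and creates one reporter instance per enabled reporter. Each reporter is configured through keys prefixed with metrics.reporter.<name>., and the optional metrics.reporters list restricts which of the configured reporters are started.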

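public final class ReporterSetup {
    public static List<ReporterSetup> fromConfiguration(Configuration configuration, @Nullable PluginManager pluginManager) {
        // optional whitelist from metrics.reporters; empty means "all configured reporters"
        String includedReporters = configuration.getString(MetricOptions.REPORTERS_LIST, "");
        // reporter names found under metrics.reporter.<name>.* and allowed by the whitelist
        Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReporters);
        // per-reporter sub-configurations, paired with the reporter name
        List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
        // reporter factories discovered via ServiceLoader and, if available, the plugin manager
        Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager);
        // instantiate each reporter from its factory (or reflectively from its class name)
        return setupReporters(reporterFactories, reporterConfigurations);
    }
    // private helper methods omitted
}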
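
The first step, deciding which reporters are enabled, can be illustrated with a simplified reimplementation. This is a sketch, not Flink's code: it scans keys of the form metrics.reporter.<name>.<option>, collects the <name> part, and, if metrics.reporters is set, keeps only the listed names. A plain Map stands in for Configuration, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ReporterDiscoverySketch {

    // metrics.reporter.<name>.<option>; <name> may not contain dots or whitespace
    private static final Pattern REPORTER_KEY = Pattern.compile("metrics\\.reporter\\.([^.\\s]+)\\..+");

    static Set<String> enabledReporters(Map<String, String> conf) {
        // optional whitelist; an empty value means "every configured reporter"
        Set<String> whitelist = Arrays.stream(conf.getOrDefault("metrics.reporters", "").split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());

        Set<String> names = new TreeSet<>();
        for (String key : conf.keySet()) {
            Matcher m = REPORTER_KEY.matcher(key);
            if (m.matches() && (whitelist.isEmpty() || whitelist.contains(m.group(1)))) {
                names.add(m.group(1));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("metrics.reporter.promgateway.factory.class",
                "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory");
        conf.put("metrics.reporter.promgateway.port", "9091");
        conf.put("metrics.reporter.jmx.factory.class", "org.apache.flink.metrics.jmx.JMXReporterFactory");
        System.out.println(enabledReporters(conf)); // [jmx, promgateway]

        conf.put("metrics.reporters", "promgateway");
        System.out.println(enabledReporters(conf)); // [promgateway]
    }
}
```

Note that the key metrics.reporters itself does not match the pattern, because it has no dot directly after "reporter".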

Adding Metrics in TaskManager

The TaskManager startup sequence begins in org.apache.flink.runtime.taskexecutor.TaskManagerRunner. The main method calls runTaskManagerProcessSecurely, which eventually invokes runTaskManager to create a TaskManagerRunner and start it.

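public static void main(String[] args) throws Exception {
    // environment logging, signal handlers and the JVM shutdown safeguard are omitted here;
    // the call below loads the configuration, creates the PluginManager,
    // installs the security context and runs runTaskManager(...) inside it
    runTaskManagerProcessSecurely(args);
}

public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
    final TaskManagerRunner taskManagerRunner =
            new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
    taskManagerRunner.start();
    // simplified: the real method waits for the runner's termination future and returns its exit code
    return 0;
}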

The TaskManagerRunner constructor sets up a scheduled executor, the high-availability services, the RPC service, the resource ID and the heartbeat services, then creates the MetricRegistryImpl instance and starts its metric query service on a dedicated RPC service.

public TaskManagerRunner(Configuration configuration, PluginManager pluginManager, TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
    this.executor = Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), new ExecutorThreadFactory("taskmanager-future"));
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    rpcService = createRpcService(configuration, highAvailabilityServices);
    resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());
    metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration), ReporterSetup.fromConfiguration(configuration, pluginManager));
    // start metric query service
    RpcService metricQueryServiceRpc = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpc, resourceId);
    // other initializations omitted for brevity
}

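Metrics reach the registry through their groups: when code adds a metric to a group, for example with group.counter("numRecordsIn"), AbstractMetricGroup calls MetricRegistryImpl.register. The register method then notifies every reporter via notifyOfAddedMetric and adds the metric to the MetricQueryService so that it can be queried later.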

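public void register(Metric metric, String metricName, AbstractMetricGroup group) {
    synchronized (lock) {
        for (ReporterAndSettings ras : reporters) {
            try {
                // FrontMetricGroup applies this reporter's settings (e.g. its delimiter) to the group
                FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(ras.getSettings(), group);
                ras.getReporter().notifyOfAddedMetric(metric, metricName, front);
            } catch (Exception e) {
                // a failing reporter must not prevent registration with the others
                LOG.warn("Error while registering metric: {}.", metricName, e);
            }
        }
        // queryService stays null until startQueryService() has succeeded
        if (queryService != null) {
            queryService.addMetric(metricName, metric, group);
        }
    }
}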
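
The fan-out in register can be modeled in a few lines. In this self-contained sketch, each hypothetical ScopedReporter has its own delimiter, standing in for the per-reporter settings that FrontMetricGroup applies, and a nullable map plays the role of the MetricQueryService, which stays null until startQueryService runs. None of these class names are Flink APIs; the sketch also shows why the null check matters, since a metric registered before the query service starts reaches the reporters but not the query service.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RegistryFanOutSketch {

    /** Hypothetical reporter that renders identifiers with its own delimiter. */
    static final class ScopedReporter {
        final char delimiter;
        final List<String> received = new ArrayList<>();
        ScopedReporter(char delimiter) { this.delimiter = delimiter; }
        void notifyOfAddedMetric(Object metric, String metricName, List<String> scope) {
            received.add(String.join(String.valueOf(delimiter), scope) + delimiter + metricName);
        }
    }

    /** Hypothetical registry mirroring the structure of MetricRegistryImpl.register. */
    static final class Registry {
        private final Object lock = new Object();
        private final List<ScopedReporter> reporters;
        private Map<String, Object> queryService; // null until startQueryService() runs

        Registry(List<ScopedReporter> reporters) { this.reporters = reporters; }

        void startQueryService() {
            synchronized (lock) { queryService = new LinkedHashMap<>(); }
        }

        void register(Object metric, String metricName, List<String> scope) {
            synchronized (lock) {
                for (ScopedReporter reporter : reporters) {
                    try {
                        reporter.notifyOfAddedMetric(metric, metricName, scope);
                    } catch (RuntimeException e) {
                        // one failing reporter must not block the others
                        System.err.println("Error while registering metric " + metricName + ": " + e);
                    }
                }
                if (queryService != null) {
                    queryService.put(String.join(".", scope) + "." + metricName, metric);
                }
            }
        }

        int queryServiceSize() {
            synchronized (lock) { return queryService == null ? 0 : queryService.size(); }
        }
    }

    public static void main(String[] args) {
        ScopedReporter dots = new ScopedReporter('.');
        ScopedReporter underscores = new ScopedReporter('_');
        Registry registry = new Registry(List.of(dots, underscores));
        List<String> scope = List.of("localhost", "taskmanager", "tm-1");

        registry.register(new Object(), "early", scope); // query service not started yet
        registry.startQueryService();
        registry.register(new Object(), "numRecordsIn", scope);

        System.out.println(dots.received);               // [localhost.taskmanager.tm-1.early, localhost.taskmanager.tm-1.numRecordsIn]
        System.out.println(underscores.received.get(1)); // localhost_taskmanager_tm-1_numRecordsIn
        System.out.println(registry.queryServiceSize()); // 1
    }
}
```

In TaskManagerRunner the query service is started right after the registry is created, before metric groups register anything, so in practice the early-registration case does not arise.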

TaskManagerRunner.start()

The start method simply starts the task executor service, which eventually triggers the TaskExecutor lifecycle.

public void start() throws Exception {
    taskExecutorService.start();
}

Retrieving Metrics via REST

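Metrics are exposed over REST by the web monitor in the flink-runtime module, whose core class org.apache.flink.runtime.webmonitor.WebMonitorEndpoint configures the HTTP server and its routes. The data behind those routes comes from the metric query service that MetricRegistryImpl.startQueryService starts: the web monitor's MetricFetcher calls the service's queryMetrics RPC method, receives the metrics in serialized form and caches them in a MetricStore, from which REST handlers such as GET /taskmanagers/:taskmanagerid/metrics answer requests.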

public void startQueryService(RpcService rpcService, ResourceID resourceID) {
    synchronized (lock) {
        Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
        try {
            metricQueryServiceRpcService = rpcService;
            queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, maximumFramesize);
            queryService.start();
        } catch (Exception e) {
            LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
        }
    }
}
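
On the client side, the cached values are reachable over plain HTTP. The sketch below assumes a JobManager REST endpoint at http://localhost:8081 (the default rest.port) and uses the documented GET /taskmanagers/:taskmanagerid/metrics?get=... route, which answers with a JSON array of {"id": ..., "value": ...} objects; the TaskManager ID has to be taken from GET /taskmanagers. To stay dependency-free it parses that array with a regular expression, which is only adequate for this flat response shape; a real client should use a JSON library. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TaskManagerMetricsClient {

    // matches {"id":"...","value":"..."} entries in the flat response array
    private static final Pattern ENTRY =
            Pattern.compile("\\{\\s*\"id\"\\s*:\\s*\"([^\"]*)\"\\s*,\\s*\"value\"\\s*:\\s*\"([^\"]*)\"\\s*\\}");

    /** Builds the metrics URL for one TaskManager and a list of metric names. */
    static String metricsUrl(String baseUrl, String taskManagerId, List<String> metricNames) {
        return baseUrl + "/taskmanagers/" + URLEncoder.encode(taskManagerId, StandardCharsets.UTF_8)
                + "/metrics?get=" + URLEncoder.encode(String.join(",", metricNames), StandardCharsets.UTF_8);
    }

    /** Extracts id -> value pairs; values are returned as strings, as in the REST response. */
    static Map<String, String> parseMetrics(String json) {
        Map<String, String> values = new LinkedHashMap<>();
        Matcher m = ENTRY.matcher(json);
        while (m.find()) {
            values.put(m.group(1), m.group(2));
        }
        return values;
    }

    static Map<String, String> fetch(String baseUrl, String taskManagerId, List<String> metricNames)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(metricsUrl(baseUrl, taskManagerId, metricNames)))
                .GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return parseMetrics(response.body());
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java TaskManagerMetricsClient <taskManagerId>");
            return;
        }
        List<String> names = List.of("Status.JVM.CPU.Load", "Status.JVM.Memory.Heap.Used");
        System.out.println(fetch("http://localhost:8081", args[0], names));
    }
}
```

Because the handlers serve values from the MetricStore cache, repeated requests within a short interval may return the same values rather than triggering a new query.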

Prometheus PushGateway Reporter

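For external collection, Flink provides Prometheus reporters: PrometheusReporter exposes an HTTP endpoint for Prometheus to scrape, while PrometheusPushGatewayReporter pushes metrics to a Prometheus PushGateway. Both extend the abstract class AbstractPrometheusReporter, which implements MetricReporter and translates each added metric into a Prometheus collector registered with the default CollectorRegistry. PrometheusPushGatewayReporter also implements Scheduled, so the MetricRegistry calls its report() method at the reporter's configured interval; it can optionally delete its metrics from the PushGateway on shutdown.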

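import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.PushGateway;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Map;

// report() is invoked periodically by the MetricRegistry because this class implements Scheduled
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {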
    private final PushGateway pushGateway;
    private final String jobName;
    private final Map<String, String> groupingKey;
    private final boolean deleteOnShutdown;

    public PrometheusPushGatewayReporter(String host, int port, String jobName, Map<String, String> groupingKey, boolean deleteOnShutdown) {
        this.pushGateway = new PushGateway(host + ':' + port);
        this.jobName = Preconditions.checkNotNull(jobName);
        this.groupingKey = Preconditions.checkNotNull(groupingKey);
        this.deleteOnShutdown = deleteOnShutdown;
    }

    @Override
    public void report() {
        try {
            pushGateway.push(CollectorRegistry.defaultRegistry, jobName, groupingKey);
        } catch (Exception e) {
            log.warn("Failed to push metrics to PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
        }
    }

    @Override
    public void close() {
        if (deleteOnShutdown && pushGateway != null) {
            try {
                pushGateway.delete(jobName, groupingKey);
            } catch (IOException e) {
                log.warn("Failed to delete metrics from PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
            }
        }
        super.close();
    }
}
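
What report() and close() send over the wire follows the Prometheus PushGateway HTTP API: metrics for a job and grouping key live under /metrics/job/<jobName>/<label>/<value>..., the Java client's push(...) issues a PUT that replaces everything previously pushed to that group, and delete(...) issues a DELETE on the same path. The sketch below builds that path and parses a grouping-key string in the k1=v1;k2=v2 form used by Flink's groupingKey reporter option. It is a simplified illustration that assumes label values need only URL encoding (the real client has extra handling, for example for values containing a slash); the class and method names are hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class PushGatewayPathSketch {

    /** Parses "k1=v1;k2=v2" into an ordered map; malformed entries are skipped. */
    static Map<String, String> parseGroupingKey(String groupingKeyConfig) {
        Map<String, String> groupingKey = new LinkedHashMap<>();
        if (groupingKeyConfig == null || groupingKeyConfig.isBlank()) {
            return groupingKey;
        }
        for (String label : groupingKeyConfig.split(";")) {
            int eq = label.indexOf('=');
            if (eq <= 0 || eq == label.length() - 1) {
                continue; // missing label name or value
            }
            groupingKey.put(label.substring(0, eq).trim(), label.substring(eq + 1).trim());
        }
        return groupingKey;
    }

    /** URL that push (PUT) and delete (DELETE) target for one job and grouping key. */
    static String groupUrl(String host, int port, String jobName, Map<String, String> groupingKey) {
        StringBuilder url = new StringBuilder("http://").append(host).append(':').append(port)
                .append("/metrics/job/").append(URLEncoder.encode(jobName, StandardCharsets.UTF_8));
        for (Map.Entry<String, String> label : groupingKey.entrySet()) {
            url.append('/').append(label.getKey())
               .append('/').append(URLEncoder.encode(label.getValue(), StandardCharsets.UTF_8));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> key = parseGroupingKey("cluster=prod;host=tm-1");
        System.out.println(groupUrl("localhost", 9091, "flink-job", key));
        // http://localhost:9091/metrics/job/flink-job/cluster/prod/host/tm-1
    }
}
```

Since a push replaces the whole group, each TaskManager needs a distinct job name or grouping key; otherwise reporters would overwrite each other's metrics on the PushGateway.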

In summary, the article details how Flink creates, registers, and exposes metrics through its internal registry, how the TaskManager lifecycle integrates the metric services, and how users can retrieve metrics via the REST API or push them to a Prometheus PushGateway.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
