Inside Flink Metrics: Adding, Retrieving, and Exposing Metrics in TaskManager
This article walks through Flink's metric system by explaining the core interfaces such as MetricReporter and MetricRegistry, showing how metrics are added, registered, and queried during TaskManager startup, and detailing both REST and Prometheus approaches for retrieving metric values.
MetricReporter Interface
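MetricReporter is the interface through which Flink exports metrics to external systems. It defines two lifecycle methods, open and close, and two notification methods, notifyOfAddedMetric and notifyOfRemovedMetric. The metric registry calls the notification methods whenever a metric is added to or removed from a metric group.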
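To make the four callbacks concrete, here is a minimal plain-Java sketch that does not depend on Flink. The types ToyMetric, ToyReporter, and InMemoryReporter are hypothetical stand-ins for Metric and MetricReporter, the metric group is reduced to a scope string, and the "delimiter" config key is an assumption. The reporter keeps every added metric under its identifier and forgets it on removal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, Flink-independent model of the MetricReporter callbacks.
class ToyReporterDemo {

    // Stand-in for a metric: anything that can produce a current value.
    interface ToyMetric extends Supplier<Object> {}

    // Mirrors the four MetricReporter methods; config and group are simplified.
    interface ToyReporter {
        void open(Map<String, String> config);
        void close();
        void notifyOfAddedMetric(ToyMetric metric, String metricName, String scope);
        void notifyOfRemovedMetric(ToyMetric metric, String metricName, String scope);
    }

    // Keeps every known metric under "<scope><delimiter><name>".
    static class InMemoryReporter implements ToyReporter {
        final Map<String, ToyMetric> metrics = new LinkedHashMap<>();
        String delimiter = ".";

        @Override
        public void open(Map<String, String> config) {
            // "delimiter" is a hypothetical option name used only in this sketch
            delimiter = config.getOrDefault("delimiter", ".");
        }

        @Override
        public void close() {
            metrics.clear();
        }

        @Override
        public void notifyOfAddedMetric(ToyMetric metric, String metricName, String scope) {
            metrics.put(scope + delimiter + metricName, metric);
        }

        @Override
        public void notifyOfRemovedMetric(ToyMetric metric, String metricName, String scope) {
            metrics.remove(scope + delimiter + metricName);
        }

        // Reads the current value of every registered metric.
        Map<String, Object> snapshot() {
            Map<String, Object> out = new LinkedHashMap<>();
            metrics.forEach((id, m) -> out.put(id, m.get()));
            return out;
        }
    }

    public static void main(String[] args) {
        InMemoryReporter reporter = new InMemoryReporter();
        reporter.open(Map.of("delimiter", "."));
        ToyMetric load = () -> 0.25;
        reporter.notifyOfAddedMetric(load, "Load", "tm-1.Status.JVM.CPU");
        System.out.println(reporter.snapshot()); // {tm-1.Status.JVM.CPU.Load=0.25}
        reporter.notifyOfRemovedMetric(load, "Load", "tm-1.Status.JVM.CPU");
        reporter.close();
    }
}
```

A real reporter either exports in notifyOfAddedMetric, as the JMX reporter does when it registers MBeans, or reads values periodically if it also implements Scheduled.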
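import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
public interface MetricReporter {
    void open(MetricConfig config);   // called once after the reporter is instantiated
    void close();                     // called when the registry shuts down
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
MetricRegistry and Configuration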
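MetricRegistry bridges MetricReporter and MetricGroup: metric groups register their metrics with the registry, which forwards them to every configured reporter. The implementation class MetricRegistryImpl is constructed from a MetricRegistryConfiguration and a list of ReporterSetup objects. Its constructor instantiates the configured reporters and schedules periodic report() calls for those that implement Scheduled. The query service is started separately through startQueryService.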
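The values MetricRegistryConfiguration derives can be checked with simple arithmetic and string handling. The sketch below is a simplified, hypothetical model rather than Flink's ScopeFormats code. It subtracts the 256-byte padding that fromConfiguration applies to the Akka frame size (assuming the default akka.framesize of 10485760b), and it expands a TaskManager scope format such as the default <host>.taskmanager.<tm_id> before joining the parts with the configured delimiter. The host, TaskManager ID, and metric name are made-up examples.

```java
import java.util.Map;

// Simplified model of what MetricRegistryConfiguration derives; not Flink's ScopeFormats code.
class RegistryConfigSketch {

    // Same padding that fromConfiguration subtracts from the maximum frame size.
    static final long MESSAGE_SIZE_LIMIT_PADDING = 256;

    static long queryServiceMessageSizeLimit(long maxFrameSizeBytes) {
        return maxFrameSizeBytes - MESSAGE_SIZE_LIMIT_PADDING;
    }

    // Expands a scope format such as "<host>.taskmanager.<tm_id>", then joins the
    // components and the metric name with the configured delimiter.
    static String metricIdentifier(String scopeFormat, Map<String, String> variables,
                                   char delimiter, String metricName) {
        StringBuilder sb = new StringBuilder();
        for (String component : scopeFormat.split("\\.")) {
            // Variables like "<host>" are replaced; literal components are kept as-is.
            sb.append(variables.getOrDefault(component, component)).append(delimiter);
        }
        return sb.append(metricName).toString();
    }

    public static void main(String[] args) {
        // Assumes the default akka.framesize of 10485760b (10 MiB).
        System.out.println(queryServiceMessageSizeLimit(10_485_760L)); // 10485504
        String id = metricIdentifier("<host>.taskmanager.<tm_id>",
                Map.of("<host>", "worker-1", "<tm_id>", "tm-42"), '_', "myCounter");
        System.out.println(id); // worker-1_taskmanager_tm-42_myCounter
    }
}
```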
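public class MetricRegistryConfiguration {
    private final ScopeFormats scopeFormats;
    private final char delimiter;
    // upper bound for a serialized metric dump sent by the MetricQueryService
    private final long queryServiceMessageSizeLimit;
    public MetricRegistryConfiguration(ScopeFormats scopeFormats, char delimiter, long queryServiceMessageSizeLimit) {
        this.scopeFormats = scopeFormats;
        this.delimiter = delimiter;
        this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }
    public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
        // scope formats (e.g. metrics.scope.tm) control how metric identifiers are built
        ScopeFormats scopeFormats = ScopeFormats.fromConfig(configuration);
        char delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
        // keep 256 bytes of headroom below the RPC frame size for message overhead
        long maxFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
        long messageSizeLimitPadding = 256;
        return new MetricRegistryConfiguration(scopeFormats, delim, maxFrameSize - messageSizeLimitPadding);
    }
}
ReporterSetup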
ReporterSetup loads reporter configurations from the Flink configuration and creates reporter instances.
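The way ReporterSetup turns flat configuration keys into per-reporter configurations can be sketched on a plain Map. This is a hypothetical approximation of findEnabledReportersInConfiguration and loadReporterConfigurations, not Flink's code. It assumes reporter options follow the metrics.reporter.<name>.<option> pattern and that an empty metrics.reporters list enables every named reporter.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of how reporter options are grouped by reporter name.
class ReporterConfigSketch {

    // metrics.reporter.<name>.<option>
    private static final Pattern REPORTER_KEY = Pattern.compile("metrics\\.reporter\\.([^.]+)\\.(.+)");

    static Map<String, Map<String, String>> reporterConfigs(Map<String, String> flinkConf) {
        // Optional allow-list, e.g. "metrics.reporters: prom"; empty means "all reporters".
        Set<String> included = new HashSet<>();
        for (String name : flinkConf.getOrDefault("metrics.reporters", "").split(",")) {
            if (!name.isBlank()) {
                included.add(name.trim());
            }
        }
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            Matcher m = REPORTER_KEY.matcher(e.getKey());
            if (!m.matches()) {
                continue;
            }
            String name = m.group(1);
            if (!included.isEmpty() && !included.contains(name)) {
                continue;
            }
            // Strip the "metrics.reporter.<name>." prefix: each reporter sees only its own options.
            result.computeIfAbsent(name, k -> new TreeMap<>()).put(m.group(2), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
                "metrics.reporters", "prom",
                "metrics.reporter.prom.factory.class", "org.apache.flink.metrics.prometheus.PrometheusReporterFactory",
                "metrics.reporter.prom.port", "9249",
                "metrics.reporter.jmx.factory.class", "org.apache.flink.metrics.jmx.JMXReporterFactory");
        // Only "prom" survives, because the allow-list excludes "jmx".
        System.out.println(reporterConfigs(conf));
    }
}
```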
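public final class ReporterSetup {
    public static List<ReporterSetup> fromConfiguration(Configuration configuration, @Nullable PluginManager pluginManager) {
        // metrics.reporters: optional comma-separated allow-list of reporter names
        String includedReporters = configuration.getString(MetricOptions.REPORTERS_LIST, "");
        // reporter names found in metrics.reporter.<name>.* keys that pass the allow-list
        Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReporters);
        // each reporter's metrics.reporter.<name>.* options, exposed without the prefix
        List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
        // MetricReporterFactory implementations discovered via ServiceLoader and the plugin manager
        Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager);
        return setupReporters(reporterFactories, reporterConfigurations);
    }
}
Adding Metrics in TaskManager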
The TaskManager startup sequence begins in org.apache.flink.runtime.taskexecutor.TaskManagerRunner. The main method calls runTaskManagerProcessSecurely, which eventually invokes runTaskManager to create a TaskManagerRunner and start it.
public static void main(String[] args) throws Exception {
    runTaskManagerProcessSecurely(args);
}
// simplified: the full method also handles startup failures and waits for the runner to terminate
public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
    final TaskManagerRunner taskManagerRunner =
            new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
    taskManagerRunner.start();
    return 0;
}
The TaskManagerRunner constructor sets up the executor, high-availability services, RPC service, resource ID, and heartbeat services. It also creates the MetricRegistryImpl instance and starts its query service.
public TaskManagerRunner(Configuration configuration, PluginManager pluginManager, TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
    this.executor = Executors.newScheduledThreadPool(
            Hardware.getNumberCPUCores(), new ExecutorThreadFactory("taskmanager-future"));
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    rpcService = createRpcService(configuration, highAvailabilityServices);
    resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());
    // create the registry (and thereby the reporters) from the Flink configuration
    metricRegistry = new MetricRegistryImpl(
            MetricRegistryConfiguration.fromConfiguration(configuration),
            ReporterSetup.fromConfiguration(configuration, pluginManager));
    // start the metric query service on a dedicated RPC service
    RpcService metricQueryServiceRpc = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpc, resourceId);
    // other initializations omitted for brevity
}
Metrics reach the registry when code calls a MetricGroup method such as counter() or gauge(). AbstractMetricGroup.addMetric then invokes MetricRegistryImpl.register, which notifies each reporter via notifyOfAddedMetric and adds the metric to the MetricQueryService for later retrieval.
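The fan-out performed by register can be modeled without Flink. In this hypothetical sketch, ToyRegistry stores each reporter together with its own delimiter, a simplified version of the per-reporter settings that a FrontMetricGroup carries. It notifies every reporter under a lock and records the metric in a map standing in for the MetricQueryService, but only once that service has been started.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of MetricRegistryImpl.register; names are illustrative, not Flink's.
class ToyRegistryDemo {

    interface Reporter {
        void notifyOfAddedMetric(Object metric, String metricIdentifier);
    }

    static class ToyRegistry {
        private final Object lock = new Object();
        // Reporter -> delimiter used for the identifiers that reporter sees.
        private final Map<Reporter, Character> reporters = new LinkedHashMap<>();
        // Stands in for the MetricQueryService; null until startQueryService() is called.
        private Map<String, Object> queryService;

        void addReporter(Reporter reporter, char delimiter) {
            synchronized (lock) {
                reporters.put(reporter, delimiter);
            }
        }

        void startQueryService() {
            synchronized (lock) {
                queryService = new LinkedHashMap<>();
            }
        }

        void register(Object metric, String metricName, List<String> scope) {
            synchronized (lock) {
                // Every reporter is notified with an identifier built from its own delimiter.
                for (Map.Entry<Reporter, Character> e : reporters.entrySet()) {
                    String d = String.valueOf(e.getValue());
                    e.getKey().notifyOfAddedMetric(metric, String.join(d, scope) + d + metricName);
                }
                // Mirrors the null check: without a query service, only reporters see the metric.
                if (queryService != null) {
                    queryService.put(String.join(".", scope) + "." + metricName, metric);
                }
            }
        }

        Map<String, Object> queryServiceContents() {
            synchronized (lock) {
                if (queryService == null) {
                    return Map.of();
                }
                return new LinkedHashMap<>(queryService);
            }
        }
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        List<String> seen = new ArrayList<>();
        registry.addReporter((m, id) -> seen.add(id), '.');
        registry.addReporter((m, id) -> seen.add(id), '_');
        registry.startQueryService();
        registry.register(42L, "numRecordsIn", List.of("host", "taskmanager", "tm-1"));
        System.out.println(seen); // [host.taskmanager.tm-1.numRecordsIn, host_taskmanager_tm-1_numRecordsIn]
        System.out.println(registry.queryServiceContents().keySet()); // [host.taskmanager.tm-1.numRecordsIn]
    }
}
```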
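public void register(Metric metric, String metricName, AbstractMetricGroup group) {
    synchronized (lock) {
        for (ReporterAndSettings ras : reporters) {
            // wrap the group so this reporter sees identifiers built with its own settings (e.g. delimiter)
            FrontMetricGroup<AbstractMetricGroup<?>> front = new FrontMetricGroup<>(ras.getSettings(), group);
            ras.getReporter().notifyOfAddedMetric(metric, metricName, front);
        }
        // queryService stays null if startQueryService was never called or failed
        if (queryService != null) {
            queryService.addMetric(metricName, metric, group);
        }
    }
}
TaskManagerRunner.start()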
The start method only starts the task executor service, which in turn starts the TaskExecutor RPC endpoint and thereby its lifecycle.
public void start() throws Exception {
    taskExecutorService.start();
}
Retrieving Metrics via REST
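The REST endpoint for metrics lives in the flink-runtime module, where org.apache.flink.runtime.webmonitor.WebMonitorEndpoint configures the HTTP server and its routes. The data behind it comes from the MetricQueryService. MetricRegistryImpl.startQueryService starts this service, after which clients can call its queryMetrics RPC method to obtain a serialized dump of the registered metrics. On the web side, a MetricFetcher pulls these dumps (at most once per metrics.fetcher.update-interval) and caches them in a MetricStore, which the REST metric handlers read from.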
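From a client's perspective, metrics collected this way surface under Flink's REST API, for example at /taskmanagers/<taskmanagerid>/metrics. There, the get query parameter selects metrics by name and the response is a JSON array of id/value objects. The sketch below assumes a JobManager REST server on localhost:8081 (the default rest.port), Java 11's java.net.http client, and a TaskManager ID that is already URL-safe (IDs can be listed via GET /taskmanagers). The two metric names are examples.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

// Sketch of a client for the TaskManager metrics REST resource (assumes Java 11+).
class TaskManagerMetricsClient {

    // Builds e.g. http://localhost:8081/taskmanagers/<tmId>/metrics?get=a,b
    static URI metricsUri(String baseUrl, String taskManagerId, List<String> metricNames) {
        return URI.create(baseUrl + "/taskmanagers/" + taskManagerId
                + "/metrics?get=" + String.join(",", metricNames));
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("usage: TaskManagerMetricsClient <taskManagerId>");
            return;
        }
        URI uri = metricsUri("http://localhost:8081", args[0],
                List.of("Status.JVM.CPU.Load", "Status.JVM.Memory.Heap.Used"));
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expected body shape: [{"id":"Status.JVM.CPU.Load","value":"..."}, ...]
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Requesting the same path without the get parameter returns only the list of available metric IDs, which is a convenient way to discover names before querying values.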
public void startQueryService(RpcService rpcService, ResourceID resourceID) {
    synchronized (lock) {
        Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
        try {
            metricQueryServiceRpcService = rpcService;
            queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, maximumFramesize);
            queryService.start();
        } catch (Exception e) {
            LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
        }
    }
}
Prometheus PushGateway Reporter
For external collection, Flink provides Prometheus reporters. The abstract class AbstractPrometheusReporter implements MetricReporter and registers Flink metrics with a Prometheus CollectorRegistry. The concrete PrometheusPushGatewayReporter pushes metrics to a remote PushGateway and can optionally delete them on shutdown. Because it implements Scheduled, the metric registry calls its report() method once per configured reporting interval.
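Two details of the Prometheus path are easy to sketch: how a Flink metric becomes a valid Prometheus metric name, and which PushGateway URL a push targets. The helpers below are hypothetical. They follow the Prometheus rule that metric names may only use [a-zA-Z0-9:_], the flink_ prefix that Flink's Prometheus reporters add, and the PushGateway path layout /metrics/job/<job>{/<label>/<value>}. They assume label values need no escaping, and the host, job name, and grouping key are made-up examples.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helpers for Prometheus naming and PushGateway URLs (not Flink's code).
class PrometheusNamingSketch {

    // Any character outside [a-zA-Z0-9:_] is invalid in a Prometheus metric name.
    private static final Pattern INVALID = Pattern.compile("[^a-zA-Z0-9:_]");

    // e.g. ("taskmanager.Status.JVM.CPU", "Load") -> "flink_taskmanager_Status_JVM_CPU_Load"
    static String prometheusName(String logicalScope, String metricName) {
        return "flink_" + INVALID.matcher(logicalScope + "_" + metricName).replaceAll("_");
    }

    // Grouping-key labels become extra path segments after /metrics/job/<job>.
    static String pushUrl(String hostAndPort, String jobName, Map<String, String> groupingKey) {
        StringBuilder url = new StringBuilder("http://" + hostAndPort + "/metrics/job/" + jobName);
        groupingKey.forEach((k, v) -> url.append('/').append(k).append('/').append(v));
        return url.toString();
    }

    public static void main(String[] args) {
        System.out.println(prometheusName("taskmanager.Status.JVM.CPU", "Load"));
        // flink_taskmanager_Status_JVM_CPU_Load
        Map<String, String> key = new LinkedHashMap<>();
        key.put("cluster", "prod");
        System.out.println(pushUrl("pushgateway:9091", "flink-metrics", key));
        // http://pushgateway:9091/metrics/job/flink-metrics/cluster/prod
    }
}
```

In the Prometheus Java client, push sends the registry contents to that URL with HTTP PUT, replacing everything in the group, and delete issues an HTTP DELETE to the same URL. report() and close() rely on exactly this pair.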
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.PushGateway;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.Map;
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    private final PushGateway pushGateway;
    private final String jobName;
    private final Map<String, String> groupingKey;
    private final boolean deleteOnShutdown;
    public PrometheusPushGatewayReporter(String host, int port, String jobName, Map<String, String> groupingKey, boolean deleteOnShutdown) {
        this.pushGateway = new PushGateway(host + ':' + port);
        this.jobName = Preconditions.checkNotNull(jobName);
        this.groupingKey = Preconditions.checkNotNull(groupingKey);
        this.deleteOnShutdown = deleteOnShutdown;
    }
    // called by the metric registry once per reporting interval (see Scheduled)
    @Override
    public void report() {
        try {
            // push replaces all metrics previously pushed under this job name and grouping key
            pushGateway.push(CollectorRegistry.defaultRegistry, jobName, groupingKey);
        } catch (Exception e) {
            log.warn("Failed to push metrics to PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
        }
    }
    @Override
    public void close() {
        // optionally remove this reporter's metric group from the PushGateway on shutdown
        if (deleteOnShutdown && pushGateway != null) {
            try {
                pushGateway.delete(jobName, groupingKey);
            } catch (IOException e) {
                log.warn("Failed to delete metrics from PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
            }
        }
        super.close();
    }
}
In summary, Flink builds a MetricRegistryImpl from the configuration while the TaskManager starts, forwards every registered metric to all configured reporters and to the MetricQueryService, and exposes the values either through the REST API or by pushing them to a Prometheus PushGateway.