Inside Flink Metrics: Adding, Retrieving, and Exposing Metrics in TaskManager
This article walks through Flink's metric system: it explains core interfaces such as MetricReporter and MetricRegistry, shows how metrics are added, registered, and queried during TaskManager startup, and describes how metric values can be retrieved through the REST API or exported to Prometheus.
MetricReporter Interface
MetricReporter is the interface used to expose metric results. It defines the lifecycle methods open and close, and the notification methods notifyOfAddedMetric and notifyOfRemovedMetric, which the metric registry calls whenever a metric is added or removed.
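To make this contract concrete, the following is a minimal, stdlib-only sketch of a reporter that keeps the currently registered metrics in memory. It does not use Flink's classes: SimpleMetricReporter and InMemoryReporter are hypothetical names, a Supplier<Object> stands in for Metric, a Properties object for MetricConfig, and a pre-joined scope string for MetricGroup. A real reporter would implement org.apache.flink.metrics.reporter.MetricReporter and would typically build the identifier with group.getMetricIdentifier(metricName).

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical mirror of the MetricReporter contract; not Flink's API.
interface SimpleMetricReporter {
    void open(Properties config);   // lifecycle: called once before any notification
    void close();                   // lifecycle: called when the registry shuts down
    void notifyOfAddedMetric(Supplier<Object> metric, String metricName, String scope);
    void notifyOfRemovedMetric(Supplier<Object> metric, String metricName, String scope);
}

// Keeps registered metrics in a map keyed by "<scope><delimiter><name>".
class InMemoryReporter implements SimpleMetricReporter {
    private final Map<String, Supplier<Object>> metrics = new ConcurrentHashMap<>();
    private String delimiter = ".";

    @Override
    public void open(Properties config) {
        // hypothetical reporter-specific option
        delimiter = config.getProperty("delimiter", ".");
    }

    @Override
    public void close() {
        metrics.clear();
    }

    @Override
    public void notifyOfAddedMetric(Supplier<Object> metric, String metricName, String scope) {
        metrics.put(scope + delimiter + metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Supplier<Object> metric, String metricName, String scope) {
        metrics.remove(scope + delimiter + metricName);
    }

    // Reads the current value lazily, the way a periodic report() could.
    Object currentValue(String identifier) {
        Supplier<Object> m = metrics.get(identifier);
        return m == null ? null : m.get();
    }

    int size() {
        return metrics.size();
    }
}
```

The reporter stores references to the metric objects and reads values only when asked, mirroring how Flink reporters hold on to the Metric instances they are notified about.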
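<code>import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
public interface MetricReporter {
    // called once after the reporter is created, with the reporter's own configuration
    void open(MetricConfig config);
    // called when the metric registry shuts down; release connections and resources here
    void close();
    // called whenever a metric is added to, or removed from, a metric group
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}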
</code>
MetricRegistry and Configuration
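MetricRegistry bridges MetricReporter and MetricGroup: metric groups hand newly added metrics to the registry, which forwards them to the reporters. The implementation class MetricRegistryImpl is instantiated with a MetricRegistryConfiguration and a collection of ReporterSetup objects; its constructor sets up the configured reporters and schedules periodic report() calls for those that implement Scheduled. The metric query service is started separately, through startQueryService.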
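The values MetricRegistryConfiguration derives are simple: the scope delimiter is the first character of the configured delimiter string, and the query service's message-size limit is the RPC framework's maximum frame size minus 256 bytes of padding. The sketch below reproduces that arithmetic over a plain Map<String, String>. RegistryConfigSketch is hypothetical; the keys metrics.scope.delimiter and akka.framesize are assumed to be the ones behind MetricOptions.SCOPE_DELIMITER and the Akka frame-size option, and only plain byte counts with an optional b suffix (such as the default 10485760b) are accepted.

```java
import java.util.Map;

// Hypothetical re-creation of the arithmetic in MetricRegistryConfiguration.fromConfiguration.
final class RegistryConfigSketch {
    static final long MESSAGE_SIZE_LIMIT_PADDING = 256; // bytes reserved for serialization overhead

    final char delimiter;
    final long queryServiceMessageSizeLimit;

    private RegistryConfigSketch(char delimiter, long queryServiceMessageSizeLimit) {
        this.delimiter = delimiter;
        this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }

    static RegistryConfigSketch fromConfiguration(Map<String, String> conf) {
        // only the first character of the configured delimiter is used; fall back to '.'
        String delim = conf.getOrDefault("metrics.scope.delimiter", ".");
        char delimiter = delim.isEmpty() ? '.' : delim.charAt(0);
        long frameSize = parseBytes(conf.getOrDefault("akka.framesize", "10485760b"));
        return new RegistryConfigSketch(delimiter, frameSize - MESSAGE_SIZE_LIMIT_PADDING);
    }

    // Accepts "<digits>" or "<digits>b"; anything else throws NumberFormatException.
    static long parseBytes(String value) {
        String v = value.trim();
        if (v.endsWith("b")) {
            v = v.substring(0, v.length() - 1);
        }
        return Long.parseLong(v);
    }
}
```

With the default frame size of 10485760b, the limit comes out to 10485760 - 256 = 10485504 bytes.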
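<code>public class MetricRegistryConfiguration {
    private final ScopeFormats scopeFormats;
    private final char delimiter;
    private final long queryServiceMessageSizeLimit;
    public MetricRegistryConfiguration(ScopeFormats scopeFormats, char delimiter, long queryServiceMessageSizeLimit) {
        this.scopeFormats = scopeFormats;
        this.delimiter = delimiter;
        this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }
    public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
        // scope formats (metrics.scope.*) define how metric identifiers are composed
        ScopeFormats scopeFormats = ScopeFormats.fromConfig(configuration);
        // only the first character of the configured delimiter is used
        char delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
        // a query-service reply must fit into one RPC frame, minus padding for serialization overhead
        long maxFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
        long messageSizeLimitPadding = 256;
        return new MetricRegistryConfiguration(scopeFormats, delim, maxFrameSize - messageSizeLimitPadding);
    }
}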
</code>
ReporterSetup
ReporterSetup loads reporter configurations from the Flink configuration and creates reporter instances.
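The first steps of fromConfiguration, finding the enabled reporter names and slicing out each reporter's own options, come down to prefix handling on configuration keys. The sketch below assumes Flink's key layout metrics.reporter.<name>.<option> and the optional comma-separated whitelist metrics.reporters; ReporterConfigSketch and its method are hypothetical, and it returns plain maps instead of Flink Configuration objects.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of reporter discovery from flat configuration keys.
final class ReporterConfigSketch {
    static final String PREFIX = "metrics.reporter.";          // per-reporter options live under this prefix
    static final String REPORTERS_LIST = "metrics.reporters";  // optional whitelist of reporter names

    // Returns reporter name -> its options with the "metrics.reporter.<name>." prefix stripped.
    static Map<String, Map<String, String>> loadReporterConfigurations(Map<String, String> conf) {
        String included = conf.getOrDefault(REPORTERS_LIST, "").trim();
        Set<String> whitelist = new HashSet<>();
        if (!included.isEmpty()) {
            for (String name : included.split(",")) {
                whitelist.add(name.trim());
            }
        }
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(PREFIX)) {
                continue;
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue; // not of the form "<name>.<option>"
            }
            String name = rest.substring(0, dot);
            // an empty whitelist means every configured reporter is enabled
            if (!whitelist.isEmpty() && !whitelist.contains(name)) {
                continue;
            }
            result.computeIfAbsent(name, n -> new TreeMap<>()).put(rest.substring(dot + 1), e.getValue());
        }
        return result;
    }
}
```

Keys outside the metrics.reporter. prefix are ignored; the whitelist key metrics.reporters itself does not match that prefix because the prefix ends with a dot.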
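<code>public final class ReporterSetup {
    public static List<ReporterSetup> fromConfiguration(Configuration configuration, @Nullable PluginManager pluginManager) {
        // metrics.reporters: optional comma-separated list of reporter names to enable; empty means all
        String includedReporters = configuration.getString(MetricOptions.REPORTERS_LIST, "");
        // names of reporters configured under metrics.reporter.<name>.* that pass that filter
        Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReporters);
        // one (name, reporter-specific configuration) pair per enabled reporter
        List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
        // MetricReporterFactory implementations found on the classpath or through plugins
        Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager);
        // instantiate every configured reporter and wrap it in a ReporterSetup
        return setupReporters(reporterFactories, reporterConfigurations);
    }
}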
</code>
Adding Metrics in TaskManager
The TaskManager startup sequence begins in org.apache.flink.runtime.taskexecutor.TaskManagerRunner. Its main method calls runTaskManagerProcessSecurely, which eventually invokes runTaskManager to create a TaskManagerRunner and start it.
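<code>public static void main(String[] args) throws Exception {
    // environment logging, signal handlers and JVM safeguards omitted
    runTaskManagerProcessSecurely(args);
}
public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
    // the constructor creates RPC, HA and metric services; start() launches the TaskExecutor
    final TaskManagerRunner taskManagerRunner =
            new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
    taskManagerRunner.start();
    // simplified excerpt: error handling and the real exit-code logic are omitted
    return 0;
}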
</code>
The constructor of TaskManagerRunner sets up the executor, the high-availability services, the RPC service, the resource ID, and the heartbeat services, creates the MetricRegistryImpl instance, and starts the metric query service on a separate RPC service.
<code>public TaskManagerRunner(Configuration configuration, PluginManager pluginManager, TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
    // shared scheduled executor, sized by the number of CPU cores
    this.executor = Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), new ExecutorThreadFactory("taskmanager-future"));
    // HA services are used for leader retrieval (e.g. of the ResourceManager)
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    rpcService = createRpcService(configuration, highAvailabilityServices);
    resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());
    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
    // the registry gets the parsed scope/size settings and one ReporterSetup per enabled reporter
    metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration), ReporterSetup.fromConfiguration(configuration, pluginManager));
    // start the metric query service on its own RPC service, separate from the main one
    RpcService metricQueryServiceRpc = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpc, resourceId);
    // other initializations omitted for brevity
}
</code>
When a metric is added to a metric group, the group calls MetricRegistryImpl.register, which notifies each reporter via notifyOfAddedMetric and adds the metric to the MetricQueryService for later retrieval.
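Stripped of most details, the register path is a fan-out: every reporter receives the metric together with a view of the group that applies that reporter's own settings, and the query service keeps its own copy for on-demand dumps. The sketch below models this in plain Java. MiniRegistry, its Reporter interface, and the per-reporter delimiter standing in for FrontMetricGroup's reporter-scoped settings are hypothetical simplifications, not Flink classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of MetricRegistryImpl.register: fan out to reporters, then to the query service.
final class MiniRegistry {
    interface Reporter {
        void notifyOfAddedMetric(Supplier<Object> metric, String identifier);
    }

    // A reporter paired with its own settings, loosely like Flink's ReporterAndSettings.
    static final class ReporterAndDelimiter {
        final Reporter reporter;
        final char delimiter;

        ReporterAndDelimiter(Reporter reporter, char delimiter) {
            this.reporter = reporter;
            this.delimiter = delimiter;
        }
    }

    private final Object lock = new Object();
    private final List<ReporterAndDelimiter> reporters = new ArrayList<>();
    // stands in for the MetricQueryService; null until startQueryService() is called
    private Map<String, Supplier<Object>> queryService;

    void addReporter(Reporter reporter, char delimiter) {
        synchronized (lock) {
            reporters.add(new ReporterAndDelimiter(reporter, delimiter));
        }
    }

    void startQueryService() {
        synchronized (lock) {
            queryService = new LinkedHashMap<>();
        }
    }

    void register(Supplier<Object> metric, String metricName, List<String> scopeComponents) {
        synchronized (lock) {
            for (ReporterAndDelimiter r : reporters) {
                // each reporter sees the identifier built with its own delimiter
                String id = String.join(String.valueOf(r.delimiter), scopeComponents) + r.delimiter + metricName;
                try {
                    r.reporter.notifyOfAddedMetric(metric, id);
                } catch (RuntimeException e) {
                    // Flink logs a warning here; the sketch just moves on to the next reporter
                }
            }
            if (queryService != null) {
                queryService.put(String.join(".", scopeComponents) + "." + metricName, metric);
            }
        }
    }

    int queryServiceSize() {
        synchronized (lock) {
            return queryService == null ? 0 : queryService.size();
        }
    }
}
```

Catching exceptions per reporter means a broken reporter only loses its own notification; the other reporters and the query service still see the metric.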
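<code>public void register(Metric metric, String metricName, AbstractMetricGroup group) {
    synchronized (lock) {
        for (ReporterAndSettings ras : reporters) {
            try {
                // FrontMetricGroup applies this reporter's own settings (e.g. delimiter) to the group
                FrontMetricGroup<AbstractMetricGroup<?>> front = new FrontMetricGroup<>(ras.getSettings(), group);
                ras.getReporter().notifyOfAddedMetric(metric, metricName, front);
            } catch (Exception e) {
                // a faulty reporter must not prevent the metric from reaching the others
                LOG.warn("Error while registering metric: {}.", metricName, e);
            }
        }
        // queryService stays null if the query service could not be started
        if (queryService != null) {
            queryService.addMetric(metricName, metric, group);
        }
    }
}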
</code>
TaskManagerRunner.start()
The start method simply starts the task executor service, which in turn starts the TaskExecutor and its lifecycle.
<code>public void start() throws Exception {
    // starts the TaskExecutor wrapped by the task executor service
    taskExecutorService.start();
}
</code>
Retrieving Metrics via REST
The REST endpoint for metrics lives in the flink-runtime module. The core class org.apache.flink.runtime.webmonitor.WebMonitorEndpoint configures the HTTP server and its routes, including the metric handlers. The metric query service itself is started by MetricRegistryImpl.startQueryService; once it is running, the web monitor's metric fetcher calls its queryMetrics RPC method to obtain serialized metric dumps, which the REST handlers then serve.
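From the outside, a TaskManager's metrics are served under /taskmanagers/<taskmanagerid>/metrics, and the optional get query parameter selects a comma-separated list of metric names; without it, the endpoint lists the available metric ids. The helper below only builds such a request URL. TaskManagerMetricsUrl is a hypothetical name, 8081 is assumed to be the REST port (Flink's default), and sending the request and parsing the JSON reply (a list of {"id": ..., "value": ...} objects) are left out.

```java
import java.util.List;

// Hypothetical helper: builds a REST URL for selected TaskManager metrics.
final class TaskManagerMetricsUrl {
    // Assumes taskManagerId and metric names contain only URL-safe characters (no encoding is applied).
    static String build(String restBase, String taskManagerId, List<String> metricNames) {
        StringBuilder url = new StringBuilder(restBase);
        if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
            url.setLength(url.length() - 1); // avoid a double slash
        }
        url.append("/taskmanagers/").append(taskManagerId).append("/metrics");
        if (!metricNames.isEmpty()) {
            // "get" selects metrics by name; omitting it lists the available metric ids instead
            url.append("?get=").append(String.join(",", metricNames));
        }
        return url.toString();
    }
}
```

The TaskManager ids themselves can be listed through the /taskmanagers endpoint.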
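<code>public void startQueryService(RpcService rpcService, ResourceID resourceID) {
    synchronized (lock) {
        Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
        try {
            // keep a handle on the dedicated RPC service so it can be stopped on shutdown
            metricQueryServiceRpcService = rpcService;
            // replies are capped at maximumFramesize, the limit derived in MetricRegistryConfiguration
            queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, maximumFramesize);
            queryService.start();
        } catch (Exception e) {
            // not fatal: reporters keep working, only the web UI/REST API receives no metrics
            LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
        }
    }
}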
</code>
Prometheus PushGateway Reporter
For external collection, Flink provides a Prometheus reporter. The abstract class AbstractPrometheusReporter implements MetricReporter and handles metric registration with a Prometheus CollectorRegistry. The concrete PrometheusPushGatewayReporter pushes metrics to a remote PushGateway and optionally deletes them on shutdown.
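The PushGateway groups pushed metrics by job name and grouping key, and both are encoded in the request path as /metrics/job/<jobName>{/<labelName>/<labelValue>}. In the Prometheus Java client, push sends a PUT that replaces the whole group, and delete sends a DELETE to the same path. The sketch below only builds that path, to show what the reporter's jobName and groupingKey translate to. PushGatewayPath is hypothetical, and encoding each segment with URLEncoder (turning + back into %20) is a simplification; the client library applies its own encoding rules.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: the PushGateway path addressed by a jobName + groupingKey pair.
final class PushGatewayPath {
    static String of(String jobName, Map<String, String> groupingKey) {
        StringBuilder path = new StringBuilder("/metrics/job/").append(encode(jobName));
        // iterate in sorted label order so the result is deterministic
        for (Map.Entry<String, String> label : new TreeMap<>(groupingKey).entrySet()) {
            path.append('/').append(label.getKey()).append('/').append(encode(label.getValue()));
        }
        return path.toString();
    }

    // URLEncoder targets form encoding, so '+' (an encoded space) is turned into %20 for path use.
    private static String encode(String segment) {
        try {
            return URLEncoder.encode(segment, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Because close() deletes by the same jobName and groupingKey that report() pushes with, a reporter with deleteOnShutdown enabled removes exactly the group it created.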
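<code>package org.apache.flink.metrics.prometheus;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.PushGateway;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.Map;
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    private final PushGateway pushGateway;
    private final String jobName;
    private final Map<String, String> groupingKey;
    private final boolean deleteOnShutdown;
    public PrometheusPushGatewayReporter(String host, int port, String jobName, Map<String, String> groupingKey, boolean deleteOnShutdown) {
        // PushGateway is the Prometheus Java client's push helper
        this.pushGateway = new PushGateway(host + ':' + port);
        this.jobName = Preconditions.checkNotNull(jobName);
        this.groupingKey = Preconditions.checkNotNull(groupingKey);
        this.deleteOnShutdown = deleteOnShutdown;
    }
    // Scheduled: called periodically by the MetricRegistry
    @Override
    public void report() {
        try {
            // replaces all metrics previously pushed under this jobName/groupingKey
            pushGateway.push(CollectorRegistry.defaultRegistry, jobName, groupingKey);
        } catch (Exception e) {
            log.warn("Failed to push metrics to PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
        }
    }
    @Override
    public void close() {
        // optionally remove this group from the PushGateway so stale metrics do not linger
        if (deleteOnShutdown && pushGateway != null) {
            try {
                pushGateway.delete(jobName, groupingKey);
            } catch (IOException e) {
                log.warn("Failed to delete metrics from PushGateway with jobName {}, groupingKey {}.", jobName, groupingKey, e);
            }
        }
        // removes the registered collectors from the Prometheus CollectorRegistry
        super.close();
    }
}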
</code>
In summary, the article details how Flink creates, registers, and exposes metrics through its internal registry, how the TaskManager lifecycle integrates metric services, and how users can retrieve metrics via REST APIs or push them to Prometheus.
37 Mobile Game Tech Team