Cloud Native 21 min read

How Alibaba Cloud Built Real‑Time OpenAPI Monitoring with Flink + SLS

This article details the design and implementation of a cloud‑native, real‑time monitoring system for Alibaba Cloud OpenAPI, covering background challenges, a Flink‑SLS architecture, multi‑region data processing, checkpoint and state‑backend tuning, source‑side predicate pushdown, visualization with Grafana, and production results.

Alibaba Cloud Observability
Alibaba Cloud Observability
Alibaba Cloud Observability
How Alibaba Cloud Built Real‑Time OpenAPI Monitoring with Flink + SLS

Background and Challenges

Alibaba Cloud OpenAPI serves as the standard entry for managing cloud resources, handling billions of API calls across dozens of products. High availability and low latency are critical because any API fluctuation can affect customer production workloads. The monitoring system must support three stakeholder groups: the OpenAPI operations team (global gateway health), individual product teams (e.g., ECS, RDS) needing fine‑grained metrics, and SRE teams requiring rapid root‑cause analysis.

Core Challenges

Massive, uneven traffic distribution leading to data skew in a single global aggregation.

Need to reduce cross‑region bandwidth while preserving metric fidelity.

Technical Solution Overview

We adopted a cloud‑native combination of Flink (real‑time compute) and SLS (log service) to build a two‑layer monitoring pipeline:

Regional data collection via Logtail agents on each gateway node (millisecond latency, millions of EPS).

Regional SLS Logstores store raw access logs for each region.

Flink Job 1 performs local aggregation, enriches logs with MySQL dimension tables (gateway cluster info, tenant level), and writes aggregated results to a regional Logstore.

Flink Job 2 reads the aggregated logs, converts them to time‑series metrics, and pushes them to a centralized SLS MetricStore.

Overall Architecture

Each region runs an independent Flink cluster that consumes its own Logstore, performs Tumble windows (10 s) aggregation, and writes lightweight summary logs. A central region (cn‑shanghai) hosts the MetricStore where all regions write their metrics, enabling a single global view in Grafana.

Flink Job 1 – Aggregation

CREATE TABLE openapi_log_source ( __time__ BIGINT, LocalIp STRING, Product STRING, Api STRING, Version STRING, Domain STRING, AK STRING, CallerUid STRING, HttpCode STRING, ErrorCode STRING, TotalUsedTime BIGINT, ClientIp STRING, RegionId STRING, Ext5 STRING, RequestContent STRING, ts AS TO_TIMESTAMP_LTZ(`__time__`*1000, 3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ('connector'='sls','project'='*****','logstore'='pop_rpc_trace_log','endpoint'='cn-shanghai-intranet.log.aliyuncs.com');
CREATE TABLE gateway_cluster_dim ( local_ip STRING, app_group STRING, region_id STRING, PRIMARY KEY (local_ip) NOT ENFORCED ) WITH ('connector'='jdbc', ...);
CREATE TABLE user_level_dim ( uid STRING, gc_level STRING, PRIMARY KEY (uid) NOT ENFORCED ) WITH ('connector'='jdbc','url'='jdbc:mysql://xxx:3306/dim_db','table-name'='user_level','lookup.cache.max-rows'='50000','lookup.cache.ttl'='10min','lookup.max-retries'='3');
CREATE TABLE machine_agg_log_sink ( window_start TIMESTAMP(3), product STRING, api STRING, version STRING, caller_uid STRING, region_id STRING, app_group STRING, gc_level STRING, http_code STRING, error_code STRING, qps BIGINT, rt_mean DOUBLE, slow1s_count BIGINT, http_2xx BIGINT, http_5xx BIGINT, http_503 BIGINT ) WITH ('connector'='sls','project'='****','logstore'='machine_agg_log');
INSERT INTO machine_agg_log_sink SELECT TUMBLE_START(l.ts, INTERVAL '10' SECOND), l.Product, l.Api, l.Version, l.CallerUid, g.region_id, g.app_group, u.gc_level, l.HttpCode, l.ErrorCode, COUNT(*) AS qps, AVG(CAST(l.TotalUsedTime AS DOUBLE)), SUM(CASE WHEN l.TotalUsedTime > 1000 THEN 1 ELSE 0 END), SUM(CASE WHEN l.HttpCode >= '200' AND l.HttpCode < '300' THEN 1 ELSE 0 END), SUM(CASE WHEN l.HttpCode >= '500' THEN 1 ELSE 0 END), SUM(CASE WHEN l.HttpCode = '503' THEN 1 ELSE 0 END) FROM openapi_log_source l LEFT JOIN gateway_cluster_dim FOR SYSTEM_TIME AS OF l.ts AS g ON l.LocalIp = g.local_ip LEFT JOIN user_level_dim FOR SYSTEM_TIME AS OF l.ts AS u ON l.CallerUid = u.uid GROUP BY TUMBLE(l.ts, INTERVAL '10' SECOND), l.Product, l.Api, l.Version, l.CallerUid, g.region_id, g.app_group, u.gc_level, l.HttpCode, l.ErrorCode;

Flink Job 2 – Metric Conversion and Cross‑Region Aggregation

CREATE TABLE machine_agg_log_source ( window_start TIMESTAMP(3), product STRING, region_id STRING, ... WATERMARK FOR window_start AS window_start - INTERVAL '5' SECOND ) WITH ('connector'='sls','project'='****','logstore'='machine_agg_log','endpoint'='cn-shanghai-intranet.log.aliyuncs.com');
CREATE TABLE metricstore_sink ( `__time_nano__` BIGINT, `__name__` STRING, `__labels__` STRING, `__value__` DOUBLE ) WITH ('connector'='sls','project'='****','logstore'='openapi_metrics','endpoint'='cn-shanghai-intranet.log.aliyuncs.com');
INSERT INTO metricstore_sink SELECT UNIX_TIMESTAMP(CAST(TUMBLE_START(window_start, INTERVAL '1' MINUTE) AS STRING))*1000000000, 'namespace_product_gw_http_req', CONCAT('product=',product,'|region_id=',region_id), CAST(SUM(qps) AS DOUBLE) FROM machine_agg_log_source GROUP BY TUMBLE(window_start, INTERVAL '1' MINUTE), product, region_id;

Job Configuration and Tuning

Two checkpoint strategies were evaluated. Strategy A (standard) uses 60 s intervals, EXACTLY_ONCE semantics, and a 10 min timeout. Strategy B (high‑availability) extends the interval to 180 s, switches to AT_LEAST_ONCE, relaxes timeout to 15 min, and tolerates up to 10 consecutive checkpoint failures, which matches the ultra‑high‑throughput, latency‑sensitive OpenAPI gateway workload.

SET 'execution.checkpointing.interval'='60s'; SET 'execution.checkpointing.mode'='EXACTLY_ONCE'; SET 'execution.checkpointing.timeout'='10min';
SET 'execution.checkpointing.interval'='180s'; SET 'execution.checkpointing.mode'='AT_LEAST_ONCE'; SET 'execution.checkpointing.timeout'='15min'; SET 'execution.checkpointing.max-concurrent-checkpoints'='1'; SET 'execution.checkpointing.tolerable-failed-checkpoints'='10';

We also switched to the enterprise‑grade gemini state backend with KV separation to handle GB‑scale state efficiently:

SET 'table.exec.state.backend'='gemini'; SET 'state.backend.gemini.kv.separate.mode'='GLOBAL_ENABLE';

Source‑Side Predicate Pushdown (SPL Processor)

To reduce data volume before Flink, we implemented an SLS SPL processor that performs row filtering, JSON field extraction, column pruning, and regular‑expression parsing directly on the SLS server. This eliminates unnecessary network transfer and CPU work in Flink.

-- Row filter
* | where Domain != 'popwarmup.aliyuncs.com'
-- Expand nested JSON fields
| parse-json -prefix 'ext5_' Ext5
| where ext5_logRegionId not in ('cn-shanghai','cn-beijing','cn-hangzhou')
| parse-json -prefix 'callerInfo_' ext5_callerInfo
| parse-json -prefix 'headers_' ext5_headers
-- Extract KV fields from RequestContent
| parse-regexp RequestContent, '[;]RegionId=([^;]*)' as request_regionId
-- Column projection (last step to minimize output)
| project LocalIp, Product, Version, Api, Domain, ErrorCode, HttpCode, TotalUsedTime, AK, RegionId, ClientIp, callerInfo_callerType, callerInfo_callerUid, callerInfo_ownerId, ext5_regionId, ext5_appGroup, ext5_stage, request_regionId;

In Flink SQL the processor is referenced via the processor option:

CREATE TABLE openapi_log_source ( __time__ BIGINT, LocalIp STRING, Product STRING, Version STRING, Api STRING, Domain STRING, ErrorCode STRING, HttpCode STRING, TotalUsedTime BIGINT, AK STRING, RegionId STRING, ClientIp STRING, callerInfo_callerType STRING, callerInfo_callerUid STRING, callerInfo_ownerId STRING, ext5_regionId STRING, ext5_appGroup STRING, ext5_stage STRING, request_regionId STRING, ts AS TO_TIMESTAMP_LTZ(`__time__`*1000,3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ('connector'='sls','project'='****','logstore'='pop_rpc_trace_log','endpoint'='cn-shanghai-intranet.log.aliyuncs.com','processor'='openapi-processor');

Visualization and Alerting

Metrics are ingested into Grafana via the SLS MetricStore data source. Teams can write PromQL queries such as:

# QPS trend per product
sum(namespace_product_gw_http_req) by (product)

# Error‑rate comparison (last minute vs one hour ago)
( sum(rate(namespace_product_gw_http_5xx[1m])) / sum(rate(namespace_product_gw_http_req[1m])) ) / ( sum(rate(namespace_product_gw_http_5xx[1m] offset 1h)) / sum(rate(namespace_product_gw_http_req[1m] offset 1h)) ) > 2

# Average latency trend
avg(namespace_product_gw_rt_mean) by (product)

Example alert rule for high error rate:

- alert: HighErrorRate
  expr: sum(namespace_product_gw_http_5xx) by (product) / sum(namespace_product_gw_http_req) by (product) > 0.01
  for: 2m
  labels:
    severity: warning
  annotations:
    summary: "{{ $labels.product }} error rate too high"
    description: "Current error rate: {{ $value | printf "%.2f" }}%"

Production Results

The solution processes over 200 TB compressed logs daily (≈2 PB raw), generates more than 500 k time‑series metrics, and supports 60+ regions and 300+ cloud products. The two‑layer aggregation reduces raw data transmission by >99 %, saving bandwidth and ensuring isolation: a failure in one region does not affect others.

Key business outcomes include sub‑minute fault detection, self‑service monitoring for all product teams, and a stable pipeline with CPU usage reduced by ~20 % after enabling GeminiStateBackend with KV separation.

Takeaways

Layered aggregation mitigates data skew and cuts cross‑region traffic.

Source‑side predicate pushdown dramatically lowers I/O and processing cost.

Enterprise‑grade state backends (Gemini + KV separation) are essential for GB‑scale state.

The architecture is applicable to service mesh telemetry, CDN log analysis, and IoT data aggregation.

Image
Image
cloud-nativebig dataFlinkreal-time monitoringSLSPredicate Pushdown
Alibaba Cloud Observability
Written by

Alibaba Cloud Observability

Driving continuous progress in observability technology!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.