How to Deploy Real-Time AI Models with Apache Flink 2.1
This guide explains Apache Flink 2.1's new AI model DDL and ML_PREDICT function, showing step‑by‑step how to create, manage, and invoke AI models in streaming SQL, configure resources with Little's Law, and process risk‑assessment results in real time.
Apache Flink 2.1 was released in August, marking a milestone toward a unified Data + AI platform.
Key real‑time AI capabilities include:
New AI model DDL that allows creating and altering models via Flink SQL or Table API.
Extended ML_PREDICT table‑valued function for real‑time model inference.
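Beyond CREATE MODEL, the model DDL also covers the rest of the lifecycle. As a hedged sketch (exact clauses per the Flink 2.1 SQL reference; the model name is illustrative):

```sql
-- List registered models in the current catalog/database
SHOW MODELS;

-- Change a model option in place
ALTER MODEL `compliance_model` SET ('model' = 'gpt-4o-mini');

-- Remove a model when it is no longer needed
DROP MODEL IF EXISTS `compliance_model`;
```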
Example workflow:
Use the CREATE MODEL statement to define a model, e.g.:
CREATE MODEL `compliance_model`
INPUT (text STRING)
OUTPUT (response STRING)
WITH (
'provider'='openai',
'endpoint'='https://api.openai.com/v1/llm/v1/chat',
'api-key'='abcdefg',
'system_prompt'='You are an e-commerce compliance reviewer. Judge whether a product title contains sensitive content such as alcohol or tobacco, and return only JSON: {"risk":0.0~1.0,"reason":"explanation"}',
'model'='gpt-4o'
);
Then run real‑time inference with ML_PREDICT:
SELECT * FROM ML_PREDICT(
INPUT => TABLE input_table,
MODEL => MODEL my_model,
ARGS => DESCRIPTOR(text),
CONFIG => MAP['async','true']
);
Define a source table, for example a Kafka topic:
CREATE TABLE product_source (
id STRING,
title STRING,
ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='product_source',
'properties.bootstrap.servers'='localhost:9092',
'format'='json'
);
Define a sink table for the risk results:
CREATE TABLE risk_sink (
id STRING,
title STRING,
risk DOUBLE,
reason STRING
) WITH (
'connector'='kafka',
'topic'='risk_sink',
'properties.bootstrap.servers'='localhost:9092',
'format'='json'
);
Launch the job with a single INSERT‑SELECT statement. Because ML_PREDICT is a table‑valued function, it is invoked in the FROM clause, and the model's OUTPUT column (response) is appended to each input row:
INSERT INTO risk_sink
SELECT
id,
title,
CAST(JSON_VALUE(response,'$.risk') AS DOUBLE) AS risk,
JSON_VALUE(response,'$.reason') AS reason
FROM ML_PREDICT(
INPUT => TABLE product_source,
MODEL => MODEL compliance_model,
ARGS => DESCRIPTOR(title)
);
Sample input data (Kafka messages) and the corresponding output illustrate how the model flags titles containing alcohol or other sensitive content.
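The JSON_VALUE extraction above can be mirrored in plain Python to show what the sink rows look like. The response string below is a hypothetical model output in the JSON shape the system prompt requests, not actual model output:

```python
import json

# Hypothetical model response for a title that mentions alcohol,
# in the JSON shape the system prompt asks the model to return.
response = '{"risk": 0.9, "reason": "title mentions whisky"}'

parsed = json.loads(response)
risk = float(parsed["risk"])   # mirrors CAST(JSON_VALUE(response, '$.risk') AS DOUBLE)
reason = parsed["reason"]      # mirrors JSON_VALUE(response, '$.reason')
print(risk, reason)  # -> 0.9 title mentions whisky
```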
Flink’s built‑in support for large models uses asynchronous calls by default. For resource planning, apply Little’s Law (L = λ × W) to size max-concurrent-operations based on expected QPS and model latency.
For example, targeting 100 QPS with a 1.2 s 99th‑percentile latency requires about 120 concurrent operations, plus additional memory tuning on TaskManagers.
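The sizing arithmetic is a direct application of Little's Law; a minimal helper (function name is illustrative) makes the calculation explicit:

```python
import math

def required_concurrency(qps: float, latency_s: float) -> int:
    """Little's Law: L = lambda * W, rounded up to whole in-flight requests."""
    return math.ceil(qps * latency_s)

# Example from the text: 100 QPS at a 1.2 s p99 latency.
print(required_concurrency(100, 1.2))  # -> 120
```

The result feeds directly into the async inference concurrency setting; headroom above the computed value absorbs latency spikes.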
Flink 2.1’s ML framework also natively supports an end‑to‑end RAG pipeline (Embedding → Vector Store → Vector Retrieval → LLM), which will be covered separately.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert dedicated to sharing big data technology.
