Big Data 16 min read

How LLMs Can Revolutionize Data Warehouse ETL: From Push‑Pull to Stable Queries

This article explores the challenges of traditional data‑warehouse ETL, compares push and pull models, and presents an LLM‑driven architecture that generates both on‑demand SQL queries and streaming ETL code with automatic error‑feedback loops, dramatically improving cost, accuracy, and maintainability.

Alibaba Cloud Big Data AI Platform
Alibaba Cloud Big Data AI Platform
Alibaba Cloud Big Data AI Platform
How LLMs Can Revolutionize Data Warehouse ETL: From Push‑Pull to Stable Queries

Challenges in Operational Data‑Warehouse Construction

Many SREWorks community members face three common problems: high storage cost for rarely used data, heavy ETL learning and maintenance overhead, and data timeliness/accuracy issues that limit reliance on the warehouse.

Solution Design – Combining Push and Pull

From the requirement of a single SQL that can retrieve all operational objects, two abstract models are considered:

Push : Proactively manage data via an ETL pipeline (e.g., Flink, MaxCompute) that processes and stores data in the warehouse. This approach incurs high ETL maintenance cost and may reduce data accuracy.

Pull : Query data on‑demand directly from source systems (e.g., Presto). It avoids ETL management and preserves accuracy but can suffer from slow query performance.

The proposed approach leverages large language models (LLMs) to combine the advantages of both models.

LLM‑Based SQL Pre‑Query

Using an LLM, user‑provided SQL is transformed into executable Python code that connects to the relevant databases, runs the query, and returns results as JSON. The core script is:

import os
import sys
from openai import OpenAI
import traceback
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr

client = OpenAI()

def get_script(content):
    return content.split("```python")[1].split("```")[0]

def execute_python(code_str: str):
    stdout = StringIO()
    stderr = StringIO()
    return_head = 1000
    context = {}
    try:
        # redirect stdout and stderr, then execute
        with redirect_stdout(stdout), redirect_stderr(stderr):
            exec(code_str, context)
    except Exception:
        stderr.write(traceback.format_exc())
    stdout_value = stdout.getvalue()[0:return_head]
    stderr_value = stderr.getvalue()[0:return_head]
    return {"stdout": stdout_value.strip(), "stderr": stderr_value.strip()}

prompt = """
You are a database expert. Convert the given SQL into executable Python code.
Database 1: processes (mysql://[email protected]:3306/processes)
CREATE TABLE `process_table` (
  `process_name` varchar(100) DEFAULT NULL,
  `start_time` datetime DEFAULT NULL,
  `end_time` datetime DEFAULT NULL,
  `server_name` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Database 2: servers (mysql://[email protected]:3306/servers)
CREATE TABLE `server_table` (
  `server_name` varchar(100) DEFAULT NULL,
  `ip` varchar(100) DEFAULT NULL,
  `zone` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Only include necessary connection info based on the SQL.
Return only Python code; comments may be used for explanations.
"""

query_sql = "select * from processes.process_table a join servers.server_table b on a.server_name = b.server_name where b.zone = 'ZoneA';"

messages = [
    {"role": "system", "content": prompt},
    {"role": "user", "content": query_sql}
]

res = client.chat.completions.create(messages=messages, model="gpt-4")
print(res.choices[0].message.content)
exec_result = execute_python(get_script(res.choices[0].message.content))
print("result:")
print(exec_result)

if exec_result["stderr"] == "" and exec_result["stdout"] != "":
    print(exec_result["stdout"])
    sys.exit(0)

The LLM‑generated script successfully queries both databases, merges on server_name, and outputs JSON results.

Automatic Error‑Feedback Loop

To improve stability, the system retries up to three times, feeding back any error messages to the LLM for correction:

for i in range(3):
    print("Retry", i+1)
    messages = [{"role": "system", "content": prompt + "
" + query_sql}]
    if exec_result["stderr"] != "":
        messages.append({"role": "user", "content": res.choices[0].message.content + "

" + exec_result["stderr"] + "
Execution error, please fix and regenerate code"})
    else:
        messages.append({"role": "user", "content": res.choices[0].message.content + "

Execution returned no result, please regenerate"})
    res = client.chat.completions.create(messages=messages, model="gpt-4")
    exec_result = execute_python(get_script(res.choices[0].message.content))
    if exec_result["stderr"] == "" and exec_result["stdout"] != "":
        print(exec_result["stdout"])
        sys.exit(0)
print("Query failed")

With this feedback, success rate rises from ~30% to ~80%.

LLM‑Based Stream Processing (Flink)

Instead of on‑demand queries, the generated SQL can be translated into Flink SQL that continuously syncs required fields to ClickHouse, eliminating the need for repeated LLM calls in production.

-- Create Flink source tables
CREATE TABLE process_table (
  process_name STRING,
  server_name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'test-db1.com',
  'port' = '3306',
  'username' = 'root',
  'database-name' = 'processes',
  'table-name' = 'process_table'
);

CREATE TABLE server_table (
  server_name STRING,
  zone STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'test-db2.com',
  'port' = '3306',
  'username' = 'root',
  'database-name' = 'servers',
  'table-name' = 'server_table'
);

-- ClickHouse sink tables
CREATE TABLE clickhouse_process_table (
  process_name STRING,
  server_name STRING
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://localhost:8123',
  'table-name' = 'process_table'
);

CREATE TABLE clickhouse_server_table (
  server_name STRING,
  zone STRING
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://localhost:8123',
  'table-name' = 'server_table'
);

-- Sync data
INSERT INTO clickhouse_process_table
SELECT process_name, server_name FROM process_table;

INSERT INTO clickhouse_server_table
SELECT server_name, zone FROM server_table;

The generated Flink SQL only syncs fields required by the original query, reducing unnecessary processing.

Conclusion

By using LLMs, a single user SQL can produce two code paths: a Pull‑style on‑demand query for debugging and a Push‑style streaming ETL for production. This approach avoids over‑processing, lowers ETL maintenance cost, and creates a unified data pipeline that aligns closely with user requirements.

Eliminate unnecessary ETL transformations.

Reduce maintenance overhead for data engineers.

Maintain a unified, query‑driven data model.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataFlinkSQLLLMData WarehouseETL
Alibaba Cloud Big Data AI Platform
Written by

Alibaba Cloud Big Data AI Platform

The Alibaba Cloud Big Data AI Platform builds on Alibaba’s leading cloud infrastructure, big‑data and AI engineering capabilities, scenario algorithms, and extensive industry experience to offer enterprises and developers a one‑stop, cloud‑native big‑data and AI capability suite. It boosts AI development efficiency, enables large‑scale AI deployment across industries, and drives business value.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.