
How to Structure Weakly Structured Logs with Flink SLS SPL

This guide explains how to use Alibaba Cloud's SLS connector and SPL expressions within Flink to clean, parse, and transform weakly structured log data into a structured table suitable for real‑time SQL analysis, covering sample logs, field extraction rules, and step‑by‑step configuration.

Alibaba Cloud Native

Alibaba Cloud Log Service (SLS) provides large‑scale, low‑cost, real‑time storage and analysis for log, metric, and trace data. The Flink connector for SLS allows SLS to be used as a source or sink table in Flink jobs. However, many business logs are only weakly structured, requiring preprocessing before Flink SQL can consume them.

Weakly Structured Log Challenges

Log data often comes from many sources in many formats, such as JSON strings, CSV, or irregular Java stack traces, with no fixed schema. Flink SQL, by contrast, requires a fixed schema (field names, types, and number of fields), so a cleaning layer is needed to bridge the gap.
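To make the gap concrete, here is a minimal Python sketch of the guarantee a cleaning layer has to provide. It is an illustration only, not part of SLS or Flink, and the column names are hypothetical. Whatever keys a raw record happens to carry, the output always has the same columns in the same order. Unknown keys are dropped and missing ones become None.

```python
from typing import Any, Mapping, Optional, Tuple

# Hypothetical fixed schema: the column names and order a SQL table would declare.
SCHEMA: Tuple[str, ...] = ("requestID", "httpCode", "errorMessage")


def to_fixed_row(record: Mapping[str, Any],
                 schema: Tuple[str, ...] = SCHEMA) -> Tuple[Optional[str], ...]:
    """Project an arbitrary record onto a fixed schema.

    Keys outside the schema are dropped, missing or null keys become None,
    and present values are rendered as strings (like STRING columns).
    """
    return tuple(
        None if record.get(col) is None else str(record[col])
        for col in schema
    )


# Records of different shapes become rows of identical shape.
print(to_fixed_row({"requestID": "r-1", "httpCode": 500, "extra": True}))
print(to_fixed_row({"errorMessage": "timeout"}))
```

The rest of this article is about producing such fixed-shape rows from the weakly structured fields, and about doing it as early as possible.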

Sample Log and Extraction Requirements

The example log contains a JSON-encoded Payload field, a requestURL field, an error field that mixes plain text with embedded JSON, the __tag__:__path__ tag holding the log file path, the __tag__:__hostname__ tag, and a caller field with file and line information. The extraction requirements are:

Extract httpCode, errorCode, errorMessage, and requestID from the JSON embedded in error.

Extract the service name (service_a in the sample log) from __tag__:__path__.

Extract fileName and fileNo from caller.

Extract project and schedule.type (as scheduleType) from Payload.

Rename __tag__:__hostname__ to serviceHost and discard all other fields.

The resulting table schema can then be queried directly with Flink SQL.
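The two purely regex-based requirements can be checked on their own before they are written as SPL. The sketch below uses Python's re module with the same patterns that the SPL pipeline later in this article applies to __tag__:__path__ and caller. The input strings are hypothetical, and Python's regex engine may differ from SPL's in edge cases.

```python
import re
from typing import Optional, Tuple

# Patterns copied from the SPL parse-regexp steps on __tag__:__path__ and caller.
PATH_RE = re.compile(r"\/var\/log\/([\w\_]+).LOG")
CALLER_RE = re.compile(r"\w+/([\w\.]+):(\d+)")


def service_name(path: str) -> Optional[str]:
    """Return the service name captured from a log file path, or None."""
    m = PATH_RE.search(path)
    return m.group(1) if m else None


def file_and_line(caller: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (fileName, fileNo) captured from a caller string, or (None, None)."""
    m = CALLER_RE.search(caller)
    return (m.group(1), m.group(2)) if m else (None, None)


# Hypothetical inputs shaped like the fields described above.
print(service_name("/var/log/service_a.LOG"))  # service_a
print(file_and_line("server/handler.go:42"))   # ('handler.go', '42')
```

The . before LOG in the path pattern is unescaped, so it matches any character rather than only a literal dot. \. would be stricter. For a caller with several directories, such as pkg/server/handler.go:42, the search still returns only the last file name, because the match can begin at any directory name that is followed by /.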

Processing Options

Three approaches are compared:

Data Processing (SLS) Scheme: Create a target Logstore, use an SLS data processing task to clean the raw logs into it, and have Flink read the target Logstore.

Flink Scheme: Read the raw logs into Flink and parse the fields with Flink SQL functions (regular-expression and JSON functions).

SPL Scheme: Configure SPL statements in the Flink SLS connector so that the source table already contains cleaned, structured fields.

The SPL scheme is the most lightweight, maintainable, and extensible of the three. Cleaning runs in SLS, close to the data source, so no intermediate Logstore or temporary table is needed.

Using SPL in Flink

The SPL pipeline for the sample log is:

```
* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller
| parse-json Payload
| project-away Payload
| parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson
| parse-json errorJson
| parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName
| parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo
| project-rename serviceHost="__tag__:__hostname__"
| extend scheduleType = json_extract_scalar(schedule, '$.type')
| project httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, project
```
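To follow how a record moves through the pipe, the Python sketch below emulates each SPL stage as a function from dict to dict and chains the stages in the same order as the pipeline. It is a simplified local model for reasoning about the stages, not how SLS executes SPL. For example, a parse-regexp that does not match simply adds nothing here. The function names and the input record are hypothetical.

```python
import json
import re
from functools import reduce
from typing import Callable, Dict, List

Record = Dict[str, str]
Stage = Callable[[Record], Record]


def project(*fields: str) -> Stage:
    # Keep only the listed fields, in the listed order (absent ones are skipped).
    return lambda r: {f: r[f] for f in fields if f in r}


def project_away(*fields: str) -> Stage:
    # Drop the listed fields.
    return lambda r: {k: v for k, v in r.items() if k not in fields}


def parse_json(field: str) -> Stage:
    # Expand top-level keys; non-string values (numbers, nested objects) become JSON text.
    def stage(r: Record) -> Record:
        if field not in r:
            return r
        obj = json.loads(r[field])
        return {**r, **{k: v if isinstance(v, str) else json.dumps(v) for k, v in obj.items()}}
    return stage


def parse_regexp(field: str, pattern: str, *names: str) -> Stage:
    # Add one field per capture group; in this model a non-match adds nothing.
    regex = re.compile(pattern)
    def stage(r: Record) -> Record:
        m = regex.search(r.get(field, ""))
        return {**r, **dict(zip(names, m.groups()))} if m else r
    return stage


def project_rename(new: str, old: str) -> Stage:
    # Rename one field, keeping its position.
    return lambda r: {(new if k == old else k): v for k, v in r.items()}


def extend_json_scalar(new: str, field: str, key: str) -> Stage:
    # Rough stand-in for json_extract_scalar(field, '$.<key>') on a top-level key.
    def stage(r: Record) -> Record:
        value = json.loads(r[field]).get(key) if field in r else None
        if value is None or isinstance(value, (dict, list)):
            return r
        return {**r, new: value if isinstance(value, str) else json.dumps(value)}
    return stage


PIPELINE: List[Stage] = [
    project("Payload", "error", "__tag__:__path__", "__tag__:__hostname__", "caller"),
    parse_json("Payload"),
    project_away("Payload"),
    parse_regexp("error", r'CouldNotExecuteQuery : ({[\w":\s,\-}]+)', "errorJson"),
    parse_json("errorJson"),
    parse_regexp("__tag__:__path__", r"\/var\/log\/([\w\_]+).LOG", "serviceName"),
    parse_regexp("caller", r"\w+/([\w\.]+):(\d+)", "fileName", "fileNo"),
    project_rename("serviceHost", "__tag__:__hostname__"),
    extend_json_scalar("scheduleType", "schedule", "type"),
    project("httpCode", "errorCode", "errorMessage", "requestID", "fileName",
            "fileNo", "serviceHost", "scheduleType", "project"),
]


def run(record: Record, stages: List[Stage] = PIPELINE) -> Record:
    # Feed the output of each stage into the next, like SPL's | operator.
    return reduce(lambda acc, stage: stage(acc), stages, record)


# Hypothetical record shaped like the sample log described earlier.
sample: Record = {
    "Payload": json.dumps({"project": "demo-project", "schedule": {"type": "FixedRate"}}),
    "error": 'query failed, CouldNotExecuteQuery : {"httpCode": 500, "errorCode": '
             '"InternalError", "errorMessage": "timeout", "requestID": "req-123"}',
    "requestURL": "/api/v1/query",
    "__tag__:__path__": "/var/log/service_a.LOG",
    "__tag__:__hostname__": "host-1",
    "caller": "query/executor.go:88",
}
print(run(sample))
```

run(sample) returns exactly the nine columns of the final project stage. The model also makes one detail visible. serviceName is produced by the parse-regexp stage on __tag__:__path__ but is not listed in the final project, so it never reaches Flink. To keep it, add serviceName to that list and add a serviceName STRING column to the table.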

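What each step does:

Project: keep only the fields needed for later parsing.

Parse-json on Payload: expand the Payload JSON string into top-level fields such as project and schedule.

Project-away: drop the original Payload string once it has been expanded.

Parse-regexp on error: capture the JSON object that follows "CouldNotExecuteQuery : " as errorJson.

Parse-json on errorJson: expand it into httpCode, errorCode, errorMessage, and requestID.

Parse-regexp on __tag__:__path__: capture the file name before the .LOG extension as serviceName.

Parse-regexp on caller: capture the file name and line number as fileName and fileNo.

Project-rename: rename __tag__:__hostname__ to serviceHost.

Extend: read schedule.type with json_extract_scalar and store it as scheduleType.

Final project: keep only the required output columns.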
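The parse-regexp step on error is the most fragile one, because it describes the embedded JSON with a character class instead of parsing it. The Python sketch below uses the same pattern with Python's re module on hypothetical error strings. The capture stops at the first character outside [\w":\s,\-}]. A message containing a period or a slash therefore yields a truncated object that no longer parses as JSON.

```python
import json
import re
from typing import Optional

# Pattern copied from the SPL parse-regexp step on the error field.
ERROR_JSON_RE = re.compile(r'CouldNotExecuteQuery : ({[\w":\s,\-}]+)')


def extract_error_json(error: str) -> Optional[dict]:
    """Return the embedded error object, or None if it is missing or truncated."""
    m = ERROR_JSON_RE.search(error)
    if m is None:
        return None
    try:
        return json.loads(m.group(1))
    except json.JSONDecodeError:
        # The character class stopped early, leaving invalid JSON behind.
        return None


ok = 'CouldNotExecuteQuery : {"httpCode": 500, "errorMessage": "timeout"}'
bad = 'CouldNotExecuteQuery : {"httpCode": 500, "errorMessage": "see /docs."}'
print(extract_error_json(ok))   # {'httpCode': 500, 'errorMessage': 'timeout'}
print(extract_error_json(bad))  # None
```

If real error messages can contain such characters, one option is a capture that runs to the last closing brace, for example ({.*}). Test it against your own data and SPL's regex dialect before relying on it.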

Creating the Flink Temporary Table

In the Flink console, create a temporary table that points to the SLS Logstore and embeds the SPL pipeline in the query property:

```sql
CREATE TEMPORARY TABLE sls_input_complex (
  errorCode STRING,
  errorMessage STRING,
  fileName STRING,
  fileNo STRING,
  httpCode STRING,
  requestID STRING,
  scheduleType STRING,
  serviceHost STRING,
  project STRING,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-beijing-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-02-01 10:30:00',
  'project' = '${project}',
  'logstore' = '${logstore}',
  'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, project'
);
```
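To embed the pipeline in the 'query' option, it has to become a single SQL string literal. Every single quote inside the SPL must be doubled, as in ''$.type'' in the DDL above, and the DDL puts the whole pipeline on one line. A small helper, sketched here in Python with a hypothetical function name, does this mechanically, so the readable multi-line SPL can remain the version you edit.

```python
def spl_to_sql_literal(spl: str) -> str:
    """Join a multi-line SPL pipeline into one line and quote it as a SQL literal.

    Blank lines are dropped, each line is stripped, and single quotes are
    doubled, which is how a quote is escaped inside a standard SQL string
    literal. Backslashes are left untouched, as in the DDL above.
    """
    one_line = " ".join(line.strip() for line in spl.splitlines() if line.strip())
    return "'" + one_line.replace("'", "''") + "'"


spl = """* | project schedule
| extend scheduleType = json_extract_scalar(schedule, '$.type')"""
print(spl_to_sql_literal(spl))
# '* | project schedule | extend scheduleType = json_extract_scalar(schedule, ''$.type'')'
```

Regenerating the literal from the multi-line source avoids hand-editing the long 'query' line, where a single missed quote breaks the whole DDL.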

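Replace ${ak} and ${sk} with the AccessKey ID and AccessKey secret of an account that has read access to the Logstore, and replace ${project} and ${logstore} with the names of the SLS project and Logstore. The endpoint shown is the internal endpoint for the China (Beijing) region; use the endpoint that matches your project's region.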
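If you keep the DDL as a template file, note that its ${...} placeholders use the same syntax as Python's string.Template, so they can be filled in before the statement is submitted. The following is a minimal sketch with a hypothetical helper name. It uses safe_substitute because the SPL text contains other $ characters, such as the JSONPath $.type, that must be left alone.

```python
from string import Template
from typing import Mapping


def render_ddl(ddl_template: str, values: Mapping[str, str]) -> str:
    """Fill ${name} placeholders; any other '$' text is left as-is."""
    # Unlike substitute(), safe_substitute() raises neither on a lone '$'
    # (e.g. the JSONPath '$.type') nor on placeholders missing from values.
    return Template(ddl_template).safe_substitute(values)


ddl = "'project' = '${project}', 'query' = '* | extend t = json_extract_scalar(s, ''$.type'')'"
print(render_ddl(ddl, {"project": "my-project"}))
```

A placeholder that is missing from values stays in the output unchanged, so an unfilled ${sk} is easy to spot in the rendered DDL.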

Running a Continuous Query

After the table is defined, run a continuous query such as SELECT * FROM sls_input_complex to verify the cleaned data. The Flink console shows each column populated with the values produced by the SPL pipeline, which confirms that the weakly structured log has been turned into a structured table ready for downstream analytics.
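When the console output is long, a column that is mostly empty is the quickest sign that one extraction rule is not matching. Here is a small Python sketch of that check. It assumes a sample of result rows has been exported as dictionaries and that a column whose rule did not match arrives empty, as None, or absent. That assumption and the helper name are illustrative, not guaranteed SLS or Flink behavior.

```python
from collections import Counter
from typing import Iterable, List, Mapping, Optional

# Columns declared in sls_input_complex (proctime excluded).
COLUMNS: List[str] = [
    "errorCode", "errorMessage", "fileName", "fileNo", "httpCode",
    "requestID", "scheduleType", "serviceHost", "project",
]


def missing_counts(rows: Iterable[Mapping[str, Optional[str]]],
                   columns: List[str] = COLUMNS) -> Counter:
    """Count, per column, the rows in which the value is missing or empty."""
    counts: Counter = Counter()
    for row in rows:
        for col in columns:
            if not row.get(col):
                counts[col] += 1
    return counts


rows = [
    {"httpCode": "500", "fileName": "executor.go", "project": "demo"},
    {"httpCode": "404", "fileName": None, "project": "demo"},
]
print(missing_counts(rows, ["httpCode", "fileName", "project"]))
# Counter({'fileName': 1})
```

A count close to the number of sampled rows usually points at a single parse-regexp or parse-json stage, which can then be tested in isolation.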

Conclusion

Configuring SPL directly in the Flink SLS connector is a lightweight, maintainable way to clean weakly structured logs at the source. It reduces network traffic and lets developers focus on business logic in Flink. Because SLS scan queries also support SPL, the same cleaning logic can be verified against live data in SLS before it is configured in Flink.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
