
Implementing Spark Data Lineage with Spline: A Step‑by‑Step Guide

This article explains the growing importance of data lineage in large data warehouses, evaluates three Spark lineage extraction approaches, and provides a detailed, step‑by‑step guide to integrating the open‑source Spline agent—including codeless and programmatic initialization, configuration, dispatcher setup, post‑processing, and known limitations.


Background

As data warehouses grow, data lineage (data provenance) becomes essential for tracing upstream and downstream relationships among tables, tasks, and jobs. The existing system parses lineage for SQL tasks with ANTLR, but lineage for Spark applications is still configured manually. Production runs Spark 2.3 and Spark 3.1 with Python 2/3, Java, and Scala applications on both YARN and Kubernetes, so any solution must handle all of these variants.

Design Options

Three typical approaches for Spark application lineage were evaluated:

Code‑level parsing (e.g., SPROV).

Dynamic listening via code modification or plugins (e.g., Titian, Pebble, Spline, Apache Atlas).

Log‑based parsing of Spark event logs.

Code‑level parsing is too complex given the number of languages that must be supported. Log‑based parsing works for Spark 3, but Spark 2 event logs lack complete Hive metadata, so the team selected dynamic listening and evaluated Spline.

Spline Overview

Spline (Spark Lineage) is an Apache‑2.0‑licensed open‑source project that collects Spark lineage through three components: the agent, the server, and the UI. The agent registers a QueryExecutionListener that receives SparkListenerSQLExecutionEnd events.
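Under the hood, QueryExecutionListener is a standard Spark hook. The minimal sketch below is not Spline's code; it only illustrates the interface the agent implements and the information it receives when a query finishes:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustration only: the Spark hook that Spline's listener is built on.
class PrintingListener extends QueryExecutionListener {
  // Invoked when an action completes; qe exposes the analyzed and executed plans.
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"action=$funcName analyzedPlan=${qe.analyzed.getClass.getSimpleName}")

  // Invoked when an action fails.
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"action=$funcName failed: ${exception.getMessage}")
}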

Initialization

Spline supports two initialization modes.

Codeless Initialization

Configuration‑only integration without code changes. Example spark-submit command:

spark-submit \
  --jars /path/to/lineage/spark-3.1-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar \
  --files /path/to/lineage/spline.properties \
  --num-executors 2 --executor-memory 1G --driver-memory 1G \
  --name test_lineage --deploy-mode cluster \
  --conf spark.spline.mode=BEST_EFFORT \
  --conf spark.spline.lineageDispatcher.http.producer.url=http://172.18.221.156:8080/producer \
  --conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
  test.py

The --jars option ships the agent JAR with the application, --files distributes the spline.properties file, and the listener class is registered via spark.sql.queryExecutionListeners. Settings from the properties file can also be passed individually with --conf using the spark.spline. prefix.
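For reference, a minimal spline.properties shipped with --files might contain just the following (the host below is a placeholder):

spline.mode=BEST_EFFORT
spline.lineageDispatcher=http
spline.lineageDispatcher.http.producer.url=http://spline-server:8080/producer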

Programmatic Initialization

Explicitly enable lineage in user code.

Scala example:

// given a Spark session ...
val sparkSession: SparkSession = ???
// enable data lineage tracking with Spline
import za.co.absa.spline.harvester.SparkLineageInitializer._
sparkSession.enableLineageTracking()

Java example:

import za.co.absa.spline.harvester.SparkLineageInitializer;
// ...
SparkLineageInitializer.enableLineageTracking(session);

Python example:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("spline_app")\
    .config("spark.jars", "dbfs:/path_where_the_jar_is_uploaded")\
    .getOrCreate()
sc = spark.sparkContext
sc.setSystemProperty("spline.mode", "REQUIRED")
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer.enableLineageTracking(spark._jsparkSession)

Lineage Extraction Logic

The agent’s SplineAgent.handle() method calls LineageHarvester.harvest(), which processes the QueryExecution object's analyzed logical plan and executed physical plan. Its tryExtractWriteCommand(logicalPlan) step extracts write operations via plugins.

Plugins (e.g., DataSourceV2Plugin.writeNodeProcessor()) parse commands such as V2WriteCommand, CreateTableAsSelect, and ReplaceTableAsSelect to produce a WriteCommand structure.
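For intuition, the sample plan later in this article reports CreateDataSourceTableAsSelectCommand as its write node. A much-simplified version of such a processor (a sketch, not Spline's Plugin API) is a pattern match over the logical plan:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand

// Sketch only: pull the output table identifier out of one kind of write command.
def extractWriteTarget(plan: LogicalPlan): Option[String] = plan match {
  case c: CreateDataSourceTableAsSelectCommand => Some(c.table.identifier.unquotedString)
  case _ => None
}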

The WriteCommand's query field is then recursively traversed to extract read operations.
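The traversal can be pictured with the following sketch, which (unlike Spline's plugin-driven implementation) simply collects file-based input paths from the plan tree:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Sketch only: walk the plan and gather root paths of file-based relations.
def collectReadSources(plan: LogicalPlan): Seq[String] = plan.collect {
  case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
    fsRelation.location.rootPaths.map(_.toString)
}.flatten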

The final output consists of two JSON objects: plan (the lineage graph) and event (auxiliary metrics).

Sample plan JSON excerpt (truncated):

[
  "plan",
  {
    "id": "acd5157c-ddc5-5ef0-b1bc-06bb8dcda841",
    "name": "team evaluation ranks",
    "operations": {
      "write": {
        "outputSource": "hdfs:///user/hive/warehouse/dm_ai.db/dws_kdt_comment_ranks_info",
        "name": "CreateDataSourceTableAsSelectCommand",
        "params": {"table": {"identifier": {"table": "dws_kdt_comment_ranks_info","database": "dm_ai"}}}
      },
      "reads": [{
        "inputSources": ["hdfs://yz-cluster-qa/user/hive/warehouse/dm_ai.db/dws_kdt_comment_rank_base"],
        "name": "LogicalRelation"
      }]
    }
  }
]

Dispatcher and Post‑Processing

The LineageDispatcher determines how the lineage JSON is sent. Built‑in implementations include:

HttpLineageDispatcher – POSTs to an HTTP endpoint.

KafkaLineageDispatcher – publishes to a Kafka topic.

LoggingLineageDispatcher – writes to Spark’s stderr for debugging.

Custom dispatchers can be created by extending LineageDispatcher and providing a constructor that accepts an org.apache.commons.configuration.Configuration object. Example configuration:

spline.lineageDispatcher=my-dispatcher
spline.lineageDispatcher.my-dispatcher.className=org.example.spline.MyDispatcherImpl
spline.lineageDispatcher.my-dispatcher.prop1=value1
spline.lineageDispatcher.my-dispatcher.prop2=value2
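A matching skeleton might look like the sketch below. The send() signatures and the producer-model package differ between agent versions, so treat this as an outline rather than a drop-in implementation:

package org.example.spline

import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

// Sketch only: a dispatcher that logs instead of shipping lineage anywhere.
class MyDispatcherImpl(config: Configuration) extends LineageDispatcher {
  // Values of spline.lineageDispatcher.my-dispatcher.* arrive through `config`.
  private val prop1 = config.getString("prop1")

  override def send(plan: ExecutionPlan): Unit =
    println(s"[$prop1] got execution plan ${plan.id}")

  override def send(event: ExecutionEvent): Unit =
    println(s"[$prop1] got execution event for plan ${event.planId}")
}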

After lineage extraction, a PostProcessingFilter can be applied (e.g., for data masking) by implementing

za.co.absa.spline.harvester.postprocessing.PostProcessingFilter

and configuring it similarly:

spline.postProcessingFilter=my-filter
spline.postProcessingFilter.my-filter.className=my.awesome.CustomFilter
spline.postProcessingFilter.my-filter.prop1=value1
spline.postProcessingFilter.my-filter.prop2=value2

Version Compatibility

For Spark 2.3, codeless initialization requires the SPARK‑23228 patch. See the related GitHub issue https://github.com/AbsaOSS/spline-spark-agent/issues/490.

Integration Steps

Compile the agent JAR for the target Spark and Scala versions. Example for Spark 3.1:

mvn scala-cross-build:change-version -Pscala-2.12
mvn clean install -Pscala-2.12,spark-3.1 -DskipTests=true

Deploy the JAR to the Spark classpath (e.g., /path/to/spark/jars) and add the following to spark-defaults.conf:

spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.mode=BEST_EFFORT
spark.spline.lineageDispatcher=composite
spark.spline.lineageDispatcher.composite.dispatchers=logging,http
spark.spline.lineageDispatcher.http.producer.url=http://[ip:port]/producer
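Once these defaults are in place, ordinary jobs need no Spline-specific flags at all; a hypothetical smoke test is just:

spark-submit --name lineage_smoke_test --deploy-mode cluster test.py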

Demo Deployment

A Docker‑Compose setup can launch the Spline server and UI in one command:

wget https://raw.githubusercontent.com/AbsaOSS/spline-getting-started/main/docker/docker-compose.yml
wget https://raw.githubusercontent.com/AbsaOSS/spline-getting-started/main/docker/.env
export DOCKER_HOST_EXTERNAL=yourhostip
docker-compose up
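With the default getting-started compose file, the Spline REST server is typically exposed on port 8080 of the Docker host (so the producer URL becomes http://yourhostip:8080/producer) and the web UI on port 9090.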

Limitations

Lineage cannot be traced through RDD‑based operations because the agent stops recursion at LogicalRDD. A possible remedy is to analyze RDD dependencies.
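For example, round-tripping a DataFrame through an RDD hides its upstream reads behind a LogicalRDD node (the table names below are hypothetical):

val df = spark.read.table("dm_ai.dws_kdt_comment_rank_base")

// The new DataFrame's plan is rooted at LogicalRDD, so the agent
// cannot trace the original table behind df.rdd.
val roundTripped = spark.createDataFrame(df.rdd, df.schema)
roundTripped.write.mode("overwrite").saveAsTable("dm_ai.lineage_gap_demo")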

Lineage is emitted only on write operations; pure read‑only jobs produce no lineage output.

Despite these gaps, Spline provides a low‑overhead way to add lineage tracking to running Spark applications, and further research can improve accuracy and coverage.

Tags: Big Data, Data Lineage, Data Governance, Spark, Apache Spark, Spline
Written by Youzan Coder

Official Youzan tech channel, delivering technical insights and occasional daily updates from the Youzan tech team.