Implementing Spark Data Lineage with Spline: A Step‑by‑Step Guide
This article explains the growing importance of data lineage in large data warehouses, evaluates three Spark lineage extraction approaches, and provides a detailed, step‑by‑step guide to integrating the open‑source Spline agent—including codeless and programmatic initialization, configuration, dispatcher setup, post‑processing, and known limitations.
Background
Data warehouses are growing, making data lineage (data provenance) essential for tracing upstream and downstream relationships between tables, tasks, and jobs. The existing system parses SQL task lineage with ANTLR, but Spark application lineage is still configured manually. Production runs Spark 2.3 and Spark 3.1, supporting Python 2/3, Java, and Scala on both YARN and Kubernetes, so any solution must handle all these variants.
Design Options
Three typical approaches for Spark application lineage were evaluated:
Code‑level parsing (e.g., SPROV).
Dynamic listening via code modification or plugins (e.g., Titian, Pebble, Spline, Apache Atlas).
Log‑based parsing of Spark event logs.
Code‑level parsing is too complex given the multiple supported languages. Log‑based parsing works for Spark 3, but Spark 2 event logs lack complete Hive metadata, so the team selected the dynamic‑listening approach and evaluated Spline.
Spline Overview
Spline (Spark Lineage) is an Apache‑2.0‑licensed open‑source project that collects Spark lineage through three components: the agent, the server, and the UI. The agent registers a QueryExecutionListener that receives SparkListenerSQLExecutionEnd events.
Initialization
Spline supports two initialization modes.
Codeless Initialization
Configuration‑only integration without code changes. Example spark-submit command:
spark-submit \
--jars /path/to/lineage/spark-3.1-spline-agent-bundle_2.12-1.0.0-SNAPSHOT.jar \
--files /path/to/lineage/spline.properties \
--num-executors 2 --executor-memory 1G --driver-memory 1G \
--name test_lineage --deploy-mode cluster \
--conf spark.spline.mode=BEST_EFFORT \
--conf spark.spline.lineageDispatcher.http.producer.url=http://172.18.221.156:8080/producer \
--conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
test.py
The --jars option loads the agent JAR, --files (or individual --conf entries) supplies the properties file, and the listener class is registered via spark.sql.queryExecutionListeners.
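Any key from spline.properties can equivalently be passed as a spark.spline.*-prefixed --conf flag, as the command above shows. A small helper sketch of that prefix convention (the helper itself is hypothetical, for illustration only):

```python
def spline_props_to_conf_flags(props):
    """Render a dict of spline.* properties as spark-submit --conf arguments.

    Spline reads its settings either from spline.properties or from the Spark
    configuration with a "spark." prefix (e.g. spline.mode -> spark.spline.mode).
    """
    flags = []
    for key, value in sorted(props.items()):
        # Prefix bare spline.* keys; keep keys that are already Spark conf keys.
        spark_key = key if key.startswith("spark.") else "spark." + key
        flags += ["--conf", f"{spark_key}={value}"]
    return flags

flags = spline_props_to_conf_flags({
    "spline.mode": "BEST_EFFORT",
    "spline.lineageDispatcher.http.producer.url": "http://172.18.221.156:8080/producer",
})
```

The resulting list can be spliced directly into a spark-submit invocation built in a launcher script.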
Programmatic Initialization
Explicitly enable lineage in user code.
Scala example:
// given a Spark session ...
val sparkSession: SparkSession = ???
// enable data lineage tracking with Spline
import za.co.absa.spline.harvester.SparkLineageInitializer._
sparkSession.enableLineageTracking()
Java example:
import za.co.absa.spline.harvester.SparkLineageInitializer;
// ...
SparkLineageInitializer.enableLineageTracking(session);
Python example:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("spline_app")\
.config("spark.jars", "dbfs:/path_where_the_jar_is_uploaded")\
.getOrCreate()
sc = spark.sparkContext
sc.setSystemProperty("spline.mode", "REQUIRED")
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer.enableLineageTracking(spark._jsparkSession)
Lineage Extraction Logic
The agent’s SplineAgent.handle() method calls LineageHarvester.harvest(), which processes the QueryExecution object's analyzed logical plan and executed plan. tryExtractWriteCommand(logicalPlan) extracts write operations via plugins.
Plugins (e.g., DataSourceV2Plugin.writeNodeProcessor()) parse commands such as V2WriteCommand, CreateTableAsSelect, and ReplaceTableAsSelect to produce a WriteCommand structure.
The WriteCommand's query field is recursively traversed to extract read operations.
The final output consists of two JSON objects: plan (the lineage graph) and event (auxiliary metrics).
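For downstream consumption, the plan document can be walked to recover table-level edges. A minimal sketch, assuming only the field layout visible in the sample excerpt below (real plans carry more structure):

```python
def lineage_edges(plan):
    """Return (input_source, output_source) pairs from one Spline plan document.

    Assumes the layout shown in the sample excerpt: a single write operation
    carrying an outputSource, and a list of reads each carrying inputSources.
    """
    ops = plan.get("operations", {})
    target = ops.get("write", {}).get("outputSource")
    edges = []
    for read in ops.get("reads", []):
        for source in read.get("inputSources", []):
            edges.append((source, target))
    return edges
```

Applied to the sample plan below, this yields one edge from the dws_kdt_comment_rank_base path to the dws_kdt_comment_ranks_info path; a lineage service would accumulate such edges across plans to build the warehouse graph.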
Sample plan JSON excerpt (truncated):
[
"plan",
{
"id": "acd5157c-ddc5-5ef0-b1bc-06bb8dcda841",
"name": "team evaluation ranks",
"operations": {
"write": {
"outputSource": "hdfs:///user/hive/warehouse/dm_ai.db/dws_kdt_comment_ranks_info",
"name": "CreateDataSourceTableAsSelectCommand",
"params": {"table": {"identifier": {"table": "dws_kdt_comment_ranks_info","database": "dm_ai"}}}
},
"reads": [{
"inputSources": ["hdfs://yz-cluster-qa/user/hive/warehouse/dm_ai.db/dws_kdt_comment_rank_base"],
"name": "LogicalRelation"
}]
}
}
]
Dispatcher and Post‑Processing
The LineageDispatcher determines how the lineage JSON is sent. Built‑in implementations include:
HttpLineageDispatcher – POSTs to an HTTP endpoint.
KafkaLineageDispatcher – publishes to a Kafka topic.
LoggingLineageDispatcher – writes to Spark's stderr for debugging.
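Conceptually, the HTTP dispatcher serializes the plan and POSTs it to the producer endpoint. A rough Python sketch of that behavior; note that the "/execution-plans" path and plain JSON content type here are assumptions for illustration, since the real HttpLineageDispatcher targets Spline's producer API with its own versioned media types:

```python
import json
import urllib.request

def build_plan_request(producer_url, plan):
    """Build (but do not send) an HTTP POST carrying a lineage plan document.

    The path suffix and Content-Type below are illustrative assumptions,
    not the exact wire format used by Spline's HttpLineageDispatcher.
    """
    body = json.dumps(plan).encode("utf-8")
    return urllib.request.Request(
        producer_url.rstrip("/") + "/execution-plans",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending would be a urllib.request.urlopen(req) call; a custom dispatcher could apply the same idea with retries or batching.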
Custom dispatchers can be created by extending LineageDispatcher and providing a constructor that accepts an org.apache.commons.configuration.Configuration object. Example configuration:
spline.lineageDispatcher=my-dispatcher
spline.lineageDispatcher.my-dispatcher.className=org.example.spline.MyDispatcherImpl
spline.lineageDispatcher.my-dispatcher.prop1=value1
spline.lineageDispatcher.my-dispatcher.prop2=value2
After lineage extraction, a PostProcessingFilter can be applied (e.g., for data masking) by implementing za.co.absa.spline.harvester.postprocessing.PostProcessingFilter and configuring it similarly:
spline.postProcessingFilter=my-filter
spline.postProcessingFilter.my-filter.className=my.awesome.CustomFilter
spline.postProcessingFilter.my-filter.prop1=value1
spline.postProcessingFilter.my-filter.prop2=value2
Version Compatibility
For Spark 2.3, codeless initialization requires the SPARK‑23228 patch. See the related GitHub issue https://github.com/AbsaOSS/spline-spark-agent/issues/490.
Integration Steps
Compile the agent JAR for the target Spark and Scala versions. Example for Spark 3.1:
mvn scala-cross-build:change-version -Pscala-2.12
mvn clean install -Pscala-2.12,spark-3.1 -DskipTests=True
Deploy the JAR to the Spark classpath (e.g., /path/to/spark/jars) and add the following to spark-defaults.conf:
spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.mode=BEST_EFFORT
spark.spline.lineageDispatcher=composite
spark.spline.lineageDispatcher.composite.dispatchers=logging,http
spark.spline.lineageDispatcher.http.producer.url=http://[ip:port]/producer
Demo Deployment
A Docker‑Compose setup can launch the Spline server and UI in one command:
wget https://raw.githubusercontent.com/AbsaOSS/spline-getting-started/main/docker/docker-compose.yml
wget https://raw.githubusercontent.com/AbsaOSS/spline-getting-started/main/docker/.env
export DOCKER_HOST_EXTERNAL=yourhostip
docker-compose up
Limitations
Lineage cannot be traced through RDD‑based operations because the agent stops recursion at LogicalRDD. A possible remedy is to analyze RDD dependencies.
Lineage is emitted only on write operations; pure read‑only jobs produce no lineage output.
Despite these gaps, Spline provides a low‑overhead way to add lineage tracking to running Spark applications, and further research can improve accuracy and coverage.
Youzan Coder
Official Youzan tech channel, delivering technical insights and occasional daily updates from the Youzan tech team.