Extending Spark SQL with LanceSparkSessionExtensions: A Complete Guide
This article explains how to inject the Lance‑Spark plugin into Spark, covering the core LanceSparkSessionExtensions class, the various ways to register extensions, the custom parser and planner strategy implementations, and the underlying Spark mechanisms such as injectParser, injectPlannerStrategy, and PredicateHelper.
Overview of LanceSparkSessionExtensions
The Lance‑Spark plugin extends Spark SQL by injecting custom parser and planner logic. To activate the extensions, place the plugin JAR on Spark's classpath and set the configuration key
spark.sql.extensions=org.lance.spark.extensions.LanceSparkSessionExtensions. The core class org.lance.spark.extensions.LanceSparkSessionExtensions registers the parser and planner hooks.
Implementation of the extensions was introduced in the pull request https://github.com/lance-format/lance-spark/pull/91.
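For example, the extensions can be enabled directly on the session builder (a minimal sketch; the application name is arbitrary, and the extension JAR must already be on the driver and executor classpath):
import org.apache.spark.sql.SparkSession

// Build a session with the Lance-Spark extensions enabled via spark.sql.extensions.
val spark = SparkSession.builder()
  .appName("lance-extensions-demo")
  .config("spark.sql.extensions", "org.lance.spark.extensions.LanceSparkSessionExtensions")
  .getOrCreate()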
SparkSessionExtensions Usage
1. Using the withExtensions API
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

val spark = SparkSession.builder()
  .appName("MyApp")
  .withExtensions { extensions =>
    // inject a custom analyzer rule
    extensions.injectResolutionRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = {
          // custom logic goes here
          plan
        }
      }
    }
    // inject a custom function
    extensions.injectFunction(
      (FunctionIdentifier("my_func"),
        new ExpressionInfo(/* ... */),
        (children: Seq[Expression]) => new MyExpression(children.head))
    )
  }
  .getOrCreate()
2. Using the spark.sql.extensions configuration
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, SparkSessionExtensionsProvider}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}

class MyExtensions extends SparkSessionExtensionsProvider {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { _ => new MyCustomRule() }
    extensions.injectFunction(
      (FunctionIdentifier("my_func"),
        new ExpressionInfo(/* ... */),
        (children: Seq[Expression]) => new MyExpression(children.head))
    )
  }
}

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.sql.extensions", "com.example.MyExtensions")
  .getOrCreate()
3. Automatic discovery with ServiceLoader
Create a class that implements SparkSessionExtensionsProvider and place a service file at
src/main/resources/META-INF/services/org.apache.spark.sql.SparkSessionExtensionsProvider containing the fully‑qualified class name.
package com.example

import org.apache.spark.sql.{SparkSessionExtensions, SparkSessionExtensionsProvider}

class MyExtensions extends SparkSessionExtensionsProvider {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // inject custom functionality, e.g., functions or rules
    extensions.injectFunction(/* ... */)
  }
}
The service file itself contains a single line with the fully‑qualified class name:
com.example.MyExtensions
Injecting a Custom Parser
The injectParser hook receives a ParserBuilder of type (SparkSession, ParserInterface) => ParserInterface. Lance‑Spark supplies a partial function that wraps the default parser:
{ case (_, parser) => new LanceSparkSqlExtensionsParser(parser) }
This function ignores its first argument (the SparkSession) and wraps the session's default parser as the delegate of a new LanceSparkSqlExtensionsParser.
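The source does not show the body of LanceSparkSessionExtensions itself, but based on the hooks described in this article its apply method plausibly looks like the following sketch (the package name is taken from the configuration key above; imports of the Lance parser and strategy classes are omitted):
package org.lance.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions

// Sketch only: register the custom parser hook. The planner strategy hook
// described later in this article would be injected from the same method.
class LanceSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser { case (_, parser) =>
      new LanceSparkSqlExtensionsParser(parser)
    }
  }
}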
LanceSparkSqlExtensionsParser implementation
class LanceSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {
  private lazy val astBuilder = new LanceSqlExtensionsAstBuilder(delegate)

  /** Parse a string to a LogicalPlan. */
  override def parsePlan(sqlText: String): LogicalPlan = {
    try {
      // Try Spark's default parser first; fall back to the Lance extension grammar.
      delegate.parsePlan(sqlText)
    } catch {
      case _: Exception => parse(sqlText)
    }
  }

  protected def parse(command: String): LogicalPlan = {
    val lexer = new LanceSqlExtensionsLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    val tokenStream = new CommonTokenStream(lexer)
    val parser = new LanceSqlExtensionsParser(tokenStream)
    parser.removeErrorListeners()
    try {
      // First attempt: the faster SLL prediction mode.
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      astBuilder.visit(parser.singleStatement()).asInstanceOf[LogicalPlan]
    } catch {
      case _: ParseCancellationException =>
        // SLL failed; rewind the token stream and retry with the slower but complete LL mode.
        tokenStream.seek(0)
        parser.reset()
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        astBuilder.visit(parser.singleStatement()).asInstanceOf[LogicalPlan]
    }
  }
}
Injecting a Planner Strategy
The injectPlannerStrategy hook expects a StrategyBuilder, a function that takes a SparkSession and returns a SparkStrategy. Lance‑Spark passes the partially applied constructor LanceDataSourceV2Strategy(_), which Scala expands to session => LanceDataSourceV2Strategy(session).
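For illustration, the following fragment (assumed to sit inside the extension's apply method, next to the parser hook above) shows the registration and the lambda it expands to:
// Partially applied constructor: a function of type SparkSession => SparkStrategy.
extensions.injectPlannerStrategy(LanceDataSourceV2Strategy(_))
// Equivalent explicit form:
// extensions.injectPlannerStrategy(session => LanceDataSourceV2Strategy(session))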
LanceDataSourceV2Strategy definition
case class LanceDataSourceV2Strategy(session: SparkSession) extends SparkStrategy with PredicateHelper {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case AddColumnsBackfill(ResolvedIdentifier(catalog, ident), columnNames, source) =>
      AddColumnsBackfillExec(asTableCatalog(catalog), ident, columnNames, source) :: Nil
    case Optimize(ResolvedIdentifier(catalog, ident), args) =>
      OptimizeExec(asTableCatalog(catalog), ident, args) :: Nil
    case Vacuum(ResolvedIdentifier(catalog, ident), args) =>
      VacuumExec(asTableCatalog(catalog), ident, args) :: Nil
    case AddIndex(ResolvedIdentifier(catalog, ident), indexName, method, columns, args) =>
      AddIndexExec(asTableCatalog(catalog), ident, indexName, method, columns, args) :: Nil
    case _ => Nil
  }

  private def asTableCatalog(plugin: CatalogPlugin): TableCatalog = plugin match {
    case t: TableCatalog => t
    case _ => throw new IllegalArgumentException(s"Catalog $plugin is not a TableCatalog")
  }
}
The strategy matches custom logical nodes (e.g., AddColumnsBackfill, Optimize, Vacuum, AddIndex) and produces corresponding physical execution nodes.
Custom SparkStrategy Example
object MyCustomStrategy extends SparkStrategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case MyCustomLogicalPlan(child) => MyCustomExec(planLater(child)) :: Nil
    case _ => Nil
  }
}
PredicateHelper Utilities
PredicateHelper supplies methods that simplify predicate push‑down.
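Before walking through the individual methods below, here is a self-contained sketch of how two of them behave; the object name and the attributes are made up for illustration, and the trait is mixed in only because the helpers are protected:
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Illustrative only: a throwaway object mixing in PredicateHelper so the
// protected helper methods can be called directly.
object PredicateHelperDemo extends PredicateHelper {
  def demo(): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val cond = And(GreaterThan(a, Literal(1)), LessThan(b, Literal(2)))

    // Splits the conjunction into Seq(a > 1, b < 2).
    println(splitConjunctivePredicates(cond))

    // Keeps only the conjunct that references {a}: Some(a > 1).
    println(extractPredicatesWithinOutputSet(cond, AttributeSet(a :: Nil)))
  }
}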
splitConjunctivePredicates
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
  condition match {
    case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
    case other => other :: Nil
  }
}
// Example: a > 1 AND b < 2 AND c = 3 → [a > 1, b < 2, c = 3]
splitDisjunctivePredicates
protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
  condition match {
    case Or(left, right) => splitDisjunctivePredicates(left) ++ splitDisjunctivePredicates(right)
    case other => other :: Nil
  }
}
extractPredicatesWithinOutputSet
protected def extractPredicatesWithinOutputSet(
    condition: Expression,
    outputSet: AttributeSet): Option[Expression] = condition match {
  case And(left, right) =>
    (extractPredicatesWithinOutputSet(left, outputSet), extractPredicatesWithinOutputSet(right, outputSet)) match {
      case (Some(l), Some(r)) => Some(And(l, r))
      case (Some(l), None) => Some(l)
      case (None, Some(r)) => Some(r)
      case _ => None
    }
  case Or(left, right) =>
    for {
      l <- extractPredicatesWithinOutputSet(left, outputSet)
      r <- extractPredicatesWithinOutputSet(right, outputSet)
    } yield Or(l, r)
  case other =>
    if (other.references.subsetOf(outputSet)) Some(other) else None
}
DataSourceStrategy Integration
During a data‑source scan Spark follows these steps:
1. Split the WHERE clause into individual predicates using splitConjunctivePredicates.
2. Determine which predicates can be pushed down to the underlying source (e.g., Parquet, ORC) via canEvaluate (a sketch of this check follows below).
3. Pass the pushable predicates to the source's buildScan method.
4. Execute the reduced scan, lowering I/O and improving performance.
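The canEvaluate check referenced in step 2 is not defined in the source; in Spark's PredicateHelper it is roughly a reference-containment test:
// A predicate can be evaluated against a plan when every attribute it
// references is produced by that plan's output.
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
  expr.references.subsetOf(plan.outputSet)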
object DataSourceStrategy extends Strategy with PredicateHelper {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projects, filters, relation) =>
      // Split each filter into conjuncts and keep the ones the source can evaluate.
      val predicates = filters.flatMap(splitConjunctivePredicates)
      val pushable = predicates.filter(pred => canEvaluate(pred, relation))
      val scan = relation.buildScan(pushable)
      // further processing of the scan result …
    case _ => Nil
  }
}
This walkthrough demonstrates how Lance‑Spark leverages Spark's extensibility points to add custom SQL syntax, planner strategies, and predicate push‑down logic.