
Extending Spark SQL with LanceSparkSessionExtensions: A Complete Guide

This article explains how to inject the LanceSpark plugin into Spark, covering the core LanceSparkSessionExtensions class, various ways to register extensions, the custom parser and planner strategy implementations, and the underlying Spark mechanisms such as injectParser, injectPlannerStrategy, and PredicateHelper.

Big Data Technology Tribe

Overview of LanceSparkSessionExtensions

The Lance‑Spark plugin extends Spark SQL by injecting custom parser and planner logic. To activate the extensions, place the JAR on Spark’s classpath and set the configuration key:

spark.sql.extensions=org.lance.spark.extensions.LanceSparkSessionExtensions

The core class org.lance.spark.extensions.LanceSparkSessionExtensions registers the parser and planner hooks.
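The same activation can also be done programmatically when building the session. A minimal sketch, assuming the lance-spark JAR is already on the classpath (the class name comes from the configuration key above):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: activate the Lance extensions via configuration at session build
// time, instead of setting spark.sql.extensions in spark-defaults.conf.
val spark = SparkSession.builder()
  .appName("LanceExample")
  .config("spark.sql.extensions",
    "org.lance.spark.extensions.LanceSparkSessionExtensions")
  .getOrCreate()
```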

Implementation of the extensions was introduced in the pull request https://github.com/lance-format/lance-spark/pull/91.

SparkSessionExtensions Usage

1. Using the withExtensions API

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

val spark = SparkSession.builder()
  .appName("MyApp")
  .withExtensions { extensions =>
    // inject a custom analyzer resolution rule
    extensions.injectResolutionRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = {
          // custom logic goes here
          plan
        }
      }
    }
    // inject a custom function: (identifier, metadata, expression builder)
    extensions.injectFunction(
      (FunctionIdentifier("my_func"),
       new ExpressionInfo(/* ... */),
       (children: Seq[Expression]) => new MyExpression(children.head))
    )
  }
  .getOrCreate()

2. Using the spark.sql.extensions configuration

import org.apache.spark.sql.{SparkSessionExtensions, SparkSessionExtensionsProvider}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}

class MyExtensions extends SparkSessionExtensionsProvider {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { _ => new MyCustomRule() }
    extensions.injectFunction(
      (FunctionIdentifier("my_func"), new ExpressionInfo(/* ... */),
       (children: Seq[Expression]) => new MyExpression(children.head))
    )
  }
}

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.sql.extensions", "com.example.MyExtensions")
  .getOrCreate()

3. Automatic discovery with ServiceLoader

Create a class that implements SparkSessionExtensionsProvider and place a service file at

src/main/resources/META-INF/services/org.apache.spark.sql.SparkSessionExtensionsProvider

containing the fully‑qualified class name.

package com.example
import org.apache.spark.sql.{SparkSessionExtensions, SparkSessionExtensionsProvider}

class MyExtensions extends SparkSessionExtensionsProvider {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // inject custom functionality, e.g., functions or rules
    extensions.injectFunction(/* ... */)
  }
}

The service file itself contains a single line, the provider’s fully‑qualified name:

com.example.MyExtensions

Injecting a Custom Parser

The injectParser hook receives a ParserBuilder of type (SparkSession, ParserInterface) => ParserInterface. Lance‑Spark supplies a partial function that wraps the default parser:

{ case (_, parser) => new LanceSparkSqlExtensionsParser(parser) }

This partial function ignores the first argument (the SparkSession) and wraps the session’s default parser (the second argument, bound as delegate) in LanceSparkSqlExtensionsParser.
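Based on the hooks described in this article, registration of the wrapped parser presumably sits inside the extensions class. A sketch of that shape, with the surrounding class structure assumed rather than taken from the Lance source (the actual code is in the pull request linked above):

```scala
import org.apache.spark.sql.SparkSessionExtensions

// Sketch: the extension entry point registering the wrapped parser.
class LanceSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // wrap the session's default parser so Lance-specific SQL is recognized
    extensions.injectParser { case (_, parser) =>
      new LanceSparkSqlExtensionsParser(parser)
    }
  }
}
```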

LanceSparkSqlExtensionsParser implementation

class LanceSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {
  private lazy val astBuilder = new LanceSqlExtensionsAstBuilder(delegate)

  /** Parse a string to a LogicalPlan: try Spark's default parser first,
   *  and fall back to the Lance extension grammar if it fails. */
  override def parsePlan(sqlText: String): LogicalPlan = {
    try {
      delegate.parsePlan(sqlText)
    } catch {
      case _: Exception => parse(sqlText)
    }
  }

  /** Two-stage ANTLR parse: fast SLL prediction first, full LL on failure. */
  protected def parse(command: String): LogicalPlan = {
    val lexer = new LanceSqlExtensionsLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    val tokenStream = new CommonTokenStream(lexer)
    val parser = new LanceSqlExtensionsParser(tokenStream)
    parser.removeErrorListeners()
    try {
      // first attempt: SLL mode is faster but rejects some valid inputs
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      astBuilder.visit(parser.singleStatement()).asInstanceOf[LogicalPlan]
    } catch {
      case _: ParseCancellationException =>
        // retry from the start with the slower but complete LL mode
        tokenStream.seek(0)
        parser.reset()
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        astBuilder.visit(parser.singleStatement()).asInstanceOf[LogicalPlan]
    }
  }
}

Injecting a Planner Strategy

The injectPlannerStrategy hook expects a StrategyBuilder – a function that receives a SparkSession and returns a SparkStrategy. Lance‑Spark passes the partially applied function LanceDataSourceV2Strategy(_), which Scala eta-expands to session => LanceDataSourceV2Strategy(session).
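Inside the extensions’ apply method, that registration would look like the following sketch (the hook is Spark’s public API; the placement inside apply is assumed):

```scala
// Register the Lance planner strategy (sketch). The two lines below are
// equivalent: the underscore is shorthand for the explicit function literal.
extensions.injectPlannerStrategy(LanceDataSourceV2Strategy(_))
extensions.injectPlannerStrategy(session => LanceDataSourceV2Strategy(session))
```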

LanceDataSourceV2Strategy definition

case class LanceDataSourceV2Strategy(session: SparkSession) extends SparkStrategy with PredicateHelper {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case AddColumnsBackfill(ResolvedIdentifier(catalog, ident), columnNames, source) =>
      AddColumnsBackfillExec(asTableCatalog(catalog), ident, columnNames, source) :: Nil
    case Optimize(ResolvedIdentifier(catalog, ident), args) =>
      OptimizeExec(asTableCatalog(catalog), ident, args) :: Nil
    case Vacuum(ResolvedIdentifier(catalog, ident), args) =>
      VacuumExec(asTableCatalog(catalog), ident, args) :: Nil
    case AddIndex(ResolvedIdentifier(catalog, ident), indexName, method, columns, args) =>
      AddIndexExec(asTableCatalog(catalog), ident, indexName, method, columns, args) :: Nil
    case _ => Nil
  }

  private def asTableCatalog(plugin: CatalogPlugin): TableCatalog = plugin match {
    case t: TableCatalog => t
    case _ => throw new IllegalArgumentException(s"Catalog $plugin is not a TableCatalog")
  }
}

The strategy matches custom logical nodes (e.g., AddColumnsBackfill, Optimize, Vacuum, AddIndex) and produces corresponding physical execution nodes.

Custom SparkStrategy Example

object MyCustomStrategy extends SparkStrategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case MyCustomLogicalPlan(child) => MyCustomExec(planLater(child)) :: Nil
    case _ => Nil
  }
}
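A strategy like this is registered through the same hook. A minimal sketch, assuming the hypothetical MyCustomStrategy object from above:

```scala
import org.apache.spark.sql.SparkSession

// Register the custom strategy; it takes no session, so the builder
// argument is ignored with an underscore.
val spark = SparkSession.builder()
  .appName("MyApp")
  .withExtensions(_.injectPlannerStrategy(_ => MyCustomStrategy))
  .getOrCreate()
```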

PredicateHelper Utilities

The PredicateHelper trait supplies methods that simplify predicate push‑down.

splitConjunctivePredicates

protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
  condition match {
    case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
    case other => other :: Nil
  }
}
// Example: a > 1 AND b < 2 AND c = 3 → [a > 1, b < 2, c = 3]
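The example in the comment can be built directly from Catalyst expression nodes (internal API, shown for illustration only):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._

// a > 1 AND (b < 2 AND c = 3), built from Catalyst constructors
val cond = And(
  GreaterThan(UnresolvedAttribute("a"), Literal(1)),
  And(LessThan(UnresolvedAttribute("b"), Literal(2)),
      EqualTo(UnresolvedAttribute("c"), Literal(3))))
// splitConjunctivePredicates(cond) recurses into both And branches and
// returns the three leaf predicates: a > 1, b < 2, c = 3
```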

splitDisjunctivePredicates

protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
  condition match {
    case Or(left, right) => splitDisjunctivePredicates(left) ++ splitDisjunctivePredicates(right)
    case other => other :: Nil
  }
}

extractPredicatesWithinOutputSet

protected def extractPredicatesWithinOutputSet(condition: Expression, outputSet: AttributeSet): Option[Expression] = condition match {
  case And(left, right) =>
    (extractPredicatesWithinOutputSet(left, outputSet), extractPredicatesWithinOutputSet(right, outputSet)) match {
      case (Some(l), Some(r)) => Some(And(l, r))
      case (Some(l), None)    => Some(l)
      case (None, Some(r))    => Some(r)
      case _                  => None
    }
  case Or(left, right) =>
    for {
      l <- extractPredicatesWithinOutputSet(left, outputSet)
      r <- extractPredicatesWithinOutputSet(right, outputSet)
    } yield Or(l, r)
  case other =>
    if (other.references.subsetOf(outputSet)) Some(other) else None
}
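The asymmetry between the And and Or cases matters for correctness: dropping one side of an And only weakens the filter (the scan returns a superset of rows, which is safe), while dropping one side of an Or would wrongly narrow it. A worked illustration, assuming outputSet contains only attribute a:

```scala
// Suppose outputSet = AttributeSet(a), i.e. only column `a` is available.
//
//   a > 1 AND b < 2   →  Some(a > 1)
//     (the b-side is dropped; the weaker filter is safe to push down)
//
//   a > 1 OR b < 2    →  None
//     (pushing a > 1 alone would wrongly drop rows that satisfy only b < 2)
```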

DataSourceStrategy Integration

During a data‑source scan Spark follows these steps:

1. Split the WHERE clause into individual predicates using splitConjunctivePredicates.

2. Determine which predicates can be pushed down to the underlying source (e.g., Parquet, ORC) via canEvaluate.

3. Pass the pushable predicates to the source’s buildScan method.

4. Execute the reduced scan, lowering I/O and improving performance.

object DataSourceStrategy extends Strategy with PredicateHelper {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projects, filters, relation) =>
      // filters is a Seq[Expression]; split each conjunct further
      val predicates = filters.flatMap(splitConjunctivePredicates)
      val pushable = predicates.filter(pred => canEvaluate(pred, relation))
      val scan = relation.buildScan(pushable)
      // further processing of the scan result …
    case _ => Nil
  }
}

This walkthrough demonstrates how Lance‑Spark leverages Spark’s extensibility points to add custom SQL syntax, planner strategies, and predicate‑push‑down logic.
