Understanding Spark SQL Query Execution: From Parsing to Physical Plan
This article explains how Spark SQL processes a SELECT query. It covers the parsing, binding, optimization, planning, and execution steps, describes the roles of SQLContext, HiveContext, the Catalyst optimizer, and the logical and physical plans, and includes code excerpts from the Spark source.
The article begins by posing the question of what Spark SQL does when executing a statement such as SELECT a1, a2, a3 FROM tableA WHERE condition and proceeds to describe the complete processing pipeline.
It outlines the typical stages of SQL execution: the query is first parsed into an Unresolved LogicalPlan, then bound to catalog metadata to produce a Resolved LogicalPlan, optimized into an Optimized LogicalPlan, transformed into a PhysicalPlan, prepared for execution, and finally executed to generate a SchemaRDD result.
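The stage sequence above can be pictured as a chain of lazily evaluated values, where asking for a later stage forces every earlier one. The following is only a toy model in plain Scala, not Spark code: `PipelineSketch`, `Plan`, `advance`, and the stage labels are hypothetical names chosen for illustration.

```scala
// Toy model of the staged pipeline. All names are hypothetical, not Spark APIs.
object PipelineSketch {
  // A plan is reduced to a stage label plus the original query text.
  final case class Plan(stage: String, query: String)

  class QueryExecution(sqlText: String) {
    // Records, in order, which stages have actually been computed.
    val trace = scala.collection.mutable.ListBuffer.empty[String]

    // Marks a stage as computed and relabels the plan accordingly.
    private def advance(in: Plan, nextStage: String): Plan = {
      trace += nextStage
      in.copy(stage = nextStage)
    }

    // Each stage is lazy and depends only on the previous stage.
    lazy val unresolved: Plan = advance(Plan("sql text", sqlText), "unresolved")
    lazy val resolved: Plan   = advance(unresolved, "resolved")
    lazy val optimized: Plan  = advance(resolved, "optimized")
    lazy val physical: Plan   = advance(optimized, "physical")
    lazy val prepared: Plan   = advance(physical, "prepared")
    lazy val result: Plan     = advance(prepared, "executed")
  }
}
```

In this sketch, constructing a `QueryExecution` does no work; reading `optimized` runs exactly the first three stages, and reading `result` runs the rest.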
Key code from SQLContext shows how the sql method delegates to the parser:
def sql(sqlText: String): SchemaRDD = {
  if (dialect == "sql") {
    new SchemaRDD(this, parseSql(sqlText)) // parseSql performs syntax analysis
  } else {
    sys.error(s"Unsupported SQL dialect: $dialect")
  }
}
The parser is defined as a catalyst.SqlParser instance, and the helper method parseSql simply invokes it:
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
The resulting logical plan is wrapped in a SchemaRDD:
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val baseLogicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike
Within the SchemaRDDLike trait, the queryExecution field triggers the execution chain:
trait SchemaRDDLike {
  @transient val sqlContext: SQLContext
  @transient val baseLogicalPlan: LogicalPlan
  private[sql] def baseSchemaRDD: SchemaRDD
  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
}
The executePlan method creates a QueryExecution instance that performs analysis, optimization, planning, and execution:
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }
abstract class QueryExecution {
  def logical: LogicalPlan
  lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
  lazy val optimizedPlan = optimizer(analyzed)
  lazy val sparkPlan = {
    SparkPlan.currentContext.set(self)
    planner(optimizedPlan).next()
  }
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
  lazy val toRdd: RDD[Row] = executedPlan.execute()
  // ... other members ...
}
When using HiveContext, the sql method first checks the configured dialect. For hiveql it invokes HiveQl.parseSql:
override def sql(sqlText: String): SchemaRDD = {
  if (dialect == "sql") {
    super.sql(sqlText)
  } else if (dialect == "hiveql") {
    new SchemaRDD(this, HiveQl.parseSql(sqlText))
  } else {
    sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
  }
}
The HiveQl object parses HiveQL by first obtaining an AST with ParseUtils and then converting the tree into a logical plan:
def parseSql(sql: String): LogicalPlan = {
  // ... handle non-Hive commands ...
  val tree = getAst(sql)
  if (nativeCommands contains tree.getText) NativeCommand(sql)
  else nodeToPlan(tree) match {
    case NativePlaceholder => NativeCommand(sql)
    case other => other
  }
}
def getAst(sql: String): ASTNode =
  ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))
HiveContext also overrides executePlan to use a specialized QueryExecution that incorporates Hive-specific catalog and analyzer logic:
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }
protected[sql] abstract class QueryExecution extends super.QueryExecution {
  override lazy val optimizedPlan = optimizer(
    ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
  override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
}
The article then describes the Catalyst optimizer architecture, which consists of five main components: SqlParser (syntax analysis), Analyzer (binding to catalog metadata), Optimizer (rule-based logical plan transformations such as column pruning and predicate push-down), Planner (conversion to physical operators), and CostModel (selection of the best physical plan based on statistics).
All these components operate on a tree representation of the query; rules are applied at each stage to transform the tree, and the final physical plan is executed by Spark’s runtime engine.
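To make the idea of rules rewriting a tree concrete, here is a minimal sketch in plain Scala. It is not Catalyst's implementation: the `Plan` node types, `transformUp`, the two rules, and `optimize` are hypothetical stand-ins, and conditions are kept as plain strings. The push-down rule also assumes the projection is a plain column selection with no aliases or computed expressions.

```scala
// Toy rule-based tree rewriting. All names are hypothetical, not Catalyst APIs.
object CatalystSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Project(columns: List[String], child: Plan) extends Plan

  // A rule rewrites only the node shapes it recognizes.
  type Rule = PartialFunction[Plan, Plan]

  // Rewrites the children first, then tries the rule on the rebuilt node.
  def transformUp(plan: Plan, rule: Rule): Plan = {
    val rebuilt = plan match {
      case Filter(cond, child)  => Filter(cond, transformUp(child, rule))
      case Project(cols, child) => Project(cols, transformUp(child, rule))
      case leaf: Scan           => leaf
    }
    if (rule.isDefinedAt(rebuilt)) rule(rebuilt) else rebuilt
  }

  // Merges two directly stacked filters into a single conjunction.
  val combineFilters: Rule = {
    case Filter(outer, Filter(inner, child)) =>
      Filter(s"($inner) AND ($outer)", child)
  }

  // Predicate push-down: moves a filter below a projection.
  // Assumption: the projection only selects columns (no aliases or expressions).
  val pushFilterThroughProject: Rule = {
    case Filter(cond, Project(cols, child)) => Project(cols, Filter(cond, child))
  }

  // Applies all rules repeatedly until the plan stops changing (a fixed point)
  // or the iteration limit is reached.
  def optimize(plan: Plan, rules: List[Rule], maxIterations: Int = 100): Plan = {
    var current = plan
    var changed = true
    var i = 0
    while (changed && i < maxIterations) {
      val next = rules.foldLeft(current)((p, r) => transformUp(p, r))
      changed = next != current
      current = next
      i += 1
    }
    current
  }
}
```

Given a filter sitting above a projection that itself sits above another filter, this sketch first pushes the outer filter below the projection and then merges the two adjacent filters, so the scan is followed by a single combined filter.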
In summary, the article provides a detailed walkthrough of how Spark SQL (both SQLContext and HiveContext) turns a high‑level SQL statement into an executable distributed computation, illustrating the process with source code excerpts and diagrams.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
