Resolving Unsupported Oracle Data Types in Spark SQL via Custom JdbcDialects
This article explains how to work around Spark SQL's inability to read certain Oracle data types, such as TIMESTAMP WITH LOCAL TIME ZONE and FLOAT(126), by creating and registering a custom JdbcDialect that remaps the unsupported types to compatible Spark types.
On big-data platforms, ETL jobs often extract data from traditional relational databases (RDBMS) into HDFS. When Spark SQL is used for this task, developers may run into unsupported data types, for example Oracle's TIMESTAMP WITH LOCAL TIME ZONE and FLOAT(126).
1. System Environment
Spark version: 2.1.0.cloudera1
JDK version: Java HotSpot(TM) 64‑Bit Server VM, Java 1.8.0_131
Oracle JDBC driver: ojdbc7.jar
Scala version: 2.11.8
2. Issues When Spark SQL Reads Database Tables
Reading relational databases with Spark SQL relies on JDBC. Two main problems must be solved:
Distributed reading
Mapping raw table data to a DataFrame
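For the first problem, Spark's JDBC source reads a table through a single connection unless it is given a partitioning column, bounds, and a partition count. The sketch below only builds that option map. The column name `ID` and the bounds are assumptions for illustration, and the column is assumed to be numeric.

```scala
object PartitionedReadOptions {
  /** Builds the extra JDBC options that make Spark split a read into parallel range queries. */
  def partitionOptions(column: String, lower: Long, upper: Long, partitions: Int): Map[String, String] = {
    require(partitions > 0, "partitions must be positive")
    require(lower < upper, "lower bound must be below upper bound")
    Map(
      "partitionColumn" -> column,
      "lowerBound" -> lower.toString,
      "upperBound" -> upper.toString,
      "numPartitions" -> partitions.toString
    )
  }

  def main(args: Array[String]): Unit = {
    // "ID" and the bounds are hypothetical; pick a roughly evenly distributed numeric column.
    val opts = partitionOptions("ID", 1L, 1000000L, 16)
    opts.foreach { case (k, v) => println(s"$k=$v") }
    // Usage with a reader (not executed here), where baseOptions holds url, user, dbtable, etc.:
    //   sparkSession.read.format("jdbc").options(baseOptions ++ opts).load()
  }
}
```

The bounds only decide how the value range is cut into strides; they do not filter rows, so values outside them still end up in the first or last partition.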
2.1 Sample Business Code
public class Config {
    // spark-jdbc parameter names
    public static String JDBC_PARA_URL = "url";
    public static String JDBC_PARA_USER = "user";
    public static String JDBC_PARA_PASSWORD = "password";
    public static String JDBC_PARA_DRIVER = "driver";
    public static String JDBC_PARA_TABLE = "dbtable";
    public static String JDBC_PARA_FETCH_SIZE = "fetchsize";
}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

// Main class
object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("yarn")
      .appName("test")
      .getOrCreate()
    val sqlContext = sparkSession.sqlContext
    val sc = sparkSession.sparkContext
    val partitionNum = 16
    val fetchSize = 1000
    val jdbcUrl = "..."
    val userName = "..."
    val schema_table = "..."
    val password = "..."
    val jdbcDriver = "oracle.jdbc.driver.OracleDriver"
    // Ensure the Oracle JDBC driver JAR is placed in Spark's lib/jars directory or added with --jars
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map(Config.JDBC_PARA_URL -> jdbcUrl,
        Config.JDBC_PARA_USER -> userName,
        Config.JDBC_PARA_TABLE -> schema_table,
        Config.JDBC_PARA_PASSWORD -> password,
        Config.JDBC_PARA_DRIVER -> jdbcDriver,
        Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()
    val rdd = jdbcDF.rdd
    rdd.count()
    // ... further processing ...
  }
}

2.2 Unsupported Data Types
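Problematic types include Oracle's TIMESTAMP WITH LOCAL TIME ZONE and FLOAT(126). Spark SQL cannot map these to a DataFrame column type out of the box, so loading a table that contains such columns fails.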
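Before remapping anything, it can help to see which JDBC type codes the driver actually reports for a table, since those codes are what a dialect receives. The following is a minimal sketch using plain JDBC. The object and method names are made up for illustration, the connection details come from the caller, and the two negative codes are the Oracle-specific constants from `oracle.jdbc.OracleTypes`. Oracle FLOAT columns are often reported as NUMERIC with a scale of -127, but check the output for your own driver version.

```scala
import java.sql.{Connection, DriverManager, ResultSetMetaData, Types}

object ColumnTypeProbe {
  // Vendor-specific codes from oracle.jdbc.OracleTypes; they are not part of java.sql.Types.
  val OracleTimestampTz = -101   // TIMESTAMP WITH TIME ZONE
  val OracleTimestampLtz = -102  // TIMESTAMP WITH LOCAL TIME ZONE

  /** Labels a JDBC type code, covering only the cases discussed in this article. */
  def label(sqlType: Int): String = sqlType match {
    case OracleTimestampTz  => "TIMESTAMP WITH TIME ZONE (Oracle-specific)"
    case OracleTimestampLtz => "TIMESTAMP WITH LOCAL TIME ZONE (Oracle-specific)"
    case Types.NUMERIC      => "NUMERIC"
    case Types.TIMESTAMP    => "TIMESTAMP"
    case other              => s"code $other"
  }

  /** Lists name, type code, type name, precision and scale for every column in the metadata. */
  def describe(md: ResultSetMetaData): Seq[String] =
    (1 to md.getColumnCount).map { i =>
      s"${md.getColumnName(i)}: ${label(md.getColumnType(i))}, " +
        s"typeName=${md.getColumnTypeName(i)}, precision=${md.getPrecision(i)}, scale=${md.getScale(i)}"
    }

  def main(args: Array[String]): Unit = {
    // Expected arguments: jdbcUrl user password table (all supplied by the caller).
    val Array(url, user, password, table) = args
    val conn: Connection = DriverManager.getConnection(url, user, password)
    try {
      // WHERE 1=0 returns no rows but still exposes the column metadata.
      val rs = conn.createStatement().executeQuery(s"SELECT * FROM $table WHERE 1=0")
      describe(rs.getMetaData).foreach(println)
    } finally conn.close()
  }
}
```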
3. Solution: Custom JdbcDialects
3.1 What Is JdbcDialects?
The org.apache.spark.sql.jdbc package contains the file JdbcDialects.scala, which defines the abstract class JdbcDialect. A dialect describes the mapping between Spark DataType and database SQL types for one kind of database and provides methods such as canHandle, getCatalystType, getJDBCType, and quoteIdentifier. The JdbcDialects object in the same file offers the get, registerDialect, and unregisterDialect utilities.
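As a quick illustration of the lookup side of this API, the sketch below asks `JdbcDialects.get` which dialect is currently active for a URL and calls two of its methods. The URL is a hypothetical placeholder and the object name is invented for this example; no database connection is opened.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object DialectLookup {
  /** Returns the dialect Spark would currently use for the given JDBC URL. */
  def currentDialect(url: String): JdbcDialect = JdbcDialects.get(url)

  def main(args: Array[String]): Unit = {
    // Hypothetical URL; dialects are selected by inspecting the URL string via canHandle.
    val url = "jdbc:oracle:thin:@//db-host:1521/ORCL"
    val dialect = currentDialect(url)
    println(s"dialect class: ${dialect.getClass.getName}")
    println(s"canHandle: ${dialect.canHandle(url)}")
    println(s"quoted identifier: ${dialect.quoteIdentifier("COL")}")
  }
}
```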
3.2 Steps to Fix the Issue
Obtain the current JdbcDialect for the target URL using JdbcDialects.get(url).
Unregister the existing dialect with JdbcDialects.unregisterDialect.
Create a new JdbcDialect implementation that overrides getCatalystType to map unsupported SQL types (e.g., TIMESTAMP WITH LOCAL TIME ZONE) to a supported Spark type such as TimestampType or StringType.
Register the new dialect via JdbcDialects.registerDialect.
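The four steps above can be sketched in compact form as follows. This is a minimal illustration rather than a complete dialect: it remaps only the Oracle-specific code -102 (TIMESTAMP WITH LOCAL TIME ZONE) and returns None for everything else so that Spark's generic mapping applies. The object name and the URL are assumptions.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, TimestampType}

object MinimalDialectSwap {
  // Oracle-specific code for TIMESTAMP WITH LOCAL TIME ZONE (oracle.jdbc.OracleTypes.TIMESTAMPLTZ).
  val TimestampLtz = -102

  // Step 3: a dialect that remaps a single type and defers every other type to Spark's defaults.
  val dialect: JdbcDialect = new JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
    override def getCatalystType(sqlType: Int, typeName: String, size: Int,
                                 md: MetadataBuilder): Option[DataType] =
      if (sqlType == TimestampLtz) Some(TimestampType) else None
  }

  /** Steps 1, 2 and 4: look up the active dialect, unregister it, register the replacement. */
  def install(url: String): Unit = {
    val current = JdbcDialects.get(url)
    JdbcDialects.unregisterDialect(current)
    JdbcDialects.registerDialect(dialect)
  }

  def main(args: Array[String]): Unit = {
    val url = "jdbc:oracle:thin:@//db-host:1521/ORCL" // hypothetical URL
    install(url)
    // Ask the now-active dialect how it would map the Oracle-specific code.
    val mapped = JdbcDialects.get(url)
      .getCatalystType(TimestampLtz, "TIMESTAMP WITH LOCAL TIME ZONE", 0, new MetadataBuilder)
    println(mapped)
  }
}
```

The registration is global to the JVM, so `install` has to run on the driver before the DataFrame is loaded.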
3.3 Example Implementation
import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory

object SaicSparkJdbcDialect {
  // getClass is used because an object has no class literal for classOf[...]
  private val logger = LoggerFactory.getLogger(getClass)

  def useMyJdbcDIalect(jdbcUrl: String, dbType: String): Unit = {
    // Unregister the dialect currently associated with this URL
    JdbcDialects.unregisterDialect(JdbcDialects.get(jdbcUrl))
    if (dbType.equals("ORACLE")) {
      val OracleDialect = new JdbcDialect {
        // Only handle Oracle URLs
        override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

        // Remap SQL types to Spark types when reading
        override def getCatalystType(sqlType: Int, typeName: String, size: Int,
                                     md: MetadataBuilder): Option[DataType] = {
          // -101 and -102 are the Oracle-specific codes for TIMESTAMP WITH TIME ZONE
          // and TIMESTAMP WITH LOCAL TIME ZONE (oracle.jdbc.OracleTypes)
          if (sqlType == Types.TIMESTAMP || sqlType == -101 || sqlType == -102) {
            Some(TimestampType)
          } else if (sqlType == Types.BLOB) {
            Some(BinaryType)
          } else {
            // Every other type, including FLOAT(126), is read as StringType
            Some(StringType)
          }
        }

        // Spark -> JDBC mapping, used when writing a DataFrame back to Oracle
        override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
          case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))
          case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
          case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
          case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
          case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
          case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
          case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
          case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
          case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
          case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))
          case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
          case _ => None
        }

        // Leave column names unquoted
        override def quoteIdentifier(colName: String): String = colName
      }
      // Register the new dialect
      JdbcDialects.registerDialect(OracleDialect)
      logger.info("Registered custom Oracle JdbcDialect")
    }
  }
}

The article originally appeared on Jianshu.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
