Big Data 8 min read

Resolving Unsupported Oracle Data Types in Spark SQL via Custom JdbcDialects

This article explains how to overcome Spark SQL's inability to handle certain Oracle data types, such as Timestamp with local timezone and FLOAT(126), by creating and registering a custom JdbcDialect that remaps unsupported types to compatible Spark types.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Resolving Unsupported Oracle Data Types in Spark SQL via Custom JdbcDialects

In big‑data platforms, ETL often requires extracting data from traditional relational databases (RDBMS) into HDFS. When using Spark SQL for this task, developers may encounter unsupported data types, for example Oracle's Timestamp with local Timezone and FLOAT(126) .

1. System Environment

Spark version: 2.1.0.cloudera1

JDK version: Java HotSpot(TM) 64‑Bit Server VM, Java 1.8.0_131

Oracle JDBC driver: ojdbc7.jar

Scala version: 2.11.8

2. Issues When Spark SQL Reads Database Tables

Reading relational databases with Spark SQL relies on JDBC. Two main problems must be solved:

Distributed reading

Mapping raw table data to a DataFrame

2.1 Sample Business Code

public class Config {
  // spark-jdbc parameter names
  public static String JDBC_PARA_URL = "url";
  public static String JDBC_PARA_USER = "user";
  public static String JDBC_PARA_PASSWORD = "password";
  public static String JDBC_PARA_DRIVER = "driver";
  public static String JDBC_PARA_TABLE = "dbtable";
  public static String JDBC_PARA_FETCH_SIZE = "fetchsize";
}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

// Main class
object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("yarn").appName("test").getOrCreate()
    val sqlContext = sparkSession.sqlContext
    val sc = sparkSession.sparkContext
    val partitionNum = 16
    val fetchSize = 1000
    val jdbcUrl = "..."
    val userName = "..."
    val schema_table = "..."
    val password = "..."
    val jdbcDriver = "oracle.jdbc.driver.OracleDriver"
    // Ensure the Oracle JDBC driver JAR is placed in Spark's lib/jars directory or added with --jars
    val jdbcDF = sqlContext.read.format("jdbc").options(
          Map(Config.JDBC_PARA_URL -> jdbcUrl,
            Config.JDBC_PARA_USER -> userName,
            Config.JDBC_PARA_TABLE -> schema_table,
            Config.JDBC_PARA_PASSWORD -> password,
            Config.JDBC_PARA_DRIVER -> jdbcDriver,
            Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()
    val rdd = jdbcDF.rdd
    rdd.count()
    // ... further processing ...
  }
}

2.2 Unsupported Data Types

Examples of problematic types include Oracle's Timestamp with local Timezone and FLOAT(126) .

3. Solution: Custom JdbcDialects

3.1 What Is JdbcDialects?

The org.apache.spark.sql.jdbc package contains the abstract class JdbcDialects.scala, which defines the mapping between Spark DataType and database SQLType. It provides methods such as canHandle, getCatalystType, getJDBCType, and quoteIdentifier. The companion object offers get, unregisterDialect, and registerDialect utilities.

3.2 Steps to Fix the Issue

Obtain the current JdbcDialect for the target URL using JdbcDialects.get(url).

Unregister the existing dialect with JdbcDialects.unregisterDialect.

Create a new JdbcDialect implementation, overriding getCatalystType to map unsupported SQL types (e.g., Timestamp with local timezone) to a supported Spark type such as TimestampType or StringType.

Register the new dialect via JdbcDialects.registerDialect.

3.3 Example Implementation

object SaicSparkJdbcDialect {

  def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={

    val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect])

    // Unregister the current dialect
    val dialect = JdbcDialects
    JdbcDialects.unregisterDialect(dialect.get(jdbcUrl))

    if (dbType.equals("ORACLE")) {
      val OracleDialect = new JdbcDialect {
          // Only handle Oracle URLs
          override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
          // Remap SQL types to Spark types when reading
          override def getCatalystType(sqlType: Int, typeName: String, size: Int,
                                       md: MetadataBuilder): Option[DataType] = {
            if (sqlType == Types.TIMESTAMP || sqlType == -101 || sqlType == -102) {
              // Map unsupported Timestamp with local timezone to TimestampType
              Some(TimestampType)
            } else if (sqlType == Types.BLOB) {
              Some(BinaryType)
            } else {
              // Fallback to StringType for other unknown types
              Some(StringType)
            }
          }
          // No changes needed for Spark -> JDBC mapping in this example
          override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
            case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))
            case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
            case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
            case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
            case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
            case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
            case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
            case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
            case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
            case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))
            case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
            case _ => None
          }
          override def quoteIdentifier(colName: String): String = colName
        }
        // Register the new dialect
        JdbcDialects.registerDialect(OracleDialect)
      }
    }
}

The article originally appeared on Jianshu .

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataJDBCETLOracleSparkScalaCustom Dialect
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.