Using Scalding And JDBC To Access Oracle

Scalding-JDBC utilizes Cascading-JDBC which comes pre-built to support several relational databases. The bad news is that Oracle is not one of the default supported databases because the Oracle JDBC driver is not available on any public Maven repos. The good news is that the Cascading-JDBC readme page has steps to add the Oracle driver to your local Maven repo so that the driver is available for use.
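Once the jar is installed locally, the project's build also needs to resolve it. A minimal sketch of the relevant `build.sbt` lines — the Oracle groupId/artifactId/version below are illustrative placeholders and should match whatever coordinates you used when installing the jar:

```scala
// build.sbt -- pull the locally installed Oracle driver.
// The "com.oracle" % "ojdbc6" coordinates are whatever you
// chose during the local install; values here are examples.
resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
  "com.twitter" %% "scalding-jdbc" % "0.11.2",
  "com.oracle"  %  "ojdbc6"        % "11.2.0.4"
)
```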

Once the Oracle driver is added to the local Maven repo, it's time to work on the Scalding code. I'm using Scalding version 0.11.2. The latest "develop" branch has been refactored to allow compile-time definition of the driver, but the code below works with version 0.11.2, where the drivers were defined in a map inside of the JDBCSource object.

My Scala skills are limited. However, I was able to extend the JDBCSource code to add "oracle" as an option when defining a ConnectionSpec.

package com.aca.jdbc

import com.twitter.scalding._
import com.twitter.scalding.jdbc._
import cascading.jdbc._

abstract class OracleJDBCSource extends JDBCSource {
	// Override the protected driverFor def from JDBCSource
	// to add the Oracle driver mapping.
	// In Scala, protected defs can be overridden by subclasses
	// and are visible to subclasses, not to the enclosing
	// package as in Java.
	override protected def driverFor(adapter: String): String =
		Map("mysql" -> "com.mysql.jdbc.Driver",
			"hsqldb" -> "org.hsqldb.jdbcDriver",
			"oracle" -> "oracle.jdbc.OracleDriver")
		.apply(adapter)

	// Override the following three members when you extend this class
	// val tableName: String = ???
	// val columns: Iterable[ColumnDefinition] = ???
	// protected def currentConfig: ConnectionSpec = ???
}

With the extended class, Oracle is now a registered driver that can be used like any of the preset drivers that ship with Scalding-JDBC. For my project, I needed to access multiple tables from the same Oracle schema. To avoid repeating my connection info in each table source, I defined a single source class that held the connection details.

package com.aca.jdbc

import com.twitter.scalding._
import com.twitter.scalding.jdbc._
import cascading.jdbc._

abstract class SomeOracleSource extends OracleJDBCSource {
	val connInfo = SomeConnectionInfo
	override def currentConfig = ConnectionSpec(connInfo.connectUrl, connInfo.dbuser, connInfo.dbpass, "oracle")
}
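
The SomeConnectionInfo object referenced above just needs to expose the three connection fields the ConnectionSpec expects. A minimal sketch — the URL, user, and password values are placeholders I made up, not from a real system:

package com.aca.jdbc

// Holds the Oracle connection details in one place so every
// table source can share them. All values are placeholders.
object SomeConnectionInfo {
	val connectUrl: String = "jdbc:oracle:thin:@//dbhost:1521/ORCL"
	val dbuser: String = "scott"
	val dbpass: String = "tiger"
}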

With the single source definition in place, I was able to define the Oracle tables and then use them in a job.

package com.aca.jdbc

import com.twitter.scalding._
import com.twitter.scalding.jdbc._
import cascading.jdbc._

case class Table_METER() extends SomeOracleSource {
	override val tableName = "METER"
	override val columns = List(
		varchar("METER_NUMBER", 30),
		bigint("METER_NUMBER_INDEX"),
		varchar("METER_NAME", 50) )

	override val filterCondition = Option("METER_NUMBER like '1701%'")
}

Below is a simple job that copies the Oracle table data to HDFS.

package com.aca

import com.twitter.scalding._
import com.twitter.scalding.jdbc._
import cascading.jdbc._
import com.aca.jdbc._

class JobCopy (args: Args) extends Job(args) {
  Table_METER().read.write(Tsv( args("output_meter") ))
}

That's it. The hardest part was figuring out how to add Oracle to the map of drivers in the JDBCSource object. Once that is complete, the remaining code is simply a matter of making the connection, defining the table you want to access, and then creating a Scalding job to access the table.