Writing to HBase Using Scalding

Scalding makes it easy to load and process data in Hadoop via MapReduce. HBase is a low-latency read/write database that sits on top of Hadoop. From Scalding's perspective, it's just another location to read or write data, and the Spyglass project makes it very easy to sink data to HBase from a Scalding job.

This post details my experience writing output to an HBase table. It builds on the word count program from the Scalding Getting Started page; the only code difference is the final two lines, which write out the results. Even though the code changes are minor, there are a few other issues I had to work through regarding HBase and Spyglass versions.

The entire project can be found on GitHub, but I will also go through a few of the details here.

Scalding Source

package com.aca

import com.twitter.scalding._
import cascading.tuple.Fields
import parallelai.spyglass.hbase._

class Job1(args: Args) extends Job(args) {

  // Standard Scalding word count: read lines, split into words, count each word.
  val pipe = TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.toLowerCase.split("\\s+") }
    .groupBy('word) { _.size }

  // Convert the 'word and 'size fields to bytes, then write each tuple to the
  // TEST table in HBase, keyed on 'word, with 'size stored in the CF column family.
  val pipe2 = new HBasePipeWrapper(pipe)
    .toBytesWritable(new Fields("word"), new Fields("size"))
    .write(new HBaseSource("TEST", "myserver.mycompany.com", 'word, List("CF"), List(new Fields("size"))))
}

The code is identical to the sample WordCount program until the final two lines. In those lines, the tuples are converted to bytes and then the pipe is written out to the TEST table in HBase.
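
One prerequisite worth noting: as far as I can tell, Spyglass writes into an existing table rather than creating one for you. Before running the job, I created the table and column family from the HBase shell (the names below simply match the ones used in the code above):

create 'TEST', 'CF'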

Issues Encountered

With any Hadoop-related code, issues should be expected, not because Hadoop is bad, but because the ecosystem changes so quickly that examples become outdated or never show a full running program in the first place.

1) HBase version differences

The version of Spyglass I started with was built against HBase 0.94.x, while I was using 0.98.x. When I tried the 0.94 build of Spyglass, the Scalding job threw several ProtoBuf errors at runtime. Generally, if you get a ProtoBuf error with HBase, you know it's a version mismatch.

After some digging on conjars, I found version 2.10_0.10_CDH5_4.4, which was built against HBase 0.98.x. Switching my pom.xml to this version of Spyglass solved the ProtoBuf errors.
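
For reference, the Spyglass dependency in my pom.xml ended up looking roughly like the snippet below. The groupId and artifactId are from memory, so double-check the exact coordinates on conjars; the conjars repository also needs to be declared so Maven can resolve the artifact.

<!-- Spyglass is published to conjars -->
<repository>
    <id>conjars</id>
    <url>http://conjars.org/repo</url>
</repository>

<!-- groupId/artifactId are my best recollection; verify on conjars -->
<dependency>
    <groupId>parallelai</groupId>
    <artifactId>parallelai.spyglass</artifactId>
    <version>2.10_0.10_CDH5_4.4</version>
</dependency>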

2) HBase likes bytes

It took me a while to figure out the HBasePipeWrapper line. The Spyglass examples in the GitHub documentation did not show the conversion to bytes because the sample program was reading from HBase to begin with, so its tuples were already byte-based.

The errors I received indicated that the 'word and 'size values needed to be converted to bytes. After some digging, I found the HBaseConversions class in the Spyglass project and was able to add the "val pipe2" line to convert the tuples to bytes.
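
Conceptually, the conversion just wraps each field's value in an ImmutableBytesWritable, which is the form HBase expects for row keys and cell values. A hand-rolled sketch of that idea for a single string value (illustration only, not what Spyglass actually generates) would look something like this:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Wrap a string value as an ImmutableBytesWritable holding its raw bytes,
// which is how an HBase sink expects keys and values to arrive.
def toWritable(value: String): ImmutableBytesWritable =
  new ImmutableBytesWritable(Bytes.toBytes(value))

Spyglass does this per field once the toBytesWritable call is in place, so there is no need to write it by hand; it is only shown here to make the "HBase likes bytes" point concrete.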

Running the job

In order to run the Scalding job, the hbase-protocol JAR must be on the classpath when the job is submitted. Without it, an IllegalAccessError for ZeroCopyLiteralByteString is thrown:

java.lang.IllegalAccessError: class com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass com.google.protobuf.LiteralByteString

See Jira HBASE-10304 for the full details and the posted workaround of setting the classpath as part of submitting the job. With the classpath set, the job is ready to run against the cluster and access an input file on HDFS.

The hbase-protocol-0.98.1 JAR can be obtained from the Maven repository or copied from your HBase cluster. Once it is on your local machine, reference it when running the Scalding job as shown below:

HADOOP_CLASSPATH=/home/andy/dev/hbase-protocol-0.98.1-hadoop2.jar hadoop jar \
  target/scalding-hbase-examples-0-jar-with-dependencies.jar com.twitter.scalding.Tool \
  com.aca.Job1 --hdfs --input /tmp/aca/input/input.txt
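
Once the job finishes, a quick sanity check is to scan the table from the HBase shell (again assuming the TEST table and CF column family from the example above):

hbase shell
scan 'TEST', {LIMIT => 10}

Each word should appear as a row key, with its count stored as a cell in the CF column family.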

Sure, there were a few issues, but nothing that a little research and trial and error couldn't fix. With Scalding as a framework, it's possible to write data to HDFS, Hive, HBase, JDBC targets, and more. It's nice to have a single framework that can access and process data from the variety of components that make up a Hadoop ecosystem.