Getting Started With Scalding

If you're looking for the best logo for a development tool/framework, Scalding is the clear winner.  Who's gonna beat a fire-breathing elephant?

Scalding is an extension to the Cascading framework that allows development in the Scala language rather than Java.  Cascading and its plumbing analogies for Hadoop development make a lot of sense.  However, the Java implementation requires a lot of boilerplate code, whereas the Scala code is much cleaner and more concise.
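
To see the difference, here is a lightly adapted version of the word-count example from the Scalding README (the input and output paths come from command-line arguments; the tokenization is simplified):

import com.twitter.scalding._

class WordCountJob(args: Args) extends Job(args) {
  // Read each line, split it into words, count each word,
  // and write tab-separated (word, count) pairs.
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.toLowerCase.split("""\s+""") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}

The equivalent Cascading job in Java needs noticeably more Tap, Pipe, and Flow boilerplate to say the same thing.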

[Image: the Scalding logo]

Building Scalding

On the day I started playing around with Scalding (early October 2014), the main project build was failing.  Just my luck.  When running the "./sbt test" command, I kept getting errors, and it took me a few hours of tinkering to figure out that the issue was with the Scalding code, not my setup.  Fortunately, the build status is shown right on the main Scalding page: under the "Building" section, look for the icon that indicates the Travis CI build status.  If it's not showing as successful, follow the steps below to use a different branch/tag from the git repo.

Instead of the main developer branch, I used the 0.11.2 tag which was the latest non-RC tag in the repo.

git clone https://github.com/twitter/scalding.git
git tag -l 
git checkout tags/0.11.2

Using 0.11.2, I was able to successfully complete all of the build steps on the Getting Started page.

./sbt update
./sbt test
./sbt assembly

Running Scalding Jobs

With Scalding built, it's time to run a few tutorials that are bundled with Scalding.

Tutorial0 is the simplest one (sort of obvious, since it's number zero); it just copies a file.  Twitter has done a great job of documenting the Scala code as well as the steps to run the tutorial and verify its output.
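
For reference, the core of Tutorial0 looks roughly like this (paraphrased from the tutorial source; the input and output paths match what you'll see in the job logs):

import com.twitter.scalding._

class Tutorial0(args: Args) extends Job(args) {
  // A TextLine source provides two fields: 'offset and 'line.
  val input = TextLine("tutorial/data/hello.txt")
  val output = TextLine("tutorial/data/output0.txt")

  // Keep only the 'line field and write it back out.
  input.read
    .project('line)
    .write(output)
}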

Be sure to use local mode at this point rather than messing with HDFS-related jobs.  From the main Scalding directory, Tutorial0 can be run with the following command:

scripts/scald.rb --local tutorial/Tutorial0.scala
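
If the job succeeds, the output file should contain the same lines as the input:

cat tutorial/data/hello.txt
cat tutorial/data/output0.txt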

Once you're comfortable running the jobs, you can try them with --hdfs-local instead of --local.  With this option, each job runs as a local MapReduce job without requiring a full Hadoop cluster.
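
For example:

scripts/scald.rb --hdfs-local tutorial/Tutorial0.scala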

Running Jobs On A Cluster

The problems started when I decided to run scald.rb with the --hdfs option.  I had a Hortonworks Sandbox running on my machine and figured I could easily push Scalding jobs to it.  Well, it was not all that easy, but eventually I did figure out a solution.

Before running the HDFS jobs, make sure to copy all of the tutorial/data files into your home directory on HDFS.  All input files must be on HDFS, not a local directory.
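
For example, from the main Scalding directory (adjust the file list to whichever tutorials you plan to run):

hadoop fs -mkdir -p tutorial/data
hadoop fs -put tutorial/data/hello.txt tutorial/data

With the data in place, run the job with the --hdfs option: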

scripts/scald.rb --hdfs tutorial/Tutorial0.scala

As with everything HDFS-related, a few wrong characters can send you down a long, ugly path of debugging config files, permissions, and other settings until you finally give up or find the one little change required to make it work.

Again, my environment consisted of a Hortonworks Sandbox version 2.0 running in VirtualBox 4.3.18.  I had been using this sandbox for Hive, HBase, and other work, so I knew it was set up properly.

In order to run Scalding against a remote Hadoop cluster, you have to set up your local Hadoop config files.  I simply copied the /etc/hadoop/*.xml files from my sandbox down to my local installation of Hadoop.  With those config files in place, the Scalding jobs started trying to access my cluster.
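
In my case, that was something along these lines (the host name and the local Hadoop config directory are specific to my setup; adjust both for yours):

scp root@sandbox.hortonworks.com:/etc/hadoop/*.xml $HADOOP_CONF_DIR/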

The error I received was:

14/10/15 07:54:03 ERROR security.UserGroupInformation: 
PriviledgedActionException as:aamick (auth:SIMPLE) 
cause:java.io.FileNotFoundException: File does not exist: 
hdfs://sandbox.hortonworks.com:8020/Users/aamick/job-jars/Tutorial0.jar
14/10/15 07:54:03 INFO flow.Flow: [Tutorial0] stopping all jobs

The path matched my local Mac /Users path rather than the HDFS /user/aamick path.  After a lot of researching error messages and forum posts, I wasn't making progress.  That's when I decided to try copying my tutorial data files from my local machine up to the remote HDFS location; previously, I had added the files through the Hue interface on the Sandbox.

Aha!  Now I got a useful error message, and it had nothing to do with Scalding.  It was an issue with accessing my Hadoop cluster from my local machine.

aamick@:~/Documents/dev/scalding$ hadoop fs -put tutorial/data/words.txt tutorial/data
2014-10-15 08:40:14.716 java[5429:1703] Unable to load realm info from SCDynamicStore
14/10/15 08:40:15 WARN util.NativeCodeLoader: 
Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable
14/10/15 08:41:16 INFO hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 
60000 millis timeout while waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.0.2.15:50010]
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
	at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1305)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1128)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
14/10/15 08:41:16 INFO hdfs.DFSClient: 
Abandoning BP-1578958328-10.0.2.15-1382306880516:blk_1073744638_3828
14/10/15 08:41:16 INFO hdfs.DFSClient: Excluding datanode 10.0.2.15:50010
14/10/15 08:41:16 WARN hdfs.DFSClient: DataStreamer Exception

Why was it trying to use IP 10.0.2.15 for the sandbox address rather than 127.0.0.1?  Everything I had set up locally was using sandbox.hortonworks.com or 127.0.0.1.  All of the local Hadoop config files that I had copied down from the cluster referenced sandbox.hortonworks.com, which is mapped to 127.0.0.1.
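
In my case, that mapping lives in the local /etc/hosts file:

127.0.0.1   sandbox.hortonworks.com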

After a bit more digging and a look at the Live Datanodes page for the namenode (sandbox.hortonworks.com:50070), I saw a reference to the 10.0.2.15 IP.  I'm not sure this is exactly what was happening, but it looks like I was connecting to the cluster and Hadoop was then using the 10.0.2.15 IP as the "transferring address".

At this point, I could have stopped and realized that using Scalding with an --hdfs setting was not going to work with my sandbox config.  However, I had already spent several hours on this issue and I wanted to see it work on the sandbox!

Sandbox and Scalding Success

The workaround I used was setting up Scalding, Scala, and Java on the sandbox VM itself.  I would not want to use this setup for development and testing, but it worked to prove out running Scalding jobs on a remote Hadoop cluster.  And that's what I was really after in the end.

Using the same steps outlined in the Building Scalding section above, I was able to run the scald.rb --hdfs command on the sandbox machine without any errors.  The sandbox machine already had Java and Scala installed.  The one issue I ran into was a missing slash in the default PATH setting in /etc/bashrc: the slash between ${JAVA_HOME} and bin was missing.

The updated bashrc lines are:

export JAVA_HOME=/usr/jdk64/jdk1.6.0_31
export PATH="${JAVA_HOME}/bin:$PATH"

With that change in place, everything ran without errors.

[root@sandbox scalding]# scripts/scald.rb --hdfs tutorial/Tutorial0.scala 
scripts/scald.rb:196: warning: already initialized constant SCALA_LIB_DIR
rsyncing 19.9M from scalding-core-assembly-0.11.2.jar to sandbox.hortonworks.com in background...
rsyncing 1.5K from job-jars/Tutorial0.jar to sandbox.hortonworks.com in background...
Waiting for 2 background threads...
14/10/15 10:15:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
14/10/15 10:15:38 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/10/15 10:15:45 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
14/10/15 10:15:45 INFO util.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$
14/10/15 10:15:45 INFO planner.HadoopPlanner: using application jar: /root/scalding-core-assembly-0.11.2.jar
14/10/15 10:15:45 INFO property.AppProps: using app.id: B35245C840964D45A3F6B47D49AC0980
14/10/15 10:15:46 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/10/15 10:15:47 INFO util.Version: Concurrent, Inc - Cascading 2.5.5
14/10/15 10:15:47 INFO flow.Flow: [Tutorial0] starting
14/10/15 10:15:47 INFO flow.Flow: [Tutorial0]  source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["tutorial/data/hello.txt"]
14/10/15 10:15:47 INFO flow.Flow: [Tutorial0]  sink: Hfs["TextLine[['offset', 'line']->[ALL]]"]["tutorial/data/output0.txt"]
14/10/15 10:15:47 INFO flow.Flow: [Tutorial0]  parallel execution is enabled: true
14/10/15 10:15:47 INFO flow.Flow: [Tutorial0]  starting jobs: 1
14/10/15 10:15:47 INFO flow.Flow: [Tutorial0]  allocating threads: 1
14/10/15 10:15:47 INFO flow.FlowStep: [Tutorial0] starting step: (1/1) tutorial/data/output0.txt
14/10/15 10:15:48 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
14/10/15 10:15:49 INFO client.RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
14/10/15 10:15:54 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/10/15 10:15:54 INFO mapred.FileInputFormat: Total input paths to process : 1
14/10/15 10:15:54 INFO mapreduce.JobSubmitter: number of splits:1

In the end, I was able to run Scalding against a Hadoop cluster (even if it was only a single-node cluster).

Is Scalding something that deserves more attention?  Absolutely.  The clean code and the interfaces to JDBC, HBase, and other sources/sinks make Scalding a great tool for data workflows and processing.