
14 Aug / End to End Testing Pig Scripts With Scala and Sbt

After fighting with all the craziness of Hadoop dependencies and sbt configuration, I managed to put together an sbt project that shows how to use ScalaTest for testing map/reduce jobs and Pig scripts inside a Hadoop minicluster.

Hadoop provides pretty good support for developing integration tests, or at least tests that are pretty close to an actual distributed setup. In particular, Hadoop provides APIs for creating embedded HDFS and Map/Reduce subsystems.
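
To give an idea of what those APIs look like, here is a minimal sketch (the same classes are used by the MiniCluster helper shown later in this post):

[code language="scala"]
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.mapred.MiniMRCluster

// Embedded HDFS with a single datanode
val dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build()
// Embedded MR1 cluster with one task tracker, pointed at the embedded HDFS
val mr = new MiniMRCluster(1, dfs.getFileSystem.getUri.toString, 1)
[/code]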

First of all, we need to set up the right dependencies. As shown in a previous post, I’m using the Cloudera CDH4 Hadoop distribution:

[code language="scala"]
name := """pig-mrminicluster-example"""

version := "1.0"

scalaVersion := "2.10.2"

resolvers += "Cloudera Repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.3.0" % "compile" exclude ("org.slf4j", "slf4j-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.apache.hadoop" % "hadoop-client" % "2.0.0-mr1-cdh4.3.0" % "compile" exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.apache.pig" % "pig" % "0.11.0-cdh4.3.0" % "compile" exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb", "hsqldb"),
"org.antlr" % "antlr" % "3.4" % "compile",
"commons-daemon" % "commons-daemon" % "1.0.5" % "test",
"org.apache.hadoop" % "hadoop-test" % "2.0.0-mr1-cdh4.3.0" % "test",
"org.apache.hadoop" % "hadoop-minicluster" % "2.0.0-mr1-cdh4.3.0" % "test",
"org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.3.0" % "test",
"org.apache.hadoop" % "hadoop-hdfs" % "2.0.0-cdh4.3.0" % "test" exclude ("commons-daemon", "commons-daemon"),
"org.apache.hadoop" % "hadoop-hdfs" % "2.0.0-cdh4.3.0" % "test" classifier "tests",
"org.apache.hadoop" % "hadoop-common" % "2.0.0-cdh4.3.0" % "compile" classifier "tests",
"org.scalatest" %% "scalatest" % "1.9.1" % "test"
)

fork in test := true
[/code]

Don’t forget to set fork in test := true, otherwise the classpath is not properly propagated to the task trackers running inside the Map/Reduce minicluster. Unfortunately, the same setup doesn’t work with the latest version of sbt; this is something I definitely need to investigate further.
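
Since the tests run in a forked JVM, its options come from javaOptions scoped to Test; the embedded clusters are fairly memory-hungry, so something along these lines can help (the values here are just an example, not part of the original setup):

[code language="scala"]
// Extra memory for the forked test JVM running the miniclusters
javaOptions in Test ++= Seq("-Xmx2G", "-XX:MaxPermSize=256m")
[/code]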

The next step is to define a class that starts both the HDFS and the Map/Reduce miniclusters in one shot:

[code language="scala"]
package org.apache.pig.test

import java.io.{FileOutputStream, File}
import org.apache.hadoop.mapred.MiniMRCluster
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster

class MiniCluster(dataNodes: Int = 1, taskTrackers: Int = 1) {

  private val configurationFilePath = this.getClass.getProtectionDomain().getCodeSource().getLocation().getPath() + "/hadoop-site.xml"

  var configuration: Option[Configuration] = None
  var miniDFSCluster: Option[MiniDFSCluster] = None
  var miniMRCluster: Option[MiniMRCluster] = None

  def start() {
    System.setProperty("hadoop.log.dir", "build/test/logs")

    val configurationFile = new File(configurationFilePath)
    if (configurationFile.exists()) {
      configurationFile.delete()
    }

    val builder = new MiniDFSCluster.Builder(new Configuration())
    miniDFSCluster = Some(builder.format(true).numDataNodes(dataNodes).build())

    miniMRCluster = Some(new MiniMRCluster(taskTrackers, miniDFSCluster.get.getFileSystem.getUri.toString, 1))

    // Write the necessary config info to hadoop-site.xml
    configuration = Some(miniMRCluster.get.createJobConf()).map(configuration => {
      configuration.setInt("mapred.submit.replication", 2)
      configuration.set("dfs.datanode.address", "0.0.0.0:0")
      configuration.set("dfs.datanode.http.address", "0.0.0.0:0")
      configuration.set("mapred.map.max.attempts", "2")
      configuration.set("mapred.reduce.max.attempts", "2")
      configuration.set("pig.jobcontrol.sleep", "100")
      configuration
    })
    configuration.foreach(_.writeXml(new FileOutputStream(configurationFile)))

    // Set the system properties needed by Pig
    System.setProperty("cluster", configuration.get.get("mapred.job.tracker"))
    System.setProperty("namenode", configuration.get.get("fs.default.name"))
  }

  def shutdown() {
    miniDFSCluster.foreach(_.getFileSystem.close())
    miniDFSCluster.foreach(_.shutdown())
    miniDFSCluster = None
    miniMRCluster.foreach(_.shutdown())
    miniMRCluster = None
    val configurationFile = new File(configurationFilePath)
    if (configurationFile.exists()) {
      configurationFile.delete()
    }
  }

}
[/code]

Here, the most important aspect is to generate a proper hadoop-site.xml configuration file that will be read by the Pig server. The Pig server can only load the Hadoop configuration files from the classpath, which is why I’m creating that file in a directory on the classpath (see the configurationFilePath value near the top of the class). With that simple trick, when I create a PigServer instance, the hadoop-site.xml is automatically picked up, allowing that instance to connect to the running miniclusters.
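
For example, once that file is in place, creating a PigServer in map/reduce mode is enough to have it talk to the embedded clusters. A minimal sketch (the query shown is just a placeholder, not code from the original project):

[code language="scala"]
import org.apache.pig.{ExecType, PigServer}

// In MAPREDUCE mode the PigServer loads hadoop-site.xml from the classpath,
// so it points at the job tracker and namenode of the running miniclusters
val pigServer = new PigServer(ExecType.MAPREDUCE)
pigServer.registerQuery("A = LOAD 'input.csv' USING PigStorage(',');")
val results = pigServer.openIterator("A")
[/code]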

Finally, below is a ScalaTest example that shows how to test native map/reduce jobs and Pig scripts using the miniclusters:

[code language="scala"]
import java.io.{PrintWriter, File, OutputStreamWriter}
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan
import org.apache.pig.tools.pigstats.{JobStats, OutputStats, PigProgressNotificationListener, ScriptState}
import org.apache.pig.{ExecType, PigServer}
import org.apache.pig.test.MiniCluster
import org.scalatest.fixture
import org.scalatest.matchers.MustMatchers

class TestSpec extends fixture.WordSpec with MustMatchers {
  type FixtureParam = Fixture

  class Fixture {

    val miniCluster = new MiniCluster(4, 4)

    miniCluster.start()

    def shutdown() {
      miniCluster.shutdown()
    }
  }

  def withFixture(test: OneArgTest) {
    val fixture = new Fixture
    try {
      test(fixture)
    } finally {
      fixture.shutdown()
    }
  }

  "Testing with M/R" must {
    "be successful" in {
      fixture =>

        import scala.util.Try

        import java.io.{InputStreamReader, BufferedReader, OutputStreamWriter}

        import org.apache.hadoop.mapred.TestLocalModeWithNewApis.TokenizerMapper
        import org.apache.hadoop.mapred.WordCount
        import org.apache.hadoop.mapreduce.Job
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
        import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer
        import org.apache.hadoop.io.{IntWritable, Text}

        val conf = fixture.miniCluster.configuration.get

        val hdfs = FileSystem.get(conf)
        hdfs.mkdirs(new Path("/tmp/"))

        val result = Try {
          val input = new Path("/tmp/input")
          val output = new Path("/tmp/output")
          hdfs.makeQualified(input)
          hdfs.makeQualified(output)

          val wr = new OutputStreamWriter(hdfs.create(new Path(input, "wordcount")))
          wr.write("neeraj chaplot neeraj\n")
          wr.close()

          val conf = fixture.miniCluster.configuration.get
          val job = new Job(conf, "word count")
          job.setJarByClass(classOf[WordCount])
          job.setMapperClass(classOf[TokenizerMapper])
          job.setCombinerClass(classOf[IntSumReducer[Int]])
          job.setReducerClass(classOf[IntSumReducer[Int]])
          job.setOutputKeyClass(classOf[Text])
          job.setOutputValueClass(classOf[IntWritable])
          job.setNumReduceTasks(1)
          FileInputFormat.addInputPath(job, input)
          FileOutputFormat.setOutputPath(job, output)

          job.waitForCompletion(true)

          val ctrs = job.getCounters

          val is = hdfs.open(new Path(output, "part-r-00000"))
          val reader = new BufferedReader(new InputStreamReader(is))
          (ctrs, reader.readLine, reader.readLine)
        }
        result.isSuccess must equal(true)

        val COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"
        val combineIn = result.get._1.findCounter(COUNTER_GROUP, "COMBINE_INPUT_RECORDS").getValue
        val combineOut = result.get._1.findCounter(COUNTER_GROUP, "COMBINE_OUTPUT_RECORDS").getValue
        val reduceIn = result.get._1.findCounter(COUNTER_GROUP, "REDUCE_INPUT_RECORDS").getValue
        val mapOut = result.get._1.findCounter(COUNTER_GROUP, "MAP_OUTPUT_RECORDS").getValue
        val reduceOut = result.get._1.findCounter(COUNTER_GROUP, "REDUCE_OUTPUT_RECORDS").getValue
        val reduceGrps = result.get._1.findCounter(COUNTER_GROUP, "REDUCE_INPUT_GROUPS").getValue

        mapOut must equal(combineIn)
        combineOut must equal(reduceIn)
        combineIn must be > combineOut
        reduceGrps must equal(reduceOut)

        "chaplot\t1" must equal(result.get._2)
        "neeraj\t2" must equal(result.get._3)

    }
  }

"Testing with Pig" must {
"be successful" in {
fixture =>

val conf = fixture.miniCluster.configuration

val hdfs = FileSystem.get(conf.get)
val input = new Path("input.csv")
val wr = new OutputStreamWriter(hdfs.create(input))

for (i "(" + i + ‘,’ + i + ‘,’ + "CIAO" + i + ")").mkString("")

import collection.JavaConversions._
iterator.mkString("") must equal(expectedResult)
scriptFile.deleteOnExit()
}
}

}
[/code]

That’s all folks.

By David Greco in Hadoop, Pig, scala
