Hadoop is a great technology built with Java.
Today we will use Scala to implement a simple MapReduce job, which we will later run on HDInsight.
First we add the sbt-assembly plugin to our assembly.sbt file (conventionally placed under the project/ directory):
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Then we add the Hadoop core dependency to our build.sbt file. We also configure the merge strategy so that the assembly does not fail with deduplicate errors:
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
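For orientation, here is a minimal sketch of how the whole build.sbt might look. The `name`, `version`, and `scalaVersion` values are assumptions inferred from the jar path mentioned at the end of this post (target/scala-2.12/ScalaHadoop-assembly-1.0.jar), and the exact 2.12.x patch release is a guess; adjust them to your own project.

```scala
// build.sbt (sketch; name, version and scalaVersion are assumed, not taken from the original project)
name := "ScalaHadoop"
version := "1.0"
scalaVersion := "2.12.1" // assumed; any 2.12.x release builds into target/scala-2.12

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

assemblyMergeStrategy in assembly := {
  // Drop META-INF entries (manifests, signature files) that collide between jars
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // For any other duplicated path, keep the first copy encountered
  case x => MergeStrategy.first
}
```

Discarding everything under META-INF is the blunt option: it is enough for this example, but a project that relies on service-loader files in META-INF/services would probably need a finer-grained rule.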
We will use WordCount as an example.
The original Java class is translated into Scala:
package com.gkatzioura.scala
import java.lang.Iterable
import java.util.StringTokenizer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._
/**
* Created by gkatzioura on 2/14/17.
*/
package object WordCount {

  // Emits (token, 1) for every whitespace-delimited token of an input line.
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  // Sums the counts emitted for each word; also registered as the combiner below.
  class IntSumReader extends Reducer[Text, IntWritable, Text, IntWritable] {

    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    // args(0) is the input path, args(1) the output path
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
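If you want to check what the job computes before touching a cluster, the same map and reduce steps can be sketched with plain Scala collections. This is only an illustration: `LocalWordCount`, `mapLine`, `reduceCounts`, and `wordCount` are hypothetical names that are not part of the project, and `groupBy` stands in for Hadoop's shuffle phase.

```scala
import java.util.StringTokenizer
import scala.collection.mutable.ListBuffer

// Hypothetical local stand-in for the Hadoop job; no Hadoop classes involved.
object LocalWordCount {

  // Map step: one (token, 1) pair per whitespace-delimited token,
  // using the same StringTokenizer splitting as the mapper.
  def mapLine(line: String): List[(String, Int)] = {
    val itr = new StringTokenizer(line)
    val out = ListBuffer.empty[(String, Int)]
    while (itr.hasMoreTokens) out += ((itr.nextToken(), 1))
    out.toList
  }

  // Reduce step: sum all counts collected for a single word.
  def reduceCounts(counts: Iterable[Int]): Int = counts.foldLeft(0)(_ + _)

  // groupBy plays the role of the shuffle: it gathers the pairs per word.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(mapLine)
      .groupBy(_._1)
      .map { case (word, pairs) => word -> reduceCounts(pairs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("hello hadoop", "hello scala"))
    // Prints one "word<TAB>count" line per word, sorted by word
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w\t$c") }
  }
}
```

For the two sample lines this yields hadoop 1, hello 2, scala 1. Because addition is associative and commutative, the summing step can run on partial groups as well, which is why the job can reuse its reducer as a combiner.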
Then we build our example:
sbt clean compile assembly
Our new jar will reside at target/scala-2.12/ScalaHadoop-assembly-1.0.jar.
In the next post we shall run our code using Azure’s HDInsight.
You can find the code on GitHub.