WordCount with Storm and Scala

Apache Storm is a free and open source distributed realtime computation system running on the JVM.
To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
We will focus on the same word count concept, but for the real-time case, and implement a word count topology using Apache Storm. Our source code will be based on the official Storm examples.

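Storm works with spouts and bolts. Spouts are the sources of tuple streams, and bolts consume those streams, process them and may emit new tuples.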

First we shall implement a spout which emits fake data events, in our case sentences.

package com.gkatzioura.scala.storm

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector:SpoutOutputCollector = _
  var _rand:Random = _

  override def nextTuple(): Unit = {

    // Pause briefly so the spout does not emit in a tight loop
    Utils.sleep(100)
    val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away",
      "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    // The field is named "word" although each tuple carries a whole sentence
    declarer.declare(new Fields("word"))
  }
}

The next step is to implement a bolt which splits each sentence and emits its words one by one.

package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
    // The iterator has to be given the text before it can report boundaries
    boundary.setText(sentence)

    var start = boundary.first
    var end:Int = boundary.next

    // Stop before calling substring with BreakIterator.DONE as the end index
    while(end!=BreakIterator.DONE) {

      val word = sentence.substring(start,end).replaceAll("\\s+","")
      if(!word.equals("")) {
        collector.emit(new Values(word))
      }
      start = end
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
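
To see what the splitting logic does without running a topology, here is a minimal standalone sketch of the same BreakIterator loop. The object name SplitWordsSketch and the helper splitWords are made up for this illustration and are not part of the project.

```scala
import java.text.BreakIterator
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper for illustration only, not part of the Storm project.
object SplitWordsSketch {

  // Walks the word boundaries of a sentence and keeps every segment
  // that is not empty once whitespace has been removed.
  def splitWords(sentence: String): Seq[String] = {
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    val words = ArrayBuffer.empty[String]
    var start = boundary.first
    var end = boundary.next
    while (end != BreakIterator.DONE) {
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (word.nonEmpty) words += word
      start = end
      end = boundary.next
    }
    words.toList
  }

  def main(args: Array[String]): Unit =
    splitWords("the cow jumped over the moon").foreach(println)
}
```

Keep in mind that the word iterator reports punctuation as its own segment, so a sentence containing a comma would produce the comma as a separate "word" with this approach.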

The last bolt is the word count bolt.

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt{

  val counts = scala.collection.mutable.Map[String,Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val word = input.getString(0)

    // Increment the running count for this word, starting from zero
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)

    // Emit the word with its own count, boxed because Values takes Objects
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    declarer.declare(new Fields("word","count"))
  }
}
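
Since the bolt emits a tuple for every word it receives, downstream consumers see a stream of running counts rather than one final total per word. The sketch below mimics that behaviour in plain Scala. RunningCountSketch and runningCounts are hypothetical names used only for this example.

```scala
import scala.collection.mutable

// Hypothetical helper for illustration only, not part of the Storm project.
object RunningCountSketch {

  // Keeps one mutable map of counts, updates it once per incoming word and
  // returns the (word, count) pair that would be emitted for each input.
  def runningCounts(words: Seq[String]): Seq[(String, Int)] = {
    val counts = mutable.Map.empty[String, Int]
    words.toList.map { word =>
      val count = counts.getOrElse(word, 0) + 1
      counts.update(word, count)
      (word, count)
    }
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("the", "cow", "the", "moon", "the")).foreach(println)
}
```

For the input above the pairs are (the,1), (cow,1), (the,2), (moon,1) and (the,3), so the latest tuple for a word always carries its current total.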


The final step is to create our topology, which wires the spout and the bolts together and decides whether to run locally or to submit to a cluster environment.

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology  {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")
    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()

    if (args != null && args.length > 0) {
      // A topology name was passed: submit to the cluster under that name
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      // No arguments: run inside an in-process local cluster
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
    }
  }
}
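
The count bolt uses a fields grouping on "word" so that every occurrence of a given word reaches the same bolt task, which is what keeps each per-task count map correct. The sketch below only illustrates the idea with a simple hash and modulo. It is an assumption made for this example and not Storm's actual routing code, and taskFor is a hypothetical helper.

```scala
// Illustration only: a simplified model of fields grouping, not Storm's implementation.
object FieldsGroupingSketch {

  // Picks a task index from the hash of the grouping field.
  // floorMod keeps the result non-negative even for negative hash codes.
  def taskFor(word: String, numTasks: Int): Int =
    Math.floorMod(word.hashCode, numTasks)

  def main(args: Array[String]): Unit = {
    val numTasks = 12 // matches the parallelism hint of the "count" bolt
    val words = Seq("the", "cow", "the", "moon", "the")
    // Every occurrence of "the" prints the same task index
    words.foreach(w => println(s"$w -> task ${taskFor(w, numTasks)}"))
  }
}
```

A shuffle grouping, as used between the spout and the split bolt, has no such guarantee, which is fine there because splitting a sentence needs no shared state.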

Now we shall build our app. To do so we need to include the assembly plugin in our plugins.sbt file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Our sbt file is as follows

name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

And then we issue a build

sbt clean compile assembly

You can find the source code on GitHub.

In the next post we shall deploy our Storm app to HDInsight.

Run Scala implemented Hadoop Jobs on HDInsight

Previously we set up a Scala application in order to execute a simple word count on Hadoop.

What comes next is uploading our application to HDInsight.

So we shall proceed to create a Hadoop cluster on HDInsight.

Then we will create the Hadoop cluster.


During creation we specify the admin console credentials and the SSH user used to log in to the head node.

Our Hadoop cluster will be backed by an Azure storage account.


Then it is time to upload our text files to the Azure storage account.

For more information on managing a storage account with the Azure CLI, check the official guide. Any text file will work.

azure storage blob upload mytext.txt scalahadoopexample  example/data/input.txt

Now we can ssh to our Hadoop node.

First let’s run the examples that come packaged with the HDInsight Hadoop cluster.

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar wordcount /example/data/input.txt /example/data/results

Check the results

hdfs dfs -text /example/data/results/part-r-00000

And then we are ready to scp the assembled Scala jar to our Hadoop node and run it as a word count job.

hadoop jar ScalaHadoop-assembly-1.0.jar /example/data/input.txt /example/data/results2

And again check the results

hdfs dfs -text /example/data/results2/part-r-00000

That’s it! HDInsight makes it pretty straightforward!

WordCount on Hadoop with Scala

Hadoop is a great technology built with Java.

Today we will use Scala to implement a simple map reduce job and then run it using HDInsight.

We shall add the assembly plugin to our assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Then we will add the Hadoop core dependency to our build.sbt file. We will also apply some configuration to the merge strategy to avoid deduplicate errors.

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

We will use WordCount as an example.
The original Java class shall be transformed to a Scala class.

package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {

  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        // Consume the next token, otherwise the loop never advances
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  class IntSumReader extends Reducer[Text,IntWritable,Text,IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration,"word count")
    // Wire the job to the classes above, as the Java WordCount example does
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if(job.waitForCompletion(true))  0 else 1)
  }
}
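
To reason about the job without a cluster, the map, shuffle and reduce phases can be imitated with plain Scala collections. This is only a rough sketch under that assumption, not how Hadoop executes the job. LocalMapReduceSketch and its functions are hypothetical names for this example.

```scala
import java.util.StringTokenizer
import scala.collection.mutable.ArrayBuffer

// Illustration only: an in-memory imitation of the job, not Hadoop code.
object LocalMapReduceSketch {

  // Map phase: emit (token, 1) for every whitespace-separated token in a line.
  def mapLine(line: String): Seq[(String, Int)] = {
    val itr = new StringTokenizer(line)
    val out = ArrayBuffer.empty[(String, Int)]
    while (itr.hasMoreTokens) out += ((itr.nextToken(), 1))
    out.toList
  }

  // Reduce phase: sum all values collected for a single key.
  def reduce(key: String, values: Seq[Int]): (String, Int) = (key, values.sum)

  // The shuffle is imitated with groupBy, which gathers the pairs by key.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(mapLine)
      .groupBy(_._1)
      .map { case (key, pairs) => reduce(key, pairs.map(_._2)) }

  def main(args: Array[String]): Unit =
    wordCount(Seq("the cow jumped over the moon", "the moon")).toSeq.sorted.foreach(println)
}
```

In the real job the same summing class also serves as the combiner, which works because addition can be applied to partial sums before the final reduce.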


Then we will build our example

sbt clean compile assembly

Our new jar will reside in target/scala-2.12/ScalaHadoop-assembly-1.0.jar

In the next post we shall run our code using Azure’s HDInsight.

You can find the code on GitHub.