WordCount with Storm and Scala

Apache Storm is a free and open source distributed realtime computation system that runs on the JVM.
To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
This time we will apply the same word count concept to a real-time setting and implement a word count topology using Apache Storm. Our source code is based on the official Storm examples.

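Storm works with spouts and bolts: spouts are the sources of streams, and bolts consume streams, process the tuples and optionally emit new ones.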
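As a mental model only, the dataflow we are about to build can be sketched with plain Scala collections. The names below (PipelineSketch, split, count) are hypothetical, nothing here uses the Storm API, and nothing runs distributed; it only shows what the topology is meant to compute.

```scala
// Plain-Scala mental model of the topology: spout -> split bolt -> count bolt.
// Hypothetical names; no Storm API involved.
object PipelineSketch {

  // Stands in for the spout: a source of sentences.
  val sentences: List[String] =
    List("the cow jumped over the moon", "snow white and the seven dwarfs")

  // Stands in for the split bolt: one sentence in, many words out.
  def split(sentence: String): List[String] =
    sentence.split("\\s+").toList.filter(_.nonEmpty)

  // Stands in for the count bolt: here we compute final counts in one go,
  // whereas the real bolt keeps a running count as tuples arrive.
  def count(words: List[String]): Map[String, Int] =
    words.groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit =
    println(count(sentences.flatMap(split)))
}
```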

First we shall implement a spout that emits fake data events, in our case sentences.

package com.gkatzioura.scala.storm


import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector:SpoutOutputCollector = _
  var _rand:Random = _

  override def nextTuple(): Unit = {

    Utils.sleep(100)

    val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away",
      "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }

}

The next step is to implement a bolt that splits each sentence into words and emits them one by one.

package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance

    boundary.setText(sentence)
    var start = boundary.first
    var end: Int = boundary.next

    // Advance before each use so that DONE (-1) is never passed to substring
    while (end != BreakIterator.DONE) {

      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      start = end
      if (word.nonEmpty) {
        collector.emit(new Values(word))
      }
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
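The word-splitting logic can be tried outside Storm. Below is a minimal sketch, assuming a hypothetical helper object SplitSketch that is not part of the project; it uses the same java.text.BreakIterator approach and collects the words into a list instead of emitting tuples.

```scala
import java.text.BreakIterator
import scala.collection.mutable.ListBuffer

// Hypothetical standalone helper; not part of the topology code.
object SplitSketch {

  // Splits a sentence at word boundaries and drops whitespace-only tokens.
  def splitWords(sentence: String): List[String] = {
    val boundary = BreakIterator.getWordInstance()
    boundary.setText(sentence)

    val words = ListBuffer.empty[String]
    var start = boundary.first()
    var end = boundary.next()
    // Stop as soon as next() reports DONE, before using it as an index.
    while (end != BreakIterator.DONE) {
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (word.nonEmpty) words += word
      start = end
      end = boundary.next()
    }
    words.toList
  }
}
```

Note that BreakIterator treats punctuation as its own segment, so a comma or a full stop would come out as a separate "word" unless it is filtered as well.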

The last bolt is the word count bolt, which keeps a running count for each word it receives.

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt{

  val counts = scala.collection.mutable.Map[String,Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val word = input.getString(0)

    // Increment the running count for this word
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)

    // Emit the word with its own count (boxed, since Values takes Objects), not the whole map
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    declarer.declare(new Fields("word", "count"))
  }

}
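The bookkeeping inside the count bolt can also be sketched without Storm. The class below, WordCounter, is a hypothetical helper for illustration only; it keeps one counter per word and returns the updated value, which is the number the bolt is meant to pair with the word.

```scala
import scala.collection.mutable

// Hypothetical helper mirroring the bolt's per-word bookkeeping.
class WordCounter {
  private val counts = mutable.Map.empty[String, Int]

  // Increments the counter for `word` and returns the updated value.
  def add(word: String): Int = {
    val updated = counts.getOrElse(word, 0) + 1
    counts.update(word, updated)
    updated
  }
}
```

Because the map lives in the memory of a single bolt task, the counts are not shared between tasks and do not survive a restart of the worker.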

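The final step is to create our topology, which handles both local and cluster execution: if a topology name is passed as an argument the topology is submitted to the cluster, otherwise it runs in a LocalCluster for ten seconds and then shuts down.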

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology  {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")
    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    }
    else {
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
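The fieldsGrouping on "word" is what makes the per-task counts meaningful: tuples with the same word are always routed to the same task of the count bolt, while shuffleGrouping spreads the sentences randomly over the split tasks. Here is a rough sketch of the idea, assuming a simple hash-modulo routing rule; the helper taskFor is hypothetical and Storm's internal hashing may well differ.

```scala
// Hypothetical illustration of fields grouping; not Storm's actual implementation.
object GroupingSketch {

  // Pick a task index from the hash of the word.
  // floorMod keeps the index non-negative even for negative hash codes.
  def taskFor(word: String, numTasks: Int): Int =
    java.lang.Math.floorMod(word.hashCode, numTasks)

  def main(args: Array[String]): Unit = {
    val words = List("the", "cow", "the", "moon", "the")
    // Every occurrence of "the" lands on the same task index.
    words.foreach(w => println(s"$w -> task ${taskFor(w, 12)}"))
  }
}
```

With a rule like this, a given word is only ever counted by one task, so no two tasks hold partial counts for the same word.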

Now we shall build our app. To do so we need to add the sbt-assembly plugin to our project/plugins.sbt file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Our build.sbt file is as follows:

name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

Then we issue a build:

sbt clean compile assembly
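Since storm-core is marked as "provided", it is left out of the assembled jar and is also not on the classpath when the topology is started in local mode through sbt run, which can surface as a NoClassDefFoundError for Storm classes. One commonly used workaround, described in the sbt-assembly documentation, is to put provided dependencies back on the run classpath. Treat the fragment below as a sketch in sbt 0.13-style syntax and check it against your own sbt version.

```scala
// build.sbt: let `sbt run` see dependencies marked "provided"
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated
```

The assembled jar is unaffected by this setting; storm-core still comes from the cluster at deploy time.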

You can find the source code on GitHub.

In the next post we shall deploy our Storm app to HDInsight.

6 thoughts on “WordCount with Storm and Scala”

  1. Hi,

    Thank you so much, the code is very helpful for me.
    But I am getting an error while compiling through sbt.

    Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout

      1. Thank you so much.
        That issue has been resolved; now the problem is somewhere else.

        Maybe it is not connecting to Storm. I checked the logs but didn’t find a solution.
