Apache Storm is a free and open-source distributed real-time computation system that runs on the JVM.
To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
We will stick with the same word count concept, but this time for real-time processing, and implement a word count topology with Apache Storm. Our source code is based on the official Storm examples.
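A Storm topology is built from spouts, which are the sources of data streams, and bolts, which consume those streams, process them and can emit new streams.
First we implement a spout that emits fake data events, in our case random sentences.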
```scala
package com.gkatzioura.scala.storm

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  // Initialised in open(), which Storm calls on the worker before nextTuple()
  var _collector: SpoutOutputCollector = _
  var _rand: Random = _

  private val sentences = Array(
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature")

  // Called repeatedly by Storm; emits one random sentence roughly every 100 ms
  override def nextTuple(): Unit = {
    Utils.sleep(100)
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = new Random
  }

  // The spout emits whole sentences, so its single output field is named accordingly
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("sentence"))
  }
}
```
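The spout's only job is to pick sentences, and that logic is easier to reason about outside a cluster. The following is a minimal sketch, assuming a hypothetical `SentenceSource` helper that is not part of the original project or of Storm. It takes an explicit seed so that runs are reproducible; a spout could call `next()` once per `nextTuple()`.

```scala
import scala.util.Random

// Hypothetical helper, not part of Storm: the spout's sentence-picking logic
// extracted so it can be exercised without a running topology.
class SentenceSource(seed: Long) {
  // Same fixed sentences as RandomSentenceSpout
  val sentences: Vector[String] = Vector(
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature")

  // A seeded generator yields the same sequence on every run with the same seed
  private val rand = new Random(seed)

  // Returns one randomly chosen sentence, like a single nextTuple() call
  def next(): String = sentences(rand.nextInt(sentences.length))
}
```

Seeding is only useful for tests; the real spout is meant to produce an unpredictable stream.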
The next step is to implement a bolt that splits each sentence into words and emits every word as a separate tuple.
```scala
package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)
    var start = boundary.first
    var end = boundary.next
    // next returns BreakIterator.DONE (-1) after the last boundary,
    // so check for it before calling substring
    while (end != BreakIterator.DONE) {
      // Segments between boundaries are words or whitespace; drop the blank ones
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (word.nonEmpty) {
        collector.emit(new Values(word))
      }
      start = end
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
```
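The boundary-walking loop is easiest to check in isolation. Below is a sketch that pulls it into a hypothetical `WordSplitter.splitWords` helper (not in the original project) that uses only `java.text.BreakIterator`. Note that `next()` returns `BreakIterator.DONE` once it moves past the last boundary, so the loop must test for it before calling `substring`.

```scala
import java.text.BreakIterator
import scala.collection.mutable.ListBuffer

// Hypothetical helper mirroring the split bolt's logic, minus the Storm collector:
// walks word boundaries and keeps every non-blank segment.
object WordSplitter {
  def splitWords(sentence: String): List[String] = {
    val boundary = BreakIterator.getWordInstance()
    boundary.setText(sentence)
    val words = ListBuffer.empty[String]
    var start = boundary.first()
    var end = boundary.next()
    // Stop as soon as there are no more boundaries (DONE == -1)
    while (end != BreakIterator.DONE) {
      // A segment is either a word or the whitespace between words
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (word.nonEmpty) words += word
      start = end
      end = boundary.next()
    }
    words.toList
  }
}
```

For example, `WordSplitter.splitWords("the cow jumped over the moon")` yields the six words of the sentence, with the spaces between them discarded.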
Next comes the word count bolt, which keeps a running count for every word it receives.
```scala
package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

import scala.collection.mutable

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt {

  // Per-task running totals; fields grouping sends a given word to the same task every time
  val counts = mutable.Map[String, Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val word = input.getString(0)
    val count = counts.getOrElse(word, 0) + 1
    counts(word) = count
    // Emit the word with its updated count (boxed, since Values holds Objects)
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word", "count"))
  }
}
```
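The counting bolt emits one tuple per incoming word, carrying that word's running total at that moment. A Storm-free sketch of the same bookkeeping, assuming a hypothetical `WordCounter` class that stands in for a single bolt task, looks like this:

```scala
import scala.collection.mutable

// Hypothetical stand-in for one WordCountBolt task, without Storm:
// it keeps its own running totals and returns what the bolt would emit.
class WordCounter {
  private val counts = mutable.Map.empty[String, Int]

  // Increments the word's count and returns the (word, count) pair to emit
  def add(word: String): (String, Int) = {
    val newCount = counts.getOrElse(word, 0) + 1
    counts(word) = newCount
    (word, newCount)
  }
}
```

Feeding it `the, cow, the` produces `(the,1)`, `(cow,1)`, `(the,2)`. A downstream consumer therefore sees a stream of updates rather than one final total.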
The final step is to create the topology itself. Its main method runs the topology in a local, in-process cluster when no arguments are given, and submits it to a real cluster otherwise.
```scala
package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology {

  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder
    // The third argument is the parallelism hint (number of executors)
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    // Any split task can handle any sentence, so tuples are shuffled randomly
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    // Fields grouping routes every occurrence of a word to the same counting task
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      // Cluster mode: the first argument is the topology name
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      // Local mode: run in-process for 10 seconds, then shut down
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
```
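The two groupings in the topology matter for correctness. Sentences can go to any split task, but the counting bolt keeps its totals in a per-task map, so every occurrence of a word must reach the same task. The sketch below is a simplified model of fields grouping: it hashes the word and takes it modulo the number of tasks. The `FieldsGroupingModel` object is hypothetical, and Storm's actual hashing may differ; only the "same key, same task" property is the point.

```scala
// Simplified model of fields grouping (not Storm's exact hashing): the grouping
// field is hashed and reduced modulo the number of target tasks.
object FieldsGroupingModel {
  def taskFor(word: String, numTasks: Int): Int =
    // floorMod keeps the index non-negative even when hashCode is negative
    Math.floorMod(word.hashCode, numTasks)

  // Routes each word to its task and counts it there; returns per-task tables
  def route(words: Seq[String], numTasks: Int): Map[Int, Map[String, Int]] =
    words
      .groupBy(w => taskFor(w, numTasks))
      .map { case (task, ws) =>
        task -> ws.groupBy(identity).map { case (w, xs) => w -> xs.size }
      }
}
```

With 12 counting tasks, every `the` lands in exactly one task's table, so that task holds the complete count. With shuffle grouping instead, the count for a word would be scattered across several tasks.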
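Now we build our app. To package it as a single jar together with its dependencies, we add the sbt-assembly plugin to project/plugins.sbt:
```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
```
Our build.sbt is as follows. storm-core is marked as "provided" because the Storm cluster already supplies it at runtime, so it is left out of the assembled jar. The -Yresolve-term-conflict:package option resolves a clash between a package and an object of the same name inside the storm-core jar.
```scala
name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"
```
Then we build the jar, which by default is written to target/scala-2.12/ScalaStorm-assembly-1.0.jar:
```
sbt clean compile assembly
```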
You can find the source code on GitHub.
In the next post we will deploy our Storm app to HDInsight.