Use JSON with Play and Scala

Once you get your hands into typing Scala code with Play, JSON is one of the first things that comes to mind.
Without a doubt, JSON is one of the most basic building blocks of web applications. REST APIs use JSON, your Angular app has to consume JSON, and the list goes on.

If you are lazy like me, you expect that it is sufficient to just pass Scala objects back from your controller, or to specify a Scala class as an argument to your controller. Things are indeed not far from that, however a few adjustments have to be made.

The first step is to add the json module to our dependencies

libraryDependencies += json

The Play JSON library is quite similar to the org.json library for Java, but with extra capabilities. The types we get out of the box are

JsString
JsNumber
JsBoolean
JsObject
JsArray
JsNull
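
As a quick illustration of these types, a JsValue can be assembled by hand; the fields below are arbitrary and only meant to show the building blocks in action.

import play.api.libs.json._

// Assembling a JsObject by hand from the basic JSON types
val json: JsObject = Json.obj(
  "id"        -> JsNumber(1),
  "email"     -> JsString("john@doe.com"),
  "active"    -> JsBoolean(true),
  "nicknames" -> Json.arr(JsString("john"), JsString("johnny")),
  "comment"   -> JsNull
)

println(Json.stringify(json))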

However, the key functionality comes from the Reads and Writes converters, which are used to marshal and unmarshal our data structures.

Suppose we have a class called User

case class User(id:Option[Long],email:String,firstName:String,lastName:String)

We want to use this class to pass data to our controllers or use it as a response, once our action has finished.

Thus we need to create a Reads and a Writes for the User object.

  implicit val userWrites: Writes[User] = new Writes[User] {
    def writes(user: User) = Json.obj(
      "id" -> user.id,
      "email" -> user.email,
      "firstName" -> user.firstName,
      "lastName" -> user.lastName
    )
  }

  implicit val userReads: Reads[User] = (
    (__ \ "id").readNullable[Long] and
      (__ \ "email").read[String] and
      (__ \ "firstName").read[String] and
      (__ \ "lastName").read[String]
    )(User.apply _)

You have probably noticed that the id is optional. We do this in order to be able to either pass the id of the user or leave it out.
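
As a side note, for a simple case class like this Play can derive both converters at compile time; a minimal sketch, assuming the default field-to-JSON mapping is what we want:

import play.api.libs.json._

// Generates both a Reads[User] and a Writes[User] from the case class fields
implicit val userFormat: OFormat[User] = Json.format[User]

In this post we will stick to the hand-written Reads and Writes, since they make the mapping explicit.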

Now let’s put them together in a controller.

package controllers

import javax.inject.Inject

import com.google.inject.Singleton
import play.api.libs.json._
import play.api.mvc.{Action, Controller}
import play.api.libs.functional.syntax._

/**
  * Created by gkatzioura on 4/26/17.
  */
case class User(id:Option[Long],email:String,firstName:String,lastName:String)

@Singleton
class UserController @Inject() extends Controller {

  def all = Action { implicit request =>
    val users = Seq(
      User(Option(1L),"gkazoura@example.com","Emmanouil","Gkatziouras"),
      User(Option(2L),"john@doe.com","John","Doe"),
      User(Option(3L),"john2@doe.com","John2","Doe2")
    )
    Ok(Json.toJson(users))
  }

  def add = Action { implicit request =>

    val user  = Json.fromJson[User](request.body.asJson.get).get
    val newUser = User(Option(4L),user.email,user.firstName,user.lastName)
    Ok(Json.toJson(newUser))
  }

  implicit val userWrites: Writes[User] = new Writes[User] {
    def writes(user: User) = Json.obj(
      "id" -> user.id,
      "email" -> user.email,
      "firstName" -> user.firstName,
      "lastName" -> user.lastName
    )
  }

  implicit val userReads: Reads[User] = (
    (__ \ "id").readNullable[Long] and
      (__ \ "email").read[String] and
      (__ \ "firstName").read[String] and
      (__ \ "lastName").read[String]
    )(User.apply _)
}

And here is the routes configuration

GET     /user/                   controllers.UserController.all
POST    /user/                   controllers.UserController.add

As we can see, the all method returns a list of user objects in JSON format, while the add method is supposed to persist a user object and assign an id to it.
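
Note that add grabs the body with request.body.asJson.get and Json.fromJson(...).get, which will throw on a missing or malformed payload. A more defensive variant is sketched below with a made-up addSafe action; it relies only on the standard validate API and returns a 400 instead of failing:

  def addSafe = Action { implicit request =>
    request.body.asJson.map(_.validate[User]) match {
      case Some(JsSuccess(user, _)) =>
        // pretend that persistence assigned the id 4
        Ok(Json.toJson(User(Option(4L), user.email, user.firstName, user.lastName)))
      case Some(JsError(errors)) =>
        BadRequest(JsError.toJson(errors))
      case None =>
        BadRequest("Expecting a json body")
    }
  }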

Let’s do a curl request and check our results

curl http://localhost:9000/user/

....

[{"id":1,"email":"gkazoura@example.com","firstName":"Emmanouil","lastName":"Gkatziouras"},{"id":2,"email":"john@doe.com","firstName":"John","lastName":"Doe"},{"id":3,"email":"john2@doe.com","firstName":"John2","lastName":"Doe2"}]

curl -H "Content-Type: application/json" -X POST -d '{"email":"emmanouil@egkatzioura.com","firstName":"Emmanouil","lastName":"Gkatziouras"}' http://localhost:9000/user/

...

{"id":4,"email":"emmanouil@egkatzioura.com","firstName":"Emmanouil","lastName":"Gkatziouras"}

So we didn’t get into any special JSON handling or parsing; instead we worked only with plain objects.
That’s it! Now you are ready for more JSON related action!

You can check the source code on GitHub.


Database Initialization with Play and Scala

When starting your Play prototype application, one of the first priorities is to initialize your database and also to manage the database schema changes.

Play provides us with evolutions. By utilizing evolutions we are able to create our database and to manage any future changes to the schema.

To get started we need to add the jdbc and evolutions dependencies.

libraryDependencies += evolutions
libraryDependencies += jdbc

Then we shall use a simple H2 database, persisted on disk, as our Play application’s default database.
We edit the conf/application.conf file and add the following lines.

db.default.driver=org.h2.Driver
db.default.url="jdbc:h2:/tmp/defaultdatabase"

Pay extra attention to the fact that our database is located in the tmp directory, thus all changes will be lost once we reboot our workstation.

Once we have configured our database we are ready to create our first SQL script.
Our scripts should be located in the conf/evolutions/{your database name} directory, thus in our case
conf/evolutions/default.

Our first script, 1.sql, shall create the users table.

# Users schema

# --- !Ups

CREATE TABLE users (
    id bigint(20) NOT NULL AUTO_INCREMENT,
    email varchar(255) NOT NULL,
    first_name varchar(255) NOT NULL,
    last_name varchar(255) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY (email)
);

# --- !Downs

DROP TABLE users;

As we can see we have ups and downs. What do they stand for? As you may have guessed, ups describe the transformations while downs describe how to revert them.
So the next question is: how does this functionality come into use?
Suppose two developers are working on 2.sql. Each of them has migrated his local database successfully, however the merged file ends up quite different from the one each of them executed against his database.
What evolutions do is detect that the file has changed, revert the old revision by applying its downs, and then apply the up-to-date revision.

Now we are all set to run our application.

sbt run

Once we navigate to localhost:9000 we are presented with a screen that prompts us to apply the evolutions that were detected.

Let us go one step further and see what has been done to our database schema. We can easily explore an H2 database using DBeaver or our IDE.
By issuing show tables we see that the results contain one extra table.


>SHOW TABLES;

TABLE_NAME,TABLE_SCHEMA
PLAY_EVOLUTIONS,PUBLIC
USERS,PUBLIC

The PLAY_EVOLUTIONS table keeps track of our changes.

Id is the number of the evolution script that we created. The apply and revert fields hold the ups and downs SQL statements we created previously.
The hash field is used to detect changes to our file. If an evolution has a different hash from the one already applied, the previous evolution is reverted and the new script is applied.

For example, let’s enhance our previous script and add one more field: username.

# Users schema

# --- !Ups

CREATE TABLE users (
    id bigint(20) NOT NULL AUTO_INCREMENT,
    email varchar(255) NOT NULL,
    username varchar(255) NOT NULL,
    first_name varchar(255) NOT NULL,
    last_name varchar(255) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY (email)
);

# --- !Downs

DROP TABLE users;

Once we start our application we are presented with a screen that prompts us to apply an evolution for the changed revision. If we hit apply, the users table will contain the username field.

So the process for a new revision is pretty straightforward.
The hash of the new 1.sql file is extracted. Then a query checks whether 1.sql has already been applied. If it has, a check is made on whether the hashes are the same. If they are not, the downs script from the current database entry is executed, and once that finishes the new script is applied.
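
With the schema in place, the default database we configured can also be reached from our Scala code through Play’s play.api.db.Database component. The repository below is only a sketch under that assumption, using plain JDBC and a made-up class name:

package repositories

import javax.inject.Inject

import play.api.db.Database

class UserRepository @Inject() (db: Database) {

  // Fetches all user emails from the default datasource configured in application.conf
  def emails(): List[String] = db.withConnection { connection =>
    val resultSet = connection.createStatement().executeQuery("SELECT email FROM users")
    val emails = scala.collection.mutable.ListBuffer[String]()
    while (resultSet.next()) {
      emails += resultSet.getString("email")
    }
    emails.toList
  }
}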

Your first Web application with Play and Scala

Today we are going to develop a simple play application using Scala.

To do so we must have sbt installed on our system.

Once installed we issue the command

sbt new playframework/play-scala-seed.g8

Then we are presented with an interactive prompt in order to provide some basic information.

name [play-scala-seed]: PlayStarter
organization [com.example]: com.gkatzioura
scala_version [2.11.8]: 
scalatestplusplay_version [2.0.0]: 
play_version [2.5.13]: 

Then let us check what we have just created

cd playstarter
sbt run

Navigate to http://localhost:9000 and you have a basic Play hello world.

By looking at our project structure we see, as expected, a directory with our controllers.
Our requests are handled by actions: we issue a request and we receive an HTML view.

  def index = Action { implicit request =>
    Ok(views.html.index())
  }

As you can see, the HTML that is rendered is located in the views directory. Play comes with Twirl as its template engine.

At conf/routes we can see how the route is configured for the index action.

Let’s add a simple action to that controller that returns a text body.

  def greet(name: String) = Action {
    Ok("Hello " + name)
  }

We have to edit the routes file to specify the new route and the query parameter.

GET     /greet                      controllers.HomeController.greet(name)

Then issue a request at http://localhost:9000/greet?name=john

In the next step we shall add a new route with a path parameter.

Suppose we want to retrieve the total logins for a user.
We implement an action that sends back a fake number.

  def loginCount(userId: String) = Action {
    Ok("14")
  }

And then we register the route

GET     /user/:userId/login/count          controllers.HomeController.loginCount(userId)

By issuing the request http://localhost:9000/user/18/login/count
we shall receive the number 14.
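
If we preferred the endpoint to respond with JSON instead of plain text, a small variation could look like the sketch below; loginCountJson is a made-up name and the json module is assumed to be available, as it is in a standard Play project:

  import play.api.libs.json.Json

  def loginCountJson(userId: String) = Action {
    // The same fake count, this time wrapped in a JSON object
    Ok(Json.obj("userId" -> userId, "count" -> 14))
  }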

To sum up, we just implemented our first Play application. We also implemented some basic actions in our controller and managed to pass some path and request parameters.

A journey with Scala

To those who are regular visitors of this blog, it is well known that when it comes to developing code I am a Spring/Java guy. I also use different technologies like Node or Python, but that depends largely on the project’s needs.

Due to some recent projects and courses involving Spark, stumbling on Scala was inevitable. After some investigation I decided to adopt it as one of my main tools, and there are many reasons for that.

From a Java developer’s perspective

  • It evolves faster
  • It has a flexible syntax
  • It is statically typed
  • It is pretty recent, thus exciting, but in a JVM flavor

I remember back when I was anticipating the release of Java 7: lambdas, streams, some functional programming put into action, all features that Scala already provided. Unfortunately lambdas were dropped from Java 7 and released as part of Java 8. Thus it took 3 years to get your hands on lambdas 😦

The Scala syntax is great and increases productivity. I really fancy the fact that you can skip a lot of the boilerplate that you had to deal with in Java, let alone extras like tuples, switches and named parameters on function calls. The list could go on and on.

I have used both dynamically and statically typed languages, however since I develop mainly in Java I am a bit biased. I believe that static typing is a good choice because you can detect errors early, build reliable code and increase maintainability. It also makes collaboration with others much easier.

Every new technology is exciting and feels like a new toy. However, adopting a new technology comes with a lack of the libraries and frameworks (node anyone?) that were essential to your development process. Fortunately you can always bet on the JVM and use your Java libraries from your Scala source code.

From an ecosystem perspective

If you come from Java EE or Spring MVC, you already have a proven and tested framework for your web application. Luckily, Play comes to the rescue. Is Play sufficient for all your needs? I doubt it. But frameworks evolve, and since we live in a microservices-architecture era, having some Java components does the work.

From a project perspective

The cloud and the overall technological burst have brought a variety of different types of applications. Developing applications has become more challenging, and some of them involve big data processing, streaming and machine learning. Scala holds first place for these types of applications, thus if your objective is to work in the big data world, mastering Scala will definitely assist you.

It also does the filtering for you. As mentioned previously, Spring/Java can be used for various types of projects: batch applications, CMS apps, REST APIs and so on. Scala has a more specific identity and targets certain types of projects. By searching for opportunities and contracts that include Scala, you already have a sense of the project’s nature and the challenges ahead.

At last

Talk is cheap! There are many tutorials ahead to author and document my experience. Stay tuned for more Scala content.

Run WordCount with Scala and Spark on HDInsight

Previously we tried to solve the word count problem with a Scala and Spark approach.
The next step is to deploy our solution to HDInsight using Spark, HDFS and Scala.

We shall provision a Spark cluster.

Since we are going to use HDInsight we can utilize HDFS and therefore use Azure storage.

Then we choose our instance types.

And we are ready to create the Spark cluster.

Our data shall be uploaded to the HDFS file system.
To do so we will upload our text file to the Azure storage account, which is integrated with HDFS.

For more information on managing a storage account with the Azure CLI check the official guide. Any text file will work.

azure storage blob upload mytextfile.txt sparkclusterscala  example/data/mytextfile.txt

Since we use HDFS we shall make some changes to the original script

val text = sc.textFile("wasb:///example/data/mytextfile.txt")
val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.collect

Then we can upload our Scala script to the head node using ssh

scp WordCountscala.scala demon@{your cluster}-ssh.azurehdinsight.net:/home/demo/WordCountscala.scala

Again, running the script is pretty straightforward.

spark-shell -i WordCountscala.scala

And once the task is done we are presented with the Spark prompt. Plus, we can now save our results to the HDFS file system.

scala> counts.saveAsTextFile("/wordcount_results")
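
Before leaving the shell we can also take a quick peek at the persisted results from within Spark itself, for example:

scala> sc.textFile("/wordcount_results").take(5).foreach(println)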

And do a quick check.

hdfs dfs -ls /wordcount_results/
hdfs dfs -text /wordcount_results/part-00000

WordCount with Spark and Scala

Apache Spark has taken over the big data world. Spark is implemented in Scala and is well known for its performance.

In previous blogs we approached the word count problem by using Scala with Hadoop and Scala with Storm.
In this blog we will utilize Spark for the word count problem.

Submitting Spark jobs implemented in Scala is pretty easy and convenient. All we need to do is pass our script file to the spark-shell command.

First we have to download and set up a Spark version locally.

Then we shall download a text file for testing. In my case the script from MGS2 did the work.

Now on to the WordCount script. For local testing we will use a file from our file system.

val text = sc.textFile("mytextfile.txt")
val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.collect
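
Besides collecting everything, it can be handy to peek at the most frequent words. A small variation using the same RDD API (the number 10 is arbitrary):

// Sort by count, descending, and keep the ten most frequent words
val topWords = counts.sortBy(_._2, ascending = false).take(10)
topWords.foreach { case (word, count) => println(s"$word -> $count") }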

Next step is to run the script

spark-shell -i WordCountscala.scala

Once finished, a Spark command prompt will appear and we are free to do some experiments with the word count results.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala> res0.length
res1: Int = 20159

Thus we detected 20159 different words.

Our next step is to run our job on a Spark cluster on HDInsight.

Run Scala implemented Storm topologies on HDInsight

Previously we set up a Scala implemented Storm topology in order to count words.

What comes next is uploading our topology to HDInsight.

So we shall proceed in creating a Storm cluster on HDInsight.

Then we choose the instance types.

Next step is to upload our jar file to the head node in order to deploy it. We can use scp for this purpose.

scp target/scala-2.12/ScalaStorm-assembly-1.0.jar  {your user}@{your azure endpoint}:/home/demo

Now we can ssh to our storm cluster’s head node and issue the storm command.

storm jar ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-stream-scala

Then we can check our topology by navigating to https://{your cluster}.azurehdinsight.net/stormui

WordCount with Storm and Scala

Apache Storm is a free and open source distributed realtime computation system running on the JVM.
To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
This time we will focus on the same word count concept but for real time cases, and implement a word count topology utilizing Apache Storm. Our source code will be based on the official Storm examples.

Storm works with spouts and bolts.

First we shall implement a spout which will emit fake data events, in our case sentences.

package com.gkatzioura.scala.storm


import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector:SpoutOutputCollector = _
  var _rand:Random = _

  override def nextTuple(): Unit = {

    Utils.sleep(100)

    val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away",
      "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }

}

The next step is to implement a bolt which splits the sentences into words and emits them.

package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance

    boundary.setText(sentence)
    var start = boundary.first
    var end:Int = start

    while(end!=BreakIterator.DONE) {

      end = boundary.next
      val word = sentence.substring(start,end).replaceAll("\\s+","")
      start = end
      if(!word.equals("")) {
        collector.emit(new Values(word))
      }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}

And the last step is the word count bolt.

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt{

  val counts = scala.collection.mutable.Map[String,Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val word = input.getString(0)

    // Increase the running count for the word and emit the updated value
    val newCount = counts.getOrElse(word, 0) + 1
    counts.put(word, newCount)

    collector.emit(new Values(word, Int.box(newCount)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    declarer.declare(new Fields("word","count"));
  }

}

The final step is to create our topology, which takes care of whether we run locally or in a cluster environment.

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology  {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")
    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    }
    else {
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}

Now we shall build our app. To do so we need to include the assembly plugin in our plugins.sbt file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Our sbt file is as follows

name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

And then we issue a build

sbt clean compile assembly

You can find the source code on GitHub.

In the next post we shall deploy our Storm app to HDInsight.

Run Scala implemented Hadoop Jobs on HDInsight

Previously we set up a Scala application in order to execute a simple word count on Hadoop.

What comes next is uploading our application to HDInsight.

So we shall proceed in creating a Hadoop cluster on HDInsight.

Then we will create the Hadoop cluster.

When creating the cluster we specify the admin console credentials and the ssh user used to log in to the head node.

Our Hadoop cluster will be backed by an Azure storage account.

Then it is time to upload our text file to the Azure storage account.

For more information on managing a storage account with the Azure CLI check the official guide. Any text file will work.

azure storage blob upload mytext.txt scalahadoopexample  example/data/input.txt

Now we can ssh to our Hadoop node.

First let’s run the examples that come packaged with the HDInsight Hadoop cluster.

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar wordcount /example/data/input.txt /example/data/results

Check the results

hdfs dfs -text /example/data/results/part-r-00000

And then we are ready to scp our Scala jar to the Hadoop node and issue a word count of our own.

hadoop jar ScalaHadoop-assembly-1.0.jar /example/data/input.txt /example/data/results2

And again check the results

hdfs dfs -text /example/data/results2/part-r-00000

That’s it! HDInsight makes it pretty straightforward!

WordCount on Hadoop with Scala

Hadoop is a great technology built with Java.

Today we will use Scala to implement a simple MapReduce job and then run it using HDInsight.

We shall add the assembly plugin to our assembly.sbt file

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")


Then we will add the Hadoop core dependency to our build.sbt file. We will also apply some configuration in the merge strategy to avoid deduplication errors.



assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

We will use WordCount as an example.
The original Java class shall be transformed to a Scala class.

package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {

  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  class IntSumReader extends Reducer[Text,IntWritable,Text,IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }


  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration,"word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if(job.waitForCompletion(true))  0 else 1)
  }

}

Then we will build our example

sbt clean compile assembly

Our new jar will reside on target/scala-2.12/ScalaHadoop-assembly-1.0.jar

In the next post we shall run our code using Azure's HDInsight.

You can find the code on GitHub.