WordCount with Spark and Scala

Apache Spark has taken over the big data world. Spark is implemented in Scala and is well known for its performance.

In previous posts we approached the word count problem using Scala with Hadoop and Scala with Storm.
In this post we will use Spark for the word count problem.

Submitting Spark jobs implemented in Scala is pretty easy and convenient. All we need to do is pass our script file to the spark-shell command.

First we have to download and set up a Spark distribution locally.

Then we shall download a text file for testing. In my case the script from MGS2 did the job.

Now on to the WordCount script. For local testing we will use a file from our file system.

val text = sc.textFile("mytextfile.txt")
val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.collect

The next step is to run the script:

spark-shell -i WordCountscala.scala

Once it finishes, a Spark prompt will appear and we are free to do some experiments with the word count results.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala> res0.length
res1: Int = 20159

Thus we detected 20159 distinct words.

Our next step is to run our job on a Spark cluster on HDInsight.

Run Scala implemented Storm topologies on HDInsight

Previously we set up a Scala-implemented Storm topology in order to count words.

What comes next is uploading our topology to HDInsight.

So we shall proceed with creating a Storm cluster on HDInsight.


Then we choose the instance types.


The next step is to upload our jar file to the head node in order to deploy it. We can use scp for this purpose.

scp target/scala-2.12/ScalaStorm-assembly-1.0.jar  {your user}@{your azure endpoint}:/home/demo

Now we can ssh to our storm cluster’s head node and issue the storm command.

storm jar ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-stream-scala

Then we can check our topology by navigating to https://{your cluster}.azurehdinsight.net/stormui

WordCount with Storm and Scala

Apache Storm is a free and open source distributed realtime computation system running on the JVM.
To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
We will focus on the same word count concept, but for real time cases, and implement a word count topology utilizing Apache Storm. Our source code will be based on the official Storm examples.

Storm works with spouts and bolts: spouts emit streams of tuples and bolts consume and process them.

First we shall implement a spout which will emit fake data events, in our case sentences.

package com.gkatzioura.scala.storm


import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector:SpoutOutputCollector = _
  var _rand:Random = _

  override def nextTuple(): Unit = {

    Utils.sleep(100)

    val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away",
      "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }

}

The next step is to implement a bolt which splits the sentences into words and emits them.

package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance

    boundary.setText(sentence)
    var start = boundary.first
    var end = boundary.next

    // Walk the word boundaries until BreakIterator signals the end of the text
    while(end != BreakIterator.DONE) {
      val word = sentence.substring(start,end).replaceAll("\\s+","")
      start = end
      if(!word.equals("")) {
        collector.emit(new Values(word))
      }
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}

And the last step is the word count bolt.

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt{

  val counts = scala.collection.mutable.Map[String,Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val word = input.getString(0)

    // Increment the running total for this word and emit the updated count,
    // matching the declared fields ("word","count")
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)

    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    declarer.declare(new Fields("word","count"));
  }

}

The final step is to create our topology, which takes care of whether we run locally or on a cluster.

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology  {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")
    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    }
    else {
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}

Now we shall build our app. To do so we need to include the assembly plugin in our plugins.sbt file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Our build.sbt file is as follows:

name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

And then we issue a build

sbt clean compile assembly

You can find the source code on GitHub.

In the next post we shall deploy our Storm app to HDInsight.

Run Scala implemented Hadoop Jobs on HDInsight

Previously we set up a Scala application in order to execute a simple word count on Hadoop.

What comes next is uploading our application to HDInsight.

So we shall proceed with creating a Hadoop cluster on HDInsight.


Then we will create the Hadoop cluster.


As you can see we specify the admin console credentials and the SSH user that will log in to the head node.

Our Hadoop cluster will be backed by an Azure storage account.


Then it is time to upload our text files to the Azure storage account.

For more information on managing a storage account with the Azure CLI check the official guide. Any text file will do.

azure storage blob upload mytext.txt scalahadoopexample  example/data/input.txt

Now we can ssh to our Hadoop node.

First let’s run the examples that come packaged with the HDInsight Hadoop cluster.

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar wordcount /example/data/input.txt /example/data/results

Check the results

hdfs dfs -text /example/data/results/part-r-00000

And then we are ready to scp our Scala jar to the Hadoop node and run our word count.

hadoop jar ScalaHadoop-assembly-1.0.jar /example/data/input.txt /example/data/results2

And again check the results

hdfs dfs -text /example/data/results2/part-r-00000

That’s it! HDInsight makes it pretty straightforward!

WordCount on Hadoop with Scala

Hadoop is a great technology built with Java.

Today we will use Scala to implement a simple map reduce job and then run it using HDInsight.

We shall add the assembly plugin to our assembly.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")


Then we will add the Hadoop core dependency to our build.sbt file. We will also configure the merge strategy to avoid deduplication errors.



assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

We will use WordCount as an example.
The original Java class shall be transformed to a Scala class.

package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {

  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  class IntSumReader extends Reducer[Text,IntWritable,Text,IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      var sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }


  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration,"word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if(job.waitForCompletion(true))  0 else 1)
  }

}

Then we will build our example

sbt clean compile assembly

Our new jar will reside at target/scala-2.12/ScalaHadoop-assembly-1.0.jar

In the next post we shall run our code using Azure’s HDInsight.

You can find the code on GitHub.

Hibernate Caching with Hazelcast: Basic configuration

Previously we went through an introduction to JPA caching, its mechanisms and what Hibernate offers.

What comes next is a Hibernate project using Hazelcast as a second level cache.

We will use a basic Spring Boot project with JPA for this purpose. Spring Boot uses Hibernate as the default JPA provider.
Our setup will be pretty close to the one of a previous post.
We will use PostgreSQL with Docker for our SQL database.

group 'com.gkatzioura'
version '1.0-SNAPSHOT'

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'


repositories {
    mavenCentral()
}

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa'
    compile group: 'org.postgresql', name:'postgresql', version:'9.4-1206-jdbc42'
    compile group: 'org.springframework', name: 'spring-jdbc'
    compile group: 'com.zaxxer', name: 'HikariCP', version: '2.6.0'
    compile group: 'com.hazelcast', name: 'hazelcast-hibernate5', version: '1.2'
    compile group: 'com.hazelcast', name: 'hazelcast', version: '3.7.5'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

By examining the dependencies carefully we see the Hikari connection pool, the PostgreSQL driver, Spring Data JPA and of course Hazelcast.

Instead of creating the database manually we will automate it by utilizing the database initialization feature of Spring Boot.

We shall create a file called schema.sql under the resources folder.

create schema spring_data_jpa_example;
 
create table spring_data_jpa_example.employee(
    id  SERIAL PRIMARY KEY,
    firstname   TEXT    NOT NULL,
    lastname    TEXT    NOT NULL,   
    email       TEXT    not null,
    age         INT     NOT NULL,
    salary         real,
    unique(email)
);
 
insert into spring_data_jpa_example.employee (firstname,lastname,email,age,salary) 
values ('Test','Me','test@me.com',18,3000.23);

To keep it simple and avoid any further configuration we shall put the datasource, JPA and caching settings inside the application.yml file.

spring:
  datasource:
    continue-on-error: true
    type: com.zaxxer.hikari.HikariDataSource
    url: jdbc:postgresql://172.17.0.2:5432/postgres
    driver-class-name: org.postgresql.Driver
    username: postgres
    password: postgres
    hikari:
      idle-timeout: 10000
  jpa:
    properties:
      hibernate:
        cache:
          use_second_level_cache: true
          use_query_cache: true
          region:
            factory_class: com.hazelcast.hibernate.HazelcastCacheRegionFactory
    show-sql: true

The spring.datasource.continue-on-error property is crucial: once the application is relaunched, the initialization script will try to create the schema again and fail since it already exists, and without this setting a crash would be inevitable.

Any Hibernate-specific properties reside under the spring.jpa.properties path. We enabled the second level cache and the query cache.

We also set show-sql to true, which means that every query that hits the database is logged to the console.

Then we create our Employee entity.

package com.gkatzioura.hibernate.enitites;

import javax.persistence.*;

/**
 * Created by gkatzioura on 2/6/17.
 */
@Entity
@Table(name = "employee", schema="spring_data_jpa_example")
public class Employee {

    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.SEQUENCE)
    private Long id;

    @Column(name = "firstname")
    private String firstName;

    @Column(name = "lastname")
    private String lastname;

    @Column(name = "email")
    private String email;

    @Column(name = "age")
    private Integer age;

    @Column(name = "salary")
    private Integer salary;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getSalary() {
        return salary;
    }

    public void setSalary(Integer salary) {
        this.salary = salary;
    }
}

Everything is set up. Spring Boot will detect the entity and create an EntityManagerFactory on its own.
What comes next is the repository class for the Employee entity.

package com.gkatzioura.hibernate.repository;

import com.gkatzioura.hibernate.enitites.Employee;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.CrudRepository;

/**
 * Created by gkatzioura on 2/11/17.
 */
public interface EmployeeRepository extends JpaRepository<Employee,Long> {
}

And the last piece is the controller.

package com.gkatzioura.hibernate.controller;

import com.gkatzioura.hibernate.enitites.Employee;
import com.gkatzioura.hibernate.repository.EmployeeRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * Created by gkatzioura on 2/6/17.
 */
@RestController
public class EmployeeController {

    @Autowired
    private EmployeeRepository employeeRepository;

    @RequestMapping("/employee")
    public List<Employee> testIt() {

        return employeeRepository.findAll();
    }

    @RequestMapping("/employee/{employeeId}")
    public Employee getEmployee(@PathVariable Long employeeId) {

        return employeeRepository.findOne(employeeId);
    }

}

Once we issue a request at
http://localhost:8080/employee/1

the console will display the query issued against the database:

Hibernate: select employee0_.id as id1_0_0_, employee0_.age as age2_0_0_, employee0_.email as email3_0_0_, employee0_.firstname as firstnam4_0_0_, employee0_.lastname as lastname5_0_0_, employee0_.salary as salary6_0_0_ from spring_data_jpa_example.employee employee0_ where employee0_.id=?

The second time we issue the request, since we have the second level cache enabled, no query will be issued against the database. Instead the entity will be fetched from the second level cache.

You can download the project from GitHub.

Hibernate Caching with Hazelcast: JPA caching basics

One of the greatest capabilities of Hazelcast is its support for Hibernate’s second level cache.

JPA has two levels of cache.
The first level cache caches an object’s state for the duration of a transaction: by querying the same object twice, you get back the object you retrieved the first time.
However, in the case of complex queries that involve the object you retrieved and hit your database, chances are that the results will be out of sync, since they do not reflect the changes you applied to the object in memory during the transaction. You can tackle this with flush().
Once a JPA session is initiated, its first level cache is restricted to that session; it does not affect other sessions.
The first level cache is a required part of JPA.
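To make the first level cache behaviour concrete, here is a minimal sketch, assuming the Employee entity from the basic configuration project above and a plain EntityManagerFactory; the class and method names are illustrative.

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import com.gkatzioura.hibernate.enitites.Employee;

public class FirstLevelCacheExample {

    // Minimal sketch: the Employee entity and the EntityManagerFactory passed in
    // are assumed from the project above; nothing here is specific to Hazelcast.
    public static void demonstrate(EntityManagerFactory entityManagerFactory) {
        EntityManager em = entityManagerFactory.createEntityManager();
        em.getTransaction().begin();

        Employee first = em.find(Employee.class, 1L);
        Employee second = em.find(Employee.class, 1L);
        // Within the same persistence context both lookups return the same instance,
        // the second one served from the first level cache.
        System.out.println(first == second); // true

        first.setSalary(4000);
        // flush() synchronizes the in-memory change with the database so that
        // queries issued later in the same transaction see it.
        em.flush();

        em.getTransaction().commit();
        em.close();
    }
}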

The second level cache, contrary to the first level cache, is associated with the SessionFactory, thus it is shared across sessions. Commonly used data can be kept in memory and retrieved faster.

Once the second level cache is enabled, Hibernate will cache the entities retrieved in a Hibernate cache region. To do so you have to mark your entities as cacheable. Under the hood, the information that resides in an entity is cached in a dehydrated format.
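As a minimal sketch of what marking an entity as cacheable looks like, assuming a hypothetical User entity and a READ_WRITE concurrency strategy; by default the cache region name is the fully qualified class name.

package com.gkatzioura;

import javax.persistence.Cacheable;
import javax.persistence.Entity;
import javax.persistence.Id;

import org.hibernate.annotations.Cache;
import org.hibernate.annotations.CacheConcurrencyStrategy;

// Hypothetical entity used only to illustrate the annotations; the second level
// cache region for it defaults to "com.gkatzioura.User".
@Entity
@Cacheable
@Cache(usage = CacheConcurrencyStrategy.READ_WRITE)
public class User {

    @Id
    private Long id;

    private String username;
    private String firstname;

    // getters and setters omitted for brevity
}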

Hazelcast can be used with the second level cache in two forms of architecture: client-server or cluster-only.
For a start we will investigate the cluster-only architecture.

Hazelcast creates a separate distributed map for each Hibernate cache region, and therefore for each entity. You can easily configure these regions via the Hazelcast map configuration. Each region name has a corresponding Hazelcast map. For example, if one of our entities is called User and its fully qualified name is ‘com.gkatzioura.User’, then the corresponding Hazelcast map will also be named ‘com.gkatzioura.User’. Since this map is distributed across the Hazelcast nodes, once the entity is retrieved on one node the cached information will be shared with the other nodes. Once an entity gets updated on one node, the cached information will be invalidated on the other nodes.
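Since each region is backed by a plain Hazelcast map, it can be tuned through the usual map configuration; a minimal sketch, assuming the ‘com.gkatzioura.User’ region and illustrative TTL and eviction values:

import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;

public class UserRegionConfig {

    // Sketch only: the region name matches the entity's fully qualified class name,
    // while the TTL and eviction policy values are just examples.
    public static Config regionConfig() {
        Config config = new Config();

        MapConfig userRegion = new MapConfig();
        userRegion.setName("com.gkatzioura.User");
        userRegion.setTimeToLiveSeconds(300);
        userRegion.setEvictionPolicy(EvictionPolicy.LRU);

        config.addMapConfig(userRegion);
        return config;
    }
}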

Hibernate also provides us with a query cache. The query cache caches query results. For example, in the case of the JPQL query

SELECT usr.username,usr.firstname FROM User usr

the cached result would be a map whose key is composed of the query and its parameters, and whose value is the list of results retrieved. In the query above the data retrieved are scalar values and they are stored as they are.
However, there are cases in which a query retrieves entities.
For example

SELECT c FROM Customer c

In such cases, instead of storing all the information retrieved, the entities are cached in the second level cache, whilst the query cache gets an entry whose key is the query and its parameters and whose value is the list of entity ids.
Once the same query is issued again, the query cache will fetch the ids and look up the corresponding entities in the second level cache. If an entity does not exist in the second level cache, a query is issued in order to fetch the missing entity.
When it comes to configuring the second level cache and the query cache, we need to pay attention to their eviction mechanisms. You might stumble upon cases where ids are still cached in the query cache while their corresponding entities have been evicted from the second level cache. In such cases there is a performance hit, since Hibernate will issue a query for each missing entity.
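A minimal sketch of actually using the query cache, assuming it has been enabled globally (hibernate.cache.use_query_cache) and reusing the hypothetical User entity sketched above; the org.hibernate.cacheable hint marks the individual query as cacheable:

import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;

public class CacheableQueryExample {

    // The User entity is the hypothetical cacheable entity from the earlier sketch.
    public static List<User> cachedUsers(EntityManager em) {
        TypedQuery<User> query = em.createQuery("SELECT usr FROM User usr", User.class);
        // Mark this particular query as cacheable; without this hint the query
        // cache is not used even when it is enabled globally.
        query.setHint("org.hibernate.cacheable", Boolean.TRUE);
        return query.getResultList();
    }
}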

Hazelcast supports the query cache, however it is local to the node and never distributed across the Hazelcast cluster.
Although the results fetched by a query remain on the specific node, the entities referenced by the cached query will be retrieved from the distributed map that serves as the second level cache.

This is the theory we need so far. In the next post we will write some Spring Data JPA code and some Hazelcast configuration.

Push Spring Boot Docker images on ECR

In a previous post we integrated a Spring Boot application with EC2.
It is one of the most raw forms of deployment on Amazon Web Services.

In this tutorial we will create a Docker image containing our application, which will be stored in the Amazon EC2 Container Registry.

You need to have the AWS CLI tool installed.

We will keep our Spring application as simple as we can, therefore we will use an example from the official Spring website. The only changes applied will be to the packaging and the application name.

Our application shall be named ecs-deployment

rootProject.name = 'ecs-deployment'

Then we build and run our application

gradle build
gradle bootRun

Now let’s dockerize our application.
First we shall create a Dockerfile that will reside in src/main/docker.

FROM frolvlad/alpine-oraclejdk8
VOLUME /tmp
ADD ecs-deployment-1.0-SNAPSHOT.jar app.jar
RUN sh -c 'touch /app.jar'
ENV JAVA_OPTS=""
ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar" ]

Then we should edit our Gradle file in order to add the Docker plugin dependency, apply the plugin and add an extra Gradle task that creates our Docker image.

buildscript {
    ...
    dependencies {
        ...
        classpath('se.transmode.gradle:gradle-docker:1.2')
    }
}

...
apply plugin: 'docker'


task buildDocker(type: Docker, dependsOn: build) {
    push = false
    applicationName = jar.baseName
    dockerfile = file('src/main/docker/Dockerfile')
}

And we are ready to build our docker image.

./gradlew build buildDocker

You can also run your application from the newly created Docker image.

docker run -p 8080:8080 -t com.gkatzioura.deployment/ecs-deployment:1.0-SNAPSHOT

The first step is to create our ECR repository.

aws ecr create-repository  --repository-name ecs-deployment

Then let us proceed with our Docker registry authentication.

aws ecr get-login

Then run the command given in the output. The login attempt will succeed and you are ready to proceed to push your image.

First tag the image in order to specify the repository that we previously created and then do a docker push.

docker tag {imageid} {aws account id}.dkr.ecr.{aws region}.amazonaws.com/ecs-deployment:1.0-SNAPSHOT
docker push {aws account id}.dkr.ecr.{aws region}.amazonaws.com/ecs-deployment:1.0-SNAPSHOT

And we are done! Our Spring Boot Docker image is now stored in the Amazon EC2 Container Registry.

You can find the source code on GitHub.

Spring Boot and Cache Abstraction with Hazelcast

Previously we got started with Spring Cache abstraction using the default Cache Manager that Spring provides.

Although this approach might suit our needs for simple applications, for more complex problems we need different tools with more capabilities. Hazelcast is one of them: it is hands down a great caching tool when it comes to JVM based applications. By using Hazelcast as a cache, data is evenly distributed among the nodes of a cluster, allowing the available storage to scale horizontally.

We will run our codebase using Spring profiles, thus ‘hazelcast-cache’ will be our profile name.

group 'com.gkatzioura'
version '1.0-SNAPSHOT'


buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

repositories {
    mavenCentral()
}


sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    compile("org.springframework.boot:spring-boot-starter-cache")
    compile("org.springframework.boot:spring-boot-starter")
    compile("com.hazelcast:hazelcast:3.7.4")
    compile("com.hazelcast:hazelcast-spring:3.7.4")

    testCompile("junit:junit")
}

bootRun {
    systemProperty "spring.profiles.active", "hazelcast-cache"
}

As you can see we updated the Gradle file from the previous example and added two extra dependencies, hazelcast and hazelcast-spring. We also changed the profile that our application runs with by default.

Our next step is to configure the Hazelcast cache manager.

package com.gkatzioura.caching.config;

import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

/**
 * Created by gkatzioura on 1/10/17.
 */
@Configuration
@Profile("hazelcast-cache")
public class HazelcastCacheConfig {

    @Bean
    public Config hazelCastConfig() {

        Config config = new Config();
        config.setInstanceName("hazelcast-cache");

        MapConfig allUsersCache = new MapConfig();
        allUsersCache.setTimeToLiveSeconds(20);
        allUsersCache.setEvictionPolicy(EvictionPolicy.LFU);
        config.getMapConfigs().put("alluserscache",allUsersCache);

        MapConfig usercache = new MapConfig();
        usercache.setTimeToLiveSeconds(20);
        usercache.setEvictionPolicy(EvictionPolicy.LFU);
        config.getMapConfigs().put("usercache",usercache);

        return config;
    }

}

We just created two maps with a TTL policy of 20 seconds. Therefore, 20 seconds after an entry is put into a map it will be evicted. For more Hazelcast configuration options please refer to the official Hazelcast documentation.

Another change we have to make is turning UserPayload into a serializable Java object, since objects stored in Hazelcast must be Serializable.

package com.gkatzioura.caching.model;

import java.io.Serializable;

/**
 * Created by gkatzioura on 1/5/17.
 */
public class UserPayload implements Serializable {

    private String userName;
    private String firstName;
    private String lastName;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

Last but not least we add another repository bound to the hazelcast-cache profile.
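A minimal sketch of what this repository could look like, assuming it mirrors the simple-cache repository of the previous post and reuses the two cache names configured above; the class name is illustrative.

package com.gkatzioura.caching.repository;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Repository;

import com.gkatzioura.caching.model.UserPayload;

// Hypothetical sketch: same contract as the simple-cache repository,
// but active only when the hazelcast-cache profile is enabled.
@Repository
@Profile("hazelcast-cache")
public class UserRepositoryHazelcast implements UserRepository {

    @Autowired
    private List<UserPayload> payloadUsers;

    @Override
    @Cacheable("alluserscache")
    public List<UserPayload> fetchAllUsers() {
        return payloadUsers;
    }

    @Override
    @Cacheable(cacheNames = "usercache", key = "#root.methodName")
    public UserPayload firstUser() {
        return payloadUsers.get(0);
    }

    @Override
    @Cacheable(cacheNames = "usercache", key = "{#firstName,#lastName}")
    public UserPayload userByFirstNameAndLastName(String firstName, String lastName) {
        return payloadUsers.stream()
                .filter(p -> p.getFirstName().equals(firstName) && p.getLastName().equals(lastName))
                .findFirst()
                .orElse(null);
    }
}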

The result is our previous Spring Boot application integrated with Hazelcast instead of the default cache, configured with a TTL policy.

You can find the source code on GitHub.

Spring Boot and Cache Abstraction

Caching is a major ingredient of most applications, and as long as we try to avoid disk access it will stay strong.
Spring has great support for caching with a wide range of configurations. You can start as simple as you want and progress to something much more customizable.

This is an example of the simplest form of caching that Spring provides.
Spring comes by default with an in-memory cache which is pretty easy to set up.

Let us start with our gradle file.

group 'com.gkatzioura'
version '1.0-SNAPSHOT'


buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

repositories {
    mavenCentral()
}


sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    compile("org.springframework.boot:spring-boot-starter-cache")
    compile("org.springframework.boot:spring-boot-starter")
    testCompile("junit:junit")
}

bootRun {
    systemProperty "spring.profiles.active", "simple-cache"
}

Since the same project will be used with different cache providers, there are going to be multiple Spring profiles. The Spring profile for this tutorial will be simple-cache, since we are going to use the ConcurrentMap-based cache which happens to be the default.

We will implement an application which will fetch user information from our local file system.
The information shall reside in the users.json file.

[
  {"userName":"user1","firstName":"User1","lastName":"First"},
  {"userName":"user2","firstName":"User2","lastName":"Second"},
  {"userName":"user3","firstName":"User3","lastName":"Third"},
  {"userName":"user4","firstName":"User4","lastName":"Fourth"}
]

Also we will specify a simple model for the data to be retrieved.

package com.gkatzioura.caching.model;

/**
 * Created by gkatzioura on 1/5/17.
 */
public class UserPayload {

    private String userName;
    private String firstName;
    private String lastName;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

Then we will add a bean that will read the information.

package com.gkatzioura.caching.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.gkatzioura.caching.model.UserPayload;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Created by gkatzioura on 1/5/17.
 */
@Configuration
@Profile("simple-cache")
public class SimpleDataConfig {

    @Autowired
    private ObjectMapper objectMapper;

    @Value("classpath:/users.json")
    private Resource usersJsonResource;

    @Bean
    public List<UserPayload> payloadUsers() throws IOException {

        try(InputStream inputStream = usersJsonResource.getInputStream()) {

            UserPayload[] payloadUsers = objectMapper.readValue(inputStream,UserPayload[].class);
            return Collections.unmodifiableList(Arrays.asList(payloadUsers));
        }
    }
}

In order to access the information we will use the instantiated bean containing all the user information.

The next step is to create a repository interface specifying the methods that will be used.

package com.gkatzioura.caching.repository;

import com.gkatzioura.caching.model.UserPayload;

import java.util.List;

/**
 * Created by gkatzioura on 1/6/17.
 */
public interface UserRepository {

    List<UserPayload> fetchAllUsers();

    UserPayload firstUser();

    UserPayload userByFirstNameAndLastName(String firstName,String lastName);

}

Now let’s dive into the implementation which will contain the cache annotations needed.

package com.gkatzioura.caching.repository;

import com.gkatzioura.caching.model.UserPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.Optional;

/**
 * Created by gkatzioura on 12/30/16.
 */
@Repository
@Profile("simple-cache")
public class UserRepositoryLocal implements UserRepository {

    @Autowired
    private List<UserPayload> payloadUsers;

    private static final Logger LOGGER = LoggerFactory.getLogger(UserRepositoryLocal.class);

    @Override
    @Cacheable("alluserscache")
    public List<UserPayload> fetchAllUsers() {

        LOGGER.info("Fetching all users");

        return payloadUsers;
    }

    @Override
    @Cacheable(cacheNames = "usercache",key = "#root.methodName")
    public UserPayload firstUser() {

        LOGGER.info("fetching firstUser");

        return payloadUsers.get(0);
    }

    @Override
    @Cacheable(cacheNames = "usercache",key = "{#firstName,#lastName}")
    public UserPayload userByFirstNameAndLastName(String firstName,String lastName) {

        LOGGER.info("fetching user by firstname and lastname");

        Optional<UserPayload> user = payloadUsers.stream().filter(
                p-> p.getFirstName().equals(firstName)
                &&p.getLastName().equals(lastName))
                .findFirst();

        if(user.isPresent()) {
            return user.get();
        } else {
            return null;
        }
    }

}

Methods annotated with @Cacheable trigger cache population, contrary to methods annotated with @CacheEvict which trigger cache eviction.
With @Cacheable, instead of just specifying the cache where our values will be stored, we can also specify keys based on the method name or the method arguments. Thus we achieve method caching.
For example, the method firstUser uses the method name as its key, whilst the method userByFirstNameAndLastName builds its key from the method arguments.

Two methods with the @CacheEvict annotation will empty the caches specified.

LocalCacheEvict will be the component that handles the eviction.

package com.gkatzioura.caching.repository;

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

/**
 * Created by gkatzioura on 1/7/17.
 */
@Component
@Profile("simple-cache")
public class LocalCacheEvict {

    @CacheEvict(cacheNames = "alluserscache",allEntries = true)
    public void evictAllUsersCache() {

    }

    @CacheEvict(cacheNames = "usercache",allEntries = true)
    public void evictUserCache() {

    }

}

Since we use a very simple form of cache, TTL eviction is not supported. Therefore we will add a scheduler, for this particular case only, which will evict the caches after a certain period of time.

package com.gkatzioura.caching.scheduler;

import com.gkatzioura.caching.repository.LocalCacheEvict;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Created by gkatzioura on 1/7/17.
 */
@Component
@Profile("simple-cache")
public class EvictScheduler {

    @Autowired
    private LocalCacheEvict localCacheEvict;

    private static final Logger LOGGER = LoggerFactory.getLogger(EvictScheduler.class);

    @Scheduled(fixedDelay=10000)
    public void clearCaches() {

        LOGGER.info("Invalidating caches");

        localCacheEvict.evictUserCache();
        localCacheEvict.evictAllUsersCache();
    }


}

To wrap up we will use a controller to call the methods specified

package com.gkatzioura.caching.controller;

import com.gkatzioura.caching.model.UserPayload;
import com.gkatzioura.caching.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * Created by gkatzioura on 12/30/16.
 */
@RestController
public class UsersController {

    @Autowired
    private UserRepository userRepository;

    @RequestMapping(path = "/users/all",method = RequestMethod.GET)
    public List<UserPayload> fetchUsers() {

        return userRepository.fetchAllUsers();
    }

    @RequestMapping(path = "/users/first",method = RequestMethod.GET)
    public UserPayload fetchFirst() {
        return userRepository.firstUser();
    }

    @RequestMapping(path = "/users/",method = RequestMethod.GET)
    public UserPayload findByFirstNameLastName(String firstName,String lastName ) {

        return userRepository.userByFirstNameAndLastName(firstName,lastName);
    }

}

Last but not least, our Application class should contain two extra annotations: @EnableScheduling is needed in order to enable scheduling and @EnableCaching in order to enable caching.

package com.gkatzioura.caching;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Created by gkatzioura on 12/30/16.
 */
@SpringBootApplication
@EnableScheduling
@EnableCaching
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }

}

You can find the source code on GitHub.