Query DynamoDB Items with DynamoDBMapper

In a previous post we issued queries on a DynamoDB database using the low-level Java API.

Querying using the DynamoDBMapper is pretty easy.

Issuing a query using a hash key is as simple as it gets. The best candidate for a query like this is the Users table, searched by the email hash key.

    public User getUser(String email) {

        User user = dynamoDBMapper.load(User.class,email);
        return user;
    }

Since the Users table uses only a hash key, the result is limited to one item.

The load function can also be used for composite keys. Therefore querying for a Logins Table Item would require a hash key and a range key.

    public Login getLogin(String email,Long date) {

        Login login =  dynamoDBMapper.load(Login.class,email,date);
        return login;
    }

Next step is to issue more complex queries using conditions. We will issue a query that will fetch the login attempts between two dates.


    public List<Login> queryLoginsBetween(String email, Long from, Long to) {

        Map<String,String> expressionAttributesNames = new HashMap<>();
        expressionAttributesNames.put("#email","email");
        expressionAttributesNames.put("#timestamp","timestamp");

        Map<String,AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":emailValue",new AttributeValue().withS(email));
        expressionAttributeValues.put(":from",new AttributeValue().withN(Long.toString(from)));
        expressionAttributeValues.put(":to",new AttributeValue().withN(Long.toString(to)));

        DynamoDBQueryExpression<Login> queryExpression = new DynamoDBQueryExpression<Login>()
                .withKeyConditionExpression("#email = :emailValue and #timestamp BETWEEN :from AND :to ")
                .withExpressionAttributeNames(expressionAttributesNames)
                .withExpressionAttributeValues(expressionAttributeValues);

        return dynamoDBMapper.query(Login.class,queryExpression);
    }

We use DynamoDBQueryExpression in the same manner that we used it in the low-level API.
The main difference is that we do not have to handle paging at all. DynamoDBMapper maps the DynamoDB items to objects and returns a “lazy-loaded” collection: it initially returns only one page of results and makes a service call for the next page when needed.

Last but not least, querying on indexes is one of the basic actions. The routine is the same for local and global secondary indexes.
Keep in mind that the attributes fetched depend on the projection type we specified when creating the table. In our case the projection type includes all attributes.

    public Supervisor getSupervisor(String company,String factory) {

        Map<String,String> expressionAttributesNames = new HashMap<>();
        expressionAttributesNames.put("#company","company");
        expressionAttributesNames.put("#factory","factory");

        Map<String,AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":company",new AttributeValue().withS(company));
        expressionAttributeValues.put(":factory",new AttributeValue().withS(factory));

        DynamoDBQueryExpression<Supervisor> dynamoDBQueryExpression = new DynamoDBQueryExpression<Supervisor>()
                .withIndexName("FactoryIndex")
                .withKeyConditionExpression("#company = :company and #factory = :factory ")
                .withExpressionAttributeNames(expressionAttributesNames)
                .withExpressionAttributeValues(expressionAttributeValues)
                .withConsistentRead(false);

        List<Supervisor> supervisor = dynamoDBMapper.query(Supervisor.class,dynamoDBQueryExpression);

        if(supervisor.size()>0) {
            return supervisor.get(0);
        } else {
            return null;
        }
    }

Pay extra attention to the fact that consistent read is set to false. DynamoDBQueryExpression uses consistent reads by default, and a global secondary index does not support consistent reads.

You can find the full source code with unit tests on GitHub.

Spring boot with Spring Security and NoSQL

In the previous post we set up a Spring Security configuration by providing custom queries for user and authority retrieval from an SQL database.

Nowadays many modern applications utilize NoSQL databases. Spring Security does not come with an out-of-the-box solution for NoSQL databases.

In those cases we need to provide a solution by implementing a custom UserDetailsService.

We will use a MongoDB database for this example.
I will use a Docker image; however, it is just as easy to set up a MongoDB database by downloading it from the official website.

These are some commands to get started with Docker and MongoDB (feel free to ignore them if you don’t use Docker).

#pull the mongo image
docker pull mongo
#create a mongo container
docker run --name some-mongo -d mongo
#get the docker container id
docker ps
#get the containers ip
docker inspect --format '{{ .NetworkSettings.IPAddress }}' $CID
#connection using the ip retrieved
mongo $mongodb_container_ip

Then we will write a simple initialization script called createuser.js. The script creates a document containing user information such as username, password and authorities.

use springsecurity
db.users.insert({"name":"John","surname":"doe","email":"john@doe.com","password":"cleartextpass","authorities":["user","admin"]})

We will use mongo cli to execute it.

mongo 172.17.0.2:27017 < createuser.js

In order to use spring security with mongodb we need to retrieve the user information from the users collection.

First step is to add the mongodb dependencies to our gradle file, including the mongodb driver. Note that we will use a profile called ‘customuserdetails’.

group 'com.gkatzioura'
version '1.0-SNAPSHOT'

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'spring-boot'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    compile("org.thymeleaf:thymeleaf-spring4")
    compile("org.springframework.boot:spring-boot-starter-security")
    // The com.mongodb.client classes used below require a 3.x driver
    compile("org.mongodb:mongo-java-driver:3.3.0")
    compile("org.slf4j:slf4j-api:1.6.6")
    compile("ch.qos.logback:logback-core:1.1.7")
    compile("ch.qos.logback:logback-classic:1.1.7")
    testCompile "junit:junit:4.11"
}

bootRun {
    systemProperty "spring.profiles.active", "customuserdetails"
}

Then we shall create a mongodb connection bean.

package com.gkatzioura.spring.security.config;

import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

/**
 * Created by gkatzioura on 9/27/16.
 */
@Configuration
@Profile("customuserdetails")
public class MongoConfiguration {

    @Bean
    public MongoClient createConnection() {

        //You should put your mongo ip here
        return new MongoClient("172.17.0.2:27017");
    }
}

Then we will create a custom user details object.

package com.gkatzioura.spring.security.model;

import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.AuthorityUtils;
import org.springframework.security.core.userdetails.UserDetails;

import java.util.Collection;
import java.util.List;

/**
 * Created by gkatzioura on 9/27/16.
 */
public class MongoUserDetails  implements UserDetails{

    private String username;
    private String password;
    private List<GrantedAuthority> grantedAuthorities;
    
    public MongoUserDetails(String username,String password,String[] authorities) {
        this.username = username;
        this.password = password;
        this.grantedAuthorities = AuthorityUtils.createAuthorityList(authorities);
    }
    
    @Override
    public Collection<? extends GrantedAuthority> getAuthorities() {
        return grantedAuthorities;
    }

    @Override
    public String getPassword() {
        return password;
    }

    @Override
    public String getUsername() {
        return username;
    }

    @Override
    public boolean isAccountNonExpired() {
        return true;
    }

    @Override
    public boolean isAccountNonLocked() {
        return true;
    }

    @Override
    public boolean isCredentialsNonExpired() {
        return true;
    }

    @Override
    public boolean isEnabled() {
        return true;
    }
}

As a next step, we will add a custom UserDetailsService that retrieves user details from the MongoDB database.

package com.gkatzioura.spring.security.service;

import com.gkatzioura.spring.security.model.MongoUserDetails;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Created by gkatzioura on 9/27/16.
 */
public class CustomerUserDetailsService implements UserDetailsService {

    @Autowired
    private MongoClient mongoClient;

    @Override
    public UserDetails loadUserByUsername(String email) throws UsernameNotFoundException {

        MongoDatabase database = mongoClient.getDatabase("springsecurity");
        MongoCollection<Document> collection = database.getCollection("users");

        Document document = collection.find(Filters.eq("email",email)).first();

        if(document!=null) {

            String name = document.getString("name");
            String surname = document.getString("surname");
            String password = document.getString("password");
            List<String> authorities = (List<String>) document.get("authorities");

            MongoUserDetails mongoUserDetails = new MongoUserDetails(email,password,authorities.toArray(new String[authorities.size()]));

            return mongoUserDetails;
        } else {

           throw new UsernameNotFoundException("username not found");
        }
    }

}

Final step is to provide a spring security configuration using the custom UserDetailsService we implemented previously.

package com.gkatzioura.spring.security.config;

import com.gkatzioura.spring.security.service.CustomerUserDetailsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.core.userdetails.UserDetailsService;

/**
 * Created by gkatzioura on 9/27/16.
 */
@EnableWebSecurity
@Profile("customuserdetails")
public class CustomUserDetailsSecurityConfig extends WebSecurityConfigurerAdapter {

    @Bean
    public UserDetailsService mongoUserDetails() {
        return new CustomerUserDetailsService();
    }

    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {

        UserDetailsService userDetailsService = mongoUserDetails();
        auth.userDetailsService(userDetailsService);
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {

        http.authorizeRequests()
                .antMatchers("/public").permitAll()
                .anyRequest().authenticated()
                .and()
                .formLogin()
                .permitAll()
                .and()
                .logout()
                .permitAll();
    }

}

To run the application issue

gradle bootRun

You can find the source code on GitHub.

Testing Amazon Web Services Codebase: DynamoDB and S3

When switching to an Amazon Web Services infrastructure, one of the main challenges is testing.

Components such as DynamoDB and S3 come in handy; however, they come with a cost.
When it comes to continuous integration, you will end up spending resources if you use the actual Amazon components.

Some of these components have clones that are capable of running locally.

You can use DynamoDB locally.

By issuing

java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb

you will have a local DynamoDB instance up and running.

Also on http://localhost:8000/shell you have a DynamoDB Shell (based on javascript) which will help you to get started.

In order to connect to the local instance you need to set the endpoint on your DynamoDB client.

On Java

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
client.setEndpoint("http://localhost:8000"); 

On Node.js

var AWS = require('aws-sdk');
var config = {"endpoint":"http://localhost:8000"};
var client = new AWS.DynamoDB(config);

Another base component of Amazon Web Services is the Simple Storage Service (S3).

Luckily we have fake-s3, a lightweight server clone of Amazon S3.

Installing and running fake-s3 is pretty simple

gem install fakes3
fakes3 -r /mnt/fakes3_root -p 4567

In order to connect you have to specify the endpoint

On Java

AmazonS3 client = new AmazonS3Client();
client.setEndpoint("http://localhost:4567");

On Node.js

var AWS = require('aws-sdk');
var config = {"endpoint":"http://localhost:4567"};
var client = new AWS.S3(config);

These tools will come in handy during the development phase, especially when you are getting started and want a simple example. By running them locally you avoid the overhead of permissions and configuration that comes with each component you set up on Amazon.

Use Map Reduce for Tf-Idf ranking on a Node.js and MongoDB environment

When developing a document search application, one of the challenges is to order your results according to the occurrence of the term that you search for. Tf-Idf is a numerical statistic that assists you in weighing the results of your search.
Tf stands for term frequency.
Idf stands for inverse document frequency.
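There are several variants of tf-idf. One common formulation, written out here for illustration, is the following. The symbols t (the search term), d (a single document, seen as a list of words) and D (the collection of documents) are notation introduced for this sketch, and "matches" can mean equality or, more loosely, substring containment.

```latex
\mathrm{tf}(t, d) = \frac{\lvert \{\, w \in d : w \text{ matches } t \,\} \rvert}{\lvert d \rvert}
\qquad
\mathrm{idf}(t, D) = \ln \frac{\lvert D \rvert}{\lvert \{\, d \in D : d \text{ contains a match for } t \,\} \rvert}
\qquad
\mathrm{tfidf}(t, d, D) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t, D)
```

Note that the idf is undefined when no document contains the term, since the denominator becomes zero.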

To get a grasp we will develop a sample of tf-idf in javascript, as a node module.

function TfIdf() {
}

TfIdf.prototype.weights = function(documents,term) {
    
    var results = []
    
    var idf = this.idf(documents,term)
    
    for(var i=0;i<documents.length;i++) {
        
        var tf = this.tf(documents[i],term)
        var tfidf = tf*idf
        var result = {weight:tfidf,doc:documents[i]}    
        
        results.push(result)
    }

    return results
}

TfIdf.prototype.tf = function(words,term) {

    var result = 0
    
    for(var i=0;i<words.length;i++) {

        var word = words[i]

        if(word.indexOf(term)!=-1) {
            result = result+1
        }    
    }

    return result/words.length
}

TfIdf.prototype.idf = function(documents,term) {
   
    var occurence = 0

    for(var j=0;j<documents.length;j++) {
        
        var doc = documents[j]
        
        if(this.__wordInsideDoc(doc,term)){
            occurence = occurence+1
        }                  
    }

    if(occurence==0) {
        return undefined    
    }

    return Math.log(documents.length/occurence)
}

TfIdf.prototype.__wordInsideDoc = function(doc,term) {
    
    for(var i=0;i<doc.length;i++) {

        var word = doc[i]

        if(word.indexOf(term)!=-1) {
            return true
        }
    }    

    return false
}

module.exports = TfIdf

The weights function accepts the documents and the term to search for.

An example follows.

var TfIdf = require('./TfIdf')

var tfIdf = new TfIdf()

var docs = [["latest","sprint"],["lair","laugh","fault"],["lemma","on"]]

console.log(tfIdf.weights(docs,"la"))

The result is

[ { weight: 0.2027325540540822, doc: [ 'latest', 'sprint' ] },
  { weight: 0.27031007207210955,
    doc: [ 'lair', 'laugh', 'fault' ] },
  { weight: 0, doc: [ 'lemma', 'on' ] } ]
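As a sanity check, the numbers above can be recomputed by hand. The snippet below is only a sketch that redoes the arithmetic without the module; the helper name matches is made up for illustration. Two of the three documents contain a word with "la" in it, so the idf is ln(3/2), roughly 0.405, and the weights come out at roughly 0.203, 0.270 and 0.

```javascript
// Documents and term taken from the example above
var docs = [["latest","sprint"],["lair","laugh","fault"],["lemma","on"]]
var term = "la"

// A word matches when it contains the term as a substring
function matches(word) {
    return word.indexOf(term) != -1
}

// idf: natural log of (number of documents / documents containing the term)
var docsWithTerm = docs.filter(function(doc) { return doc.some(matches) }).length
var idf = Math.log(docs.length / docsWithTerm)

// tf: matching words divided by the number of words in the document
var weights = docs.map(function(doc) {
    return (doc.filter(matches).length / doc.length) * idf
})

console.log(idf)
console.log(weights)
```

Note that "fault" does not count as a match, because it does not contain the substring "la".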

Now we shall proceed with the map reduce approach.

I will use node.js

First we will install the mongodb driver

npm install mongodb

Then we will set up our mongo database connection. Once initialized, in case there are no records, we will populate the database for testing purposes.

var MongoClient = require('mongodb').MongoClient
var Server = require('mongodb').Server

var url = 'mongodb://localhost:27017/mapreduceexample'

function TfIdfMongo() {
}

TfIdfMongo.prototype.__getConnection = function(callback) {

    var tfIdfMongo = this

    MongoClient.connect(url,function (err, connection) {
        if (err) {
            callback(err)
        } else {

            var documents = connection.collection('documents');

            documents.count({}, function (error, numOfDocs) {
                if (numOfDocs == 0) {
                    tfIdfMongo.__insertTestRecords(connection,function(err) {
                        callback(err,connection)
                    })
                } else {
                    callback(undefined,connection)
                }
            })
        }
    })
}

TfIdfMongo.prototype.__insertTestRecords = function(connection,callback) {

    var documents = connection.collection('documents');

    var latestDocuments = [
        {words:["latest","sprint"]},
        {words:["lair","laugh","fault"]},
        {words:["lemma","on"]}
    ]

    documents.insert(latestDocuments,
        function(err,result) {
            callback(err)
        })
}

This is going to be a two-phase process.
In the first phase we have to calculate the idf.
To do so we will issue a map reduce.
The term variable has to be passed in order to be used by the map reduce process.
In order to use a dynamic variable in map reduce we will employ the scope parameter.

TfIdfMongo.prototype.__idf = function(connection,term,callback) {

    var tfIdfMongo = this

    var documents = connection.collection('documents');

    documents.mapReduce(
        tfIdfMongo.__mapIdf,
        tfIdfMongo.__reduceIdf,
        {
            scope: {permterm:term},
            out: "tfidf_results"
        },
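        function(err,results) {

            if(err) {
                return callback(err)
            }

            results.findOne({},function(err,result) {

                if(err) {
                    return callback(err)
                }

                if(result.value.occurrence==0) {
                    // No document contains the term, so the idf is undefined;
                    // report it instead of leaving the caller waiting
                    return callback(new Error("term not found in any document"))
                }

                var idf = Math.log(result.value.count/result.value.occurrence)

                callback(undefined,idf)
            })
        }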
    )
}

TfIdfMongo.prototype.__mapIdf = function() {

    var term = permterm

    var occurrence = 0

    for (var i = 0; i < this.words.length; i++) {

        var word = this.words[i]

        if (word.indexOf(term) != -1) {

            // Count each document at most once
            occurrence = 1
            break
        }
    }

    // Emit the same shape that reduce returns, so that a re-reduce stays correct
    emit("idf", {count:1,occurrence:occurrence})
}

TfIdfMongo.prototype.__reduceIdf = function(key,values) {

    var result = {count:0,occurrence:0}

    for(var i=0;i<values.length;i++) {

        result.count += values[i].count
        result.occurrence += values[i].occurrence
    }

    return result
}

The result is a single number: the idf.
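To see what the first phase computes without a running database, here is a minimal in-memory sketch. The helper runMapReduce and the function idfByMapReduce are hypothetical names made up for this illustration and are not part of the MongoDB driver. Two simplifications are assumed: emit is handed to the map function as an argument instead of being a global, and a closure over term stands in for the scope parameter. The sketch emits a {count, occurrence} pair per document so that the reduce output has the same shape as its input values.

```javascript
// In-memory stand-in for a map reduce run (hypothetical helper, not the MongoDB API).
// Unlike MongoDB, emit is handed to map as an argument instead of being a global.
function runMapReduce(docs, map, reduce) {
    var groups = {}
    docs.forEach(function(doc) {
        map.call(doc, function emit(key, value) {
            if (!groups[key]) {
                groups[key] = []
            }
            groups[key].push(value)
        })
    })
    var results = {}
    Object.keys(groups).forEach(function(key) {
        results[key] = reduce(key, groups[key])
    })
    return results
}

// The closure over term plays the role of the scope parameter
function idfByMapReduce(docs, term) {
    var results = runMapReduce(docs,
        function(emit) {
            var found = this.words.some(function(word) {
                return word.indexOf(term) != -1
            })
            // One record per document: it counts once, and occurs 0 or 1 times
            emit("idf", {count: 1, occurrence: found ? 1 : 0})
        },
        function(key, values) {
            // Summing keeps the output in the same shape as the input values
            var total = {count: 0, occurrence: 0}
            values.forEach(function(value) {
                total.count += value.count
                total.occurrence += value.occurrence
            })
            return total
        })
    var total = results["idf"]
    return Math.log(total.count / total.occurrence)
}

var docs = [
    {words:["latest","sprint"]},
    {words:["lair","laugh","fault"]},
    {words:["lemma","on"]}
]

console.log(idfByMapReduce(docs, "la"))
```

Because every document emits under the same key, the reduce step ends up with one total for the whole collection, which is all the idf needs.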

In the second phase we have to calculate the tf for each document and multiply it by the idf value calculated previously.
Map reduce will be used for this case too.
This time, through the scope parameter, we are going to pass not only the term that we search for but also the idf variable.

TfIdfMongo.prototype.__tf = function(connection,term,idf,callback) {

    var tfIdfMongo = this

    var documents = connection.collection('documents');

    documents.mapReduce(
        tfIdfMongo.__mapTf,
        function(key,values) {

            return values
        },
        {
            scope: {permTerm:term,permIdf:idf},
            out: "tf_results"
        },
        function(err,results) {

            if(err) {
                return callback(err)
            }

            results.find({},function(err,docs) {

                if(err) {
                    return callback(err)
                }

                docs.toArray(function (err,documents) {
                    callback(err,documents)
                })
            })
        }
    )
}

TfIdfMongo.prototype.__mapTf = function() {

    var term = permTerm
    var idf = permIdf

    var occurrence = 0

    for(var i=0;i<this.words.length;i++) {

        var word = this.words[i]
        if (word.indexOf(term) != -1) {

            occurrence += 1
        }
    }

    var weight = idf*(occurrence/this.words.length)

    emit(this, weight)
}
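A detail worth spelling out: the map function uses the whole document as the key, so every key receives exactly one value. MongoDB does not call the reduce function for a key that has only a single value, which is why a pass-through reducer is enough in this phase. The sketch below imitates that behaviour in memory; runMapReduce is a hypothetical helper written for this illustration, not the MongoDB API, and it receives emit as an argument and the term and idf through a closure rather than through scope.

```javascript
// In-memory stand-in for a map reduce run (hypothetical helper, not the MongoDB API)
function runMapReduce(docs, map, reduce) {
    var groups = []
    docs.forEach(function(doc) {
        map.call(doc, function emit(key, value) {
            // Keys are compared by their JSON form, since they are whole documents here
            var id = JSON.stringify(key)
            var group = groups.filter(function(g) { return g.id == id })[0]
            if (!group) {
                group = {id: id, key: key, values: []}
                groups.push(group)
            }
            group.values.push(value)
        })
    })
    return groups.map(function(group) {
        // reduce is skipped when only one value was emitted for the key
        var value = group.values.length > 1
            ? reduce(group.key, group.values)
            : group.values[0]
        return {_id: group.key, value: value}
    })
}

var docs = [
    {words:["latest","sprint"]},
    {words:["lair","laugh","fault"]},
    {words:["lemma","on"]}
]
var term = "la"
var idf = Math.log(3/2)
var reduceCalls = 0

var results = runMapReduce(docs,
    function(emit) {
        var occurrence = this.words.filter(function(word) {
            return word.indexOf(term) != -1
        }).length
        emit(this, idf * (occurrence / this.words.length))
    },
    function(key, values) {
        reduceCalls += 1
        return values
    })

console.log(results)
console.log("reduce calls: " + reduceCalls)
```

If two documents had identical content, their keys would collide in this sketch and the reducer would be invoked, so the pass-through reducer relies on every document being unique.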

We will implement the tfIdf function which combines the two previous steps.
The function takes the term that we need to search for as an argument.

var MongoClient = require('mongodb').MongoClient
var Server = require('mongodb').Server

var url = 'mongodb://localhost:27017/mapreduceexample'

function TfIdfMongo() {
}

TfIdfMongo.prototype.tfIdf = function(term,callback) {

    var tfIdfMongo = this

    tfIdfMongo.__getConnection(function(err,connection) {

        if(err) {
            return callback(err)
        }

        tfIdfMongo.__idf(connection,term,function(err,idf) {

            if(err) {
                connection.close()
                return callback(err)
            }

            tfIdfMongo.__tf(connection,term,idf,function(err,documents) {

                connection.close()

                if(err) {
                    return callback(err)
                }

                callback(undefined,documents)

            })

        })
    })
}

TfIdfMongo.prototype.__getConnection = function(callback) {

    var tfIdfMongo = this

    MongoClient.connect(url,function (err, connection) {
        if (err) {
            callback(err)
        } else {

            var documents = connection.collection('documents');

            documents.count({}, function (error, numOfDocs) {
                if (numOfDocs == 0) {
                    tfIdfMongo.__insertTestRecords(connection,function(err) {
                        callback(err,connection)
                    })
                } else {
                    callback(undefined,connection)
                }
            })
        }
    })
}

TfIdfMongo.prototype.__insertTestRecords = function(connection,callback) {

    var documents = connection.collection('documents');

    var latestDocuments = [
        {words:["latest","sprint"]},
        {words:["lair","laugh","fault"]},
        {words:["lemma","on"]}
    ]

    documents.insert(latestDocuments,
        function(err,result) {
            callback(err)
        })

}

TfIdfMongo.prototype.__tf = function(connection,term,idf,callback) {

    var tfIdfMongo = this

    var documents = connection.collection('documents');

    documents.mapReduce(
        tfIdfMongo.__mapTf,
        function(key,values) {

            return values
        },
        {
            scope: {permTerm:term,permIdf:idf},
            out: "tf_results"
        },
        function(err,results) {

            if(err) {
                return callback(err)
            }

            results.find({},function(err,docs) {

                if(err) {
                    return callback(err)
                }

                docs.toArray(function (err,documents) {
                    callback(err,documents)
                })
            })
        }
    )
}

TfIdfMongo.prototype.__mapTf = function() {

    var term = permTerm
    var idf = permIdf

    var occurrence = 0

    for(var i=0;i<this.words.length;i++) {

        var word = this.words[i]
        if (word.indexOf(term) != -1) {

            occurrence += 1
        }
    }

    var weight = idf*(occurrence/this.words.length)

    emit(this, weight)
}


TfIdfMongo.prototype.__idf = function(connection,term,callback) {

    var tfIdfMongo = this

    var documents = connection.collection('documents');

    documents.mapReduce(
        tfIdfMongo.__mapIdf,
        tfIdfMongo.__reduceIdf,
        {
            scope: {permterm:term},
            out: "tfidf_results"
        },
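        function(err,results) {

            if(err) {
                return callback(err)
            }

            results.findOne({},function(err,result) {

                if(err) {
                    return callback(err)
                }

                if(result.value.occurrence==0) {
                    // No document contains the term, so the idf is undefined;
                    // report it instead of leaving the caller waiting
                    return callback(new Error("term not found in any document"))
                }

                var idf = Math.log(result.value.count/result.value.occurrence)

                callback(undefined,idf)
            })
        }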
    )
}

TfIdfMongo.prototype.__mapIdf = function() {

    var term = permterm

    var occurrence = 0

    for (var i = 0; i < this.words.length; i++) {

        var word = this.words[i]

        if (word.indexOf(term) != -1) {

            // Count each document at most once
            occurrence = 1
            break
        }
    }

    // Emit the same shape that reduce returns, so that a re-reduce stays correct
    emit("idf", {count:1,occurrence:occurrence})
}

TfIdfMongo.prototype.__reduceIdf = function(key,values) {

    var result = {count:0,occurrence:0}

    for(var i=0;i<values.length;i++) {

        result.count += values[i].count
        result.occurrence += values[i].occurrence
    }

    return result
}



module.exports = TfIdfMongo

Our test showcase

var TfIdf = require('./TfIdf')
var TfIdfMongo = require('./TfIdfMongo')

var tfIdf = new TfIdf()

var docs = [["latest","sprint"],["lair","laugh","fault"],["lemma","on"]]


console.log("The results are "+JSON.stringify(tfIdf.weights(docs,"la")))

var tfIdfMongo = new TfIdfMongo()

tfIdfMongo.tfIdf("la",function(err,results) {


    console.log("The results are "+JSON.stringify(results))

})

And we get the same results for both cases.

The results are [{"weight":0.2027325540540822,"doc":["latest","sprint"]},{"weight":0.27031007207210955,"doc":["lair","laugh","fault"]},{"weight":0,"doc":["lemma","on"]}]
The results are [{"_id":{"_id":"55f46602947446bb1a7f7933","words":["latest","sprint"]},"value":0.2027325540540822},{"_id":{"_id":"55f46602947446bb1a7f7934","words":["lair","laugh","fault"]},"value":0.27031007207210955},{"_id":{"_id":"55f46602947446bb1a7f7935","words":["lemma","on"]},"value":0}]

Why should I use map reduce for this problem?

The tf-idf ranking problem involves computations that can be parallelised.
The sequential approach could be an option in other environments, but for Node.js it has many drawbacks.
Node.js is a single-threaded environment; it was not designed for heavy computational tasks.
Its strength lies in how well it handles I/O operations.
Consider the scenario of a large data set.
While the Node.js process is busy executing the time-consuming computations, incoming requests cannot be served in a timely manner.
However, there are some workarounds for solutions based on Node.js, such as spawning extra nodes and implementing a way of communication between them.

To sum up

Map reduce fits the ranking problem well. Not only does it take away much of the computational overhead, it also reduces the implementation overhead.