Read replicas and Spring Data Part 4: Configuring the read repository

Previously we set up two EntityManagers in the same application: one for the reads and one for the writes. Now it’s time to create our read repository.

The read-only repository will use the secondary, read-only EntityManager.

To make it a read-only repository, it is essential that it exposes no save or persist operations. Spring Data’s Repository marker interface helps here: it declares no methods at all, so only the methods we define will be available.

package com.gkatzioura.springdatareadreplica.repository;

import java.util.List;

import org.springframework.data.repository.Repository;

import com.gkatzioura.springdatareadreplica.config.ReadOnlyRepository;
import com.gkatzioura.springdatareadreplica.entity.Employee;

/**
 * This is a read only repository
 */
public interface ReadEmployeeRepository extends Repository<Employee, Long> {

    List<Employee> findAll();

}

Our next task is to wire this repository to the read-database entity manager.
This means that all repositories shall be created using the default entity manager, except for the read-only repositories.

I will create an annotation first. This annotation will declare a repository as read-only, and it will also be used during repository scanning so that the appropriate EntityManager is picked.

package com.gkatzioura.springdatareadreplica.config;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
public @interface ReadOnlyRepository {
}

Normally Spring Boot detects repository interfaces and creates them automatically without any extra annotations; our case, however, is a peculiar one.

With this adjustment our read-only repository will look like this

package com.gkatzioura.springdatareadreplica.repository;

import java.util.List;

import org.springframework.data.repository.Repository;

import com.gkatzioura.springdatareadreplica.config.ReadOnlyRepository;
import com.gkatzioura.springdatareadreplica.entity.Employee;

/**
 * This is a read only repository
 */
@ReadOnlyRepository
public interface ReadEmployeeRepository extends Repository<Employee, Long> {

    List<Employee> findAll();

}

And now it’s time to work on our repository scanning. All the repositories will be injected with the main EntityManager, except for the ones annotated with @ReadOnlyRepository.

package com.gkatzioura.springdatareadreplica.config;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

@Configuration
@EnableJpaRepositories(
        basePackages = "com.gkatzioura",
        excludeFilters = @ComponentScan.Filter(ReadOnlyRepository.class),
        entityManagerFactoryRef = "entityManagerFactory"
)
public class PrimaryEntityManagerConfiguration {

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.url}")
    private String url;

    @Bean
    @Primary
    public DataSource dataSource() throws Exception {
        return DataSourceBuilder.create()
                                .url(url)
                                .username(username)
                                .password(password)
                                .driverClassName("org.postgresql.Driver")
                                .build();
    }

    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(
            EntityManagerFactoryBuilder builder,
            @Qualifier("dataSource") DataSource dataSource) {
        return builder.dataSource(dataSource)
                      .packages("com.gkatzioura.springdatareadreplica")
                      .persistenceUnit("main")
                      .build();
    }

}

We will also add the configuration for the read-only repositories.

package com.gkatzioura.springdatareadreplica.config;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

@Configuration
@EnableJpaRepositories(
        basePackages = "com.gkatzioura",
        includeFilters= @ComponentScan.Filter(ReadOnlyRepository.class),
        entityManagerFactoryRef = "readEntityManagerFactory"
)
public class ReadOnlyEntityManagerConfiguration {

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.readUrl}")
    private String readUrl;

    @Bean
    public DataSource readDataSource() throws Exception {
        return DataSourceBuilder.create()
                                .url(readUrl)
                                .username(username)
                                .password(password)
                                .driverClassName("org.postgresql.Driver")
                                .build();
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean readEntityManagerFactory(
            EntityManagerFactoryBuilder builder,
            @Qualifier("readDataSource") DataSource dataSource) {
        return builder.dataSource(dataSource)
                      .packages("com.gkatzioura.springdatareadreplica")
                      .persistenceUnit("read")
                      .build();
    }

}

The secondary entity manager will be injected only into the repositories annotated with @ReadOnlyRepository.

To demonstrate this, let’s make some changes to our controller.

package com.gkatzioura.springdatareadreplica.controller;

import java.util.List;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.gkatzioura.springdatareadreplica.entity.Employee;
import com.gkatzioura.springdatareadreplica.repository.EmployeeRepository;
import com.gkatzioura.springdatareadreplica.repository.ReadEmployeeRepository;

@RestController
public class EmployeeContoller {

    private final EmployeeRepository employeeRepository;
    private final ReadEmployeeRepository readEmployeeRepository;

    public EmployeeContoller(EmployeeRepository employeeRepository,
                             ReadEmployeeRepository readEmployeeRepository) {
        this.employeeRepository = employeeRepository;
        this.readEmployeeRepository = readEmployeeRepository;
    }

    @GetMapping("/employee")
    public List<Employee> getEmployees() {
        return employeeRepository.findAll();
    }

    @GetMapping("/employee/read")
    public List<Employee> getEmployeesRead() {
        return readEmployeeRepository.findAll();
    }

    @PostMapping("/employee")
    @ResponseStatus(HttpStatus.CREATED)
    public void addEmployee(@RequestBody Employee employee) {
        employeeRepository.save(employee);
    }

}

As you add employees to the system, the read-only repository will keep fetching the pre-existing employees only, while the main repository will fetch all of them, including the ones just persisted.
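
To see the divergence for yourself, you can exercise both endpoints. A quick check along these lines (the JSON fields follow the Employee model from Part 2) should do:

curl -X POST -H "Content-Type: application/json" \
     -d '{"firstName":"John","lastname":"Doe 6","email":"john6@doe.com","age":23,"salary":6234}' \
     http://localhost:8080/employee

curl http://localhost:8080/employee       # served by the primary database, includes the new employee
curl http://localhost:8080/employee/read  # served by the read database, still returns only the old employees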

Read replicas and Spring Data Part 3: Configuring two entity managers

Our previous setup works as expected. What we shall do now is go one step further and configure two separate entity managers, without affecting the functionality we achieved previously.

The first step is to mark the default entity manager configuration as the primary one.

package com.gkatzioura.springdatareadreplica.config;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

@Configuration
public class PrimaryEntityManagerConfiguration {

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.url}")
    private String url;

    @Bean
    @Primary
    public DataSource dataSource() throws Exception {
        return DataSourceBuilder.create()
                                .url(url)
                                .username(username)
                                .password(password)
                                .driverClassName("org.postgresql.Driver")
                                .build();
    }

    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(
            EntityManagerFactoryBuilder builder,
            @Qualifier("dataSource") DataSource dataSource) {
        return builder.dataSource(dataSource)
                      .packages("com.gkatzioura.springdatareadreplica")
                      .persistenceUnit("main")
                      .build();
    }

}

If you run the application with this configuration, it will behave just like before.
Now it is time to configure the read-only entity manager.

package com.gkatzioura.springdatareadreplica.config;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

@Configuration
public class ReadOnlyEntityManagerConfiguration {

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.readUrl}")
    private String readUrl;

    @Bean
    public DataSource readDataSource() throws Exception {
        return DataSourceBuilder.create()
                                .url(readUrl)
                                .username(username)
                                .password(password)
                                .driverClassName("org.postgresql.Driver")
                                .build();
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean readEntityManagerFactory(
            EntityManagerFactoryBuilder builder,
            @Qualifier("readDataSource") DataSource dataSource) {
        return builder.dataSource(dataSource)
                      .packages("com.gkatzioura.springdatareadreplica")
                      .persistenceUnit("read")
                      .build();
    }

}
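
For completeness, the configuration above expects a spring.datasource.readUrl property next to the standard datasource ones. Assuming the databases from Part 1, a minimal application.yaml for this setup would look like this:

spring:
  datasource:
    driverClassName: org.postgresql.Driver
    username: db-user
    password: your-password
    url: jdbc:postgresql://127.0.0.2:5432/postgres
    readUrl: jdbc:postgresql://127.0.0.3:5432/postgres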

I will also add a method to the controller so that we can save entities.

package com.gkatzioura.springdatareadreplica.controller;

import java.util.List;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.gkatzioura.springdatareadreplica.entity.Employee;
import com.gkatzioura.springdatareadreplica.repository.EmployeeRepository;

@RestController
public class EmployeeContoller {

    private final EmployeeRepository employeeRepository;

    public EmployeeContoller(EmployeeRepository employeeRepository) {
        this.employeeRepository = employeeRepository;
    }

    @GetMapping("/employee")
    public List<Employee> getEmployees() {
        return employeeRepository.findAll();
    }

    @PostMapping("/employee")
    @ResponseStatus(HttpStatus.CREATED)
    public void addEmployee(@RequestBody Employee employee) {
        employeeRepository.save(employee);
    }

}

If you try to add an employee through the controller and then query the read database, you will see that no entry has been added there at all.
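
You can verify this directly on the read database. For example, assuming the container name from Part 1:

docker exec -it deploy_read-db-1_1 psql --username db-user --dbname postgres \
    -c 'select count(*) from spring_data_jpa_example.employee;'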

So we have our primary entity manager up and running, and we also have a secondary one which is not used yet. The next post focuses on putting the secondary, read-only entity manager to use.

Read replicas and Spring Data Part 2: Configuring the base project

In our previous post we set up multiple PostgreSQL instances with the same data.
Our next step is to configure our Spring project to use both servers.

As stated previously we shall use some of the code taken from the Spring Boot JPA post, since we use exactly the same database.

This shall be our Gradle build file

plugins {
	id 'org.springframework.boot' version '2.1.9.RELEASE'
	id 'io.spring.dependency-management' version '1.0.8.RELEASE'
	id 'java'
}

group = 'com.gkatzioura'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation "org.postgresql:postgresql:42.2.8"
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

Now let’s proceed with creating the model, based on the table created in the previous post.

package com.gkatzioura.springdatareadreplica.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "employee", catalog="spring_data_jpa_example")
public class Employee {

    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "firstname")
    private String firstName;

    @Column(name = "lastname")
    private String lastname;

    @Column(name = "email")
    private String email;

    @Column(name = "age")
    private Integer age;

    @Column(name = "salary")
    private Integer salary;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getSalary() {
        return salary;
    }

    public void setSalary(Integer salary) {
        this.salary = salary;
    }

}

And the next step is to create a Spring Data repository.

package com.gkatzioura.springdatareadreplica.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import com.gkatzioura.springdatareadreplica.entity.Employee;

public interface EmployeeRepository extends JpaRepository<Employee,Long> {
}

Also we are going to add a controller.

package com.gkatzioura.springdatareadreplica.controller;

import java.util.List;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.gkatzioura.springdatareadreplica.entity.Employee;
import com.gkatzioura.springdatareadreplica.repository.EmployeeRepository;

@RestController
public class EmployeeContoller {

    private final EmployeeRepository employeeRepository;

    public EmployeeContoller(EmployeeRepository employeeRepository) {
        this.employeeRepository = employeeRepository;
    }

    @RequestMapping("/employee")
    public List<Employee> getEmployees() {
        return employeeRepository.findAll();
    }

}

All it takes is to add the right properties to your application.yaml

spring:
  datasource:
    platform: postgres
    driverClassName: org.postgresql.Driver
    username: db-user
    password: your-password
    url: jdbc:postgresql://127.0.0.2:5432/postgres

Nowadays Spring Boot makes it possible not to bother with any JPA configuration at all.

This is all you need in order to run the application. Once your application is running just try to fetch the employees.

curl http://localhost:8080/employee

As you have seen, we did not do any JPA configuration. Since Spring Boot 2, specifying the database URL is sufficient for the auto-configuration to kick in and do all of this for you.

However, in our case we want multiple datasource and entity manager configurations. In the next post we shall configure the entity managers for our application.

Read replicas and Spring Data Part 1: Configuring the Databases

This is a series of blog posts on our quest to increase our application’s performance by utilizing read replicas.

For this project our goal is to set up our Spring Data application so that writes go through repositories backed by the primary database, while reads go through repositories backed by the read replicas.

In order to simulate this environment we shall use PostgreSQL instances through Docker.

The motives are simple. Your Spring application has become increasingly popular and you want it to handle more requests. Most applications out there have a higher demand for read operations than write operations, so I assume that your application falls into the same category.
Although SQL databases are not horizontally scalable on their own, you can work your way around this by using read replicas.

Our goal is not to set up actual read replication in PostgreSQL; therefore, instead of configuring any replication, we will simply load the same data into both databases.

This is the script we shall use to populate the databases.

#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
    create schema spring_data_jpa_example;

    create table spring_data_jpa_example.employee(
        id  SERIAL PRIMARY KEY,
        firstname   TEXT    NOT NULL,
        lastname    TEXT    NOT NULL,
        email       TEXT    not null,
        age         INT     NOT NULL,
        salary         real,
        unique(email)
    );

    insert into spring_data_jpa_example.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 1','john1@doe.com',18,1234.23);
    insert into spring_data_jpa_example.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 2','john2@doe.com',19,2234.23);
    insert into spring_data_jpa_example.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 3','john3@doe.com',20,3234.23);
    insert into spring_data_jpa_example.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 4','john4@doe.com',21,4234.23);
    insert into spring_data_jpa_example.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 5','john5@doe.com',22,5234.23);
EOSQL

Since we shall use Docker and Docker Compose, the script above will be used to initialize the databases.
Now on to create our Docker Compose stack.

version: '3.5'

services:
  write-db:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: db-user
      POSTGRES_PASSWORD: your-password
      POSTGRES_DB: postgres
    networks:
      - postgresql-network
    ports:
      - "127.0.0.2:5432:5432"
    volumes:
      - $PWD/init-db-script.sh:/docker-entrypoint-initdb.d/init-db-script.sh
  read-db-1:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: db-user
      POSTGRES_PASSWORD: your-password
      POSTGRES_DB: postgres
    networks:
      - postgresql-network
    ports:
      - "127.0.0.3:5432:5432"
    volumes:
      - $PWD/init-db-script.sh:/docker-entrypoint-initdb.d/init-db-script.sh
networks:
  postgresql-network:
    name: postgresql-network

As you can see, the configuration is pretty simple. If you look carefully you will notice that I gave the number one to the read-db; this is because in the future we will add more replicas to it.

I also bound the containers to different local IPs.

If you have problems binding addresses like 127.0.0.*:5432, you should try

sudo ifconfig lo0 alias 127.0.0.2 up
sudo ifconfig lo0 alias 127.0.0.3 up

If you are still unsuccessful, just change the ports instead and it will work. It might not be as convenient, but it’s still ok.

So let’s get our Docker Compose stack up and running.

docker-compose -f ./postgresql-stack.yaml up

We should now be able to query data in both PostgreSQL instances.

docker exec -it deploy_read-db-1_1 /bin/bash
root@07c502968cb3:/# psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB"
db-user=# select * from spring_data_jpa_example.employee;
 id | firstname | lastname |     email     | age | salary
----+-----------+----------+---------------+-----+---------
  1 | John      | Doe 1    | john1@doe.com |  18 | 1234.23
  2 | John      | Doe 2    | john2@doe.com |  19 | 2234.23
  3 | John      | Doe 3    | john3@doe.com |  20 | 3234.23
  4 | John      | Doe 4    | john4@doe.com |  21 | 4234.23
  5 | John      | Doe 5    | john5@doe.com |  22 | 5234.23
(5 rows)

We are pretty much set for our next step: we have the databases up and running, and we are going to spin up a Spring application on top of them. The next post focuses on implementing an application running against our primary database.

Pub/Sub local emulator

Pub/Sub is a nice tool provided by GCP. It is really handy and can help you with the messaging challenges your application might face. In fact, if you work with GCP, it is the managed messaging solution at your disposal.

As expected, working with the actual Pub/Sub service comes with quotas and costs, so for development it is essential to use something that is not going to cost you anything.

In these cases you can use the Pub/Sub emulator. To get started with the emulator you need to install it

gcloud components install pubsub-emulator

It would indeed be more convenient to have a Docker image, since that is way more portable. Unfortunately there is no official image for it from Google Cloud, but you can use one of the solutions available on Docker Hub.

Now let’s run it

gcloud beta emulators pubsub start --project=test-project

After that your application can connect to the Pub/Sub emulator; the default port is 8085.
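
Depending on the client library, you can point your application at the emulator through the PUBSUB_EMULATOR_HOST environment variable; gcloud can print the right value for you:

$(gcloud beta emulators pubsub env-init)

which effectively runs something like:

export PUBSUB_EMULATOR_HOST=127.0.0.1:8085

The Java test below does not rely on this variable; it wires the gRPC channel to the emulator manually instead.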

I will use a Java unit test as an example for this one.

package org.gkatzioura.pubsub;

import java.io.IOException;
import java.nio.charset.Charset;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class LocalPubSubTest {

    private static final String PROJECT = "test-project";
    private static final String SUBSCRIPTION_NAME = "SUBSCRIBER";
    private static final String TOPIC_NAME = "test-topic-id";

    private static final String hostPort = "127.0.0.1:8085";

    private ManagedChannel channel;
    private TransportChannelProvider channelProvider;
    private TopicAdminClient topicAdmin;

    private Publisher publisher;
    private SubscriberStub subscriberStub;
    private SubscriptionAdminClient subscriptionAdminClient;

    private ProjectTopicName topicName = ProjectTopicName.of(PROJECT, TOPIC_NAME);
    private ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT, SUBSCRIPTION_NAME);

    private Subscription subscription;

    @Before
    public void setUp() throws Exception {
        channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
        channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

        CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

        topicAdmin = createTopicAdmin(credentialsProvider);
        topicAdmin.createTopic(topicName);

        publisher = createPublisher(credentialsProvider);
        subscriberStub = createSubscriberStub(credentialsProvider);
        subscriptionAdminClient = createSubscriptionAdmin(credentialsProvider);
        subscription = subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
    }

    @After
    public void tearDown() throws Exception {
        topicAdmin.deleteTopic(topicName);
        subscriptionAdminClient.deleteSubscription(subscription.getName());
        channel.shutdownNow();
    }

    @Test
    public void testLocalPubSub() throws Exception {
        final String messageText = "text";
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                                                   .setData(ByteString.copyFrom(messageText, Charset.defaultCharset()))
                                                   .build();
        publisher.publish(pubsubMessage).get();

        PullRequest pullRequest = PullRequest.newBuilder()
                                             .setMaxMessages(1)
                                             .setReturnImmediately(true) // return immediately if messages are not available
                                             .setSubscription(subscription.getName())
                                             .build();

        PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
        String receiveMessageText = pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8();

        Assert.assertEquals(messageText, receiveMessageText);
    }

    private TopicAdminClient createTopicAdmin(CredentialsProvider credentialsProvider) throws IOException {
        return TopicAdminClient.create(
                TopicAdminSettings.newBuilder()
                                  .setTransportChannelProvider(channelProvider)
                                  .setCredentialsProvider(credentialsProvider)
                                  .build()
        );
    }

    private SubscriptionAdminClient createSubscriptionAdmin(CredentialsProvider credentialsProvider) throws IOException {
        SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
                                                                                       .setCredentialsProvider(credentialsProvider)
                                                                                       .setTransportChannelProvider(channelProvider)
                                                                                       .build();
        return SubscriptionAdminClient.create(subscriptionAdminSettings);
    }

    private Publisher createPublisher(CredentialsProvider credentialsProvider) throws IOException {
        return Publisher.newBuilder(topicName)
                        .setChannelProvider(channelProvider)
                        .setCredentialsProvider(credentialsProvider)
                        .build();
    }

    private SubscriberStub createSubscriberStub(CredentialsProvider credentialsProvider) throws IOException {
        SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
                                                                              .setTransportChannelProvider(channelProvider)
                                                                              .setCredentialsProvider(credentialsProvider)
                                                                              .build();
        return GrpcSubscriberStub.create(subscriberStubSettings);
    }

}

That’s it. Now you can have some cost-efficient unit tests!

Spring Boot & Hibernate: Print queries and variables

It’s late in the office and you are stuck with some strange JPA code full of join columns and cascades, and you cannot find what goes wrong. You wish there were a way to view the executed queries along with the actual values.
With a little tweaking of your Spring Boot application this is possible.

With the help of Lombok, here is our JPA model.

package com.gkatzioura.hibernatelog.dao;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.Data;

@Data
@Entity
@Table(name = "application_user")
public class ApplicationUser {

    @Id
    private Long id;

    private String username;

    private String password;

}

Its repository

package com.gkatzioura.hibernatelog.dao;

import org.springframework.data.repository.CrudRepository;

public interface ApplicationUserRepository extends CrudRepository<ApplicationUser, Long> {
}

A not found exception

package com.gkatzioura.hibernatelog.controller;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;

@ResponseStatus(value = HttpStatus.NOT_FOUND)
class ApplicationUserNotFoundException extends RuntimeException {

    public ApplicationUserNotFoundException() {
    }

    public ApplicationUserNotFoundException(String message) {
        super(message);
    }

    public ApplicationUserNotFoundException(String message, Throwable cause) {
        super(message, cause);
    }

    public ApplicationUserNotFoundException(Throwable cause) {
        super(cause);
    }

    public ApplicationUserNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

And a controller

package com.gkatzioura.hibernatelog.controller;

import java.util.Optional;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import com.gkatzioura.hibernatelog.dao.ApplicationUser;
import com.gkatzioura.hibernatelog.dao.ApplicationUserRepository;

@RestController
public class ApplicationUserController {

    private final ApplicationUserRepository applicationUserRepository;

    public ApplicationUserController(ApplicationUserRepository applicationUserRepository) {
        this.applicationUserRepository = applicationUserRepository;
    }

    @GetMapping("/user/{id}")
    @ResponseBody
    public ApplicationUser getApplicationUser(@PathVariable Long id) {
        Optional<ApplicationUser> applicationUser = applicationUserRepository.findById(id);
        if(applicationUser.isPresent()) {
            return applicationUser.get();
        } else {
            throw new ApplicationUserNotFoundException();
        }
    }

}

By adding the following to application.yaml we ensure that Hibernate creates the table, that the queries are logged, that the logged SQL is formatted, and that the actual parameter values are displayed.

spring:
  jpa:
    hibernate:
      ddl-auto: create
    properties:
      hibernate:
        show_sql: true
        use_sql_comments: true
        format_sql: true
logging:
  level:
    org:
      hibernate:
        type: trace
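
Note that org.hibernate.type at trace level can be quite verbose. If you are on Hibernate 5 and mainly care about the bind parameter values, a narrower logger should be enough; this is a minimal variant, assuming the parameter binder logs under org.hibernate.type.descriptor.sql:

logging:
  level:
    org:
      hibernate:
        type:
          descriptor:
            sql: trace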

Just

curl http://localhost:8080/user/1

And you get your logs.

Run your first Gatling load test using Scala

Gatling is a neat tool: you can create your load tests by just coding in Scala. JMeter allows you to do so through a plugin or BeanShell, but it is not as direct as the Gatling way.

I will start by adding the Gatling sbt plugin to project/plugins.sbt

addSbtPlugin("io.gatling" % "gatling-sbt" % "3.0.0")

The next step is to change the build.sbt


version := "0.1"
scalaVersion := "2.12.8"

enablePlugins(GatlingPlugin)

scalacOptions := Seq(
  "-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation",
  "-feature", "-unchecked", "-language:implicitConversions", "-language:postfixOps")
libraryDependencies += "io.gatling.highcharts" % "gatling-charts-highcharts" % "3.1.2" % "test,it"
libraryDependencies += "io.gatling"            % "gatling-test-framework"    % "3.1.2" % "test,it"

The above is no different from what you can find on the official site when it comes to sbt and Gatling.

Our next step is to add a simple HTTP test. Be aware that you should place it under src/test or src/it, since those are the directories the Gatling test and it dependency configurations apply to.

I shall put this test at src/test/scala/com/gkatzioura/BasicSimulation.scala

package com.gkatzioura

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class BasicSimulation extends Simulation {

  val httpConf = http.baseUrl("http://yourapi.com")
      .doNotTrackHeader("1")

  val scn = scenario("BasicSimulation")
    .exec(http("request_1")
    .get("/"))
    .pause(5)

  setUp(scn.inject(atOnceUsers(1))).protocols(httpConf)
}

Afterwards, testing is simple: you enter the sbt shell and execute the test.

sbt
>gatling:testOnly com.gkatzioura.BasicSimulation
>gatling:test

The first command runs just the one simulation; the second one runs everything.

That’s it! Pretty simple.

A guide to the InfluxDBMapper and QueryBuilder for Java: Into and Order

Previously we used the group by statement extensively in order to execute complex aggregation queries.

In this tutorial we are going to have a look at ‘into’ statements and the ‘order by’ clause.

Apart from inserting or selecting data, we might also want to persist the results of one query into another measurement. The use cases for this vary; for example, you might have a complex operation that cannot be executed in one single query.

Before we continue, make sure you have an InfluxDB instance up and running.

The most common action with an into query would be to populate a measurement with the results of a previous query.

Let’s copy a database.

Query query = select()
                .into("\"copy_NOAA_water_database\".\"autogen\".:MEASUREMENT")
                .from(DATABASE, "\"NOAA_water_database\".\"autogen\"./.*/")
                .groupBy(new RawText("*"));

The result of this query is that every measurement of NOAA_water_database is copied into the copy_NOAA_water_database database.

SELECT * INTO "copy_NOAA_water_database"."autogen".:MEASUREMENT FROM "NOAA_water_database"."autogen"./.*/ GROUP BY *;

Now let’s just copy a column into another measurement.

        Query query = select().column("water_level")
                               .into("h2o_feet_copy_1")
                               .from(DATABASE,"h2o_feet")
                               .where(eq("location","coyote_creek"));

Below is the query which is going to be executed.

SELECT water_level INTO h2o_feet_copy_1 FROM h2o_feet WHERE location = 'coyote_creek';

Also we can do exactly the same thing with aggregations.

Query query = select()
                .mean("water_level")
                .into("all_my_averages")
                .from(DATABASE,"h2o_feet")
                .where(eq("location","coyote_creek"))
                .and(gte("time","2015-08-18T00:00:00Z"))
                .and(lte("time","2015-08-18T00:30:00Z"))
                .groupBy(time(12l,MINUTE));
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

This generates a query which persists the aggregation result into a measurement.

SELECT MEAN(water_level) INTO all_my_averages FROM h2o_feet WHERE location = 'coyote_creek' AND time >= '2015-08-18T00:00:00Z' AND time <= '2015-08-18T00:30:00Z' GROUP BY time(12m);

Order clauses

InfluxDB does provide ordering; however, it is limited to ordering by time.
So we will execute a query with ascending order.

Query query = select().from(DATABASE, "h2o_feet")
                               .where(eq("location","santa_monica"))
                               .orderBy(asc());

And we get the ascending ordering as expected.

SELECT * FROM h2o_feet WHERE location = 'santa_monica' ORDER BY time ASC;

And we shall execute the same query with descending order.

Query query = select().from(DATABASE, "h2o_feet")
                               .where(eq("location","santa_monica"))
                               .orderBy(desc());

The generated query:

SELECT * FROM h2o_feet WHERE location = 'santa_monica' ORDER BY time DESC;

That’s it! We just created some new databases and measurements using only existing data in our database. We also executed some statements specifying the time ordering.
You can find the source code on GitHub.

A guide to the InfluxDBMapper and QueryBuilder for Java: Group By

Previously we executed some selection examples and aggregations against an InfluxDB database. In this tutorial we are going to check the group by functionality that the QueryBuilder provides us with.

Before you start, you need to spin up an InfluxDB instance with the data needed.

Supposing that we want to group by a single tag, we shall use the groupBy function.

        Query query = select().mean("water_level").from(DATABASE, "h2o_feet").groupBy("location");
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

The query to be executed shall be

SELECT MEAN(water_level) FROM h2o_feet GROUP BY location;

If we want to group by multiple tags we will pass an array of tags.

        Query query = select().mean("index").from(DATABASE,"h2o_feet")
                              .groupBy("location","randtag");
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

The result will be

SELECT MEAN(index) FROM h2o_feet GROUP BY location,randtag;

Another option is to group by all tags.

        Query query = select().mean("index").from(DATABASE,"h2o_feet")
                              .groupBy(raw("*"));
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

The generated query:

SELECT MEAN(index) FROM h2o_feet GROUP BY *;

Since InfluxDB is a time series database we have great group by functionality based on time.

For example, let’s group query results into 12 minute intervals.

        Query query = select().count("water_level").from(DATABASE,"h2o_feet")
                              .where(eq("location","coyote_creek"))
                              .and(gte("time","2015-08-18T00:00:00Z"))
                              .and(lte("time","2015-08-18T00:30:00Z"))
                              .groupBy(time(12l,MINUTE));
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

We get the result

SELECT COUNT(water_level) FROM h2o_feet WHERE location = 'coyote_creek' AND time >= '2015-08-18T00:00:00Z' AND time <= '2015-08-18T00:30:00Z' GROUP BY time(12m);

Group results by 12 minute intervals and location.

        Query query = select().count("water_level").from(DATABASE,"h2o_feet")
                              .where()
                              .and(gte("time","2015-08-18T00:00:00Z"))
                              .and(lte("time","2015-08-18T00:30:00Z"))
                              .groupBy(time(12l,MINUTE),"location");
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

We get the following query.

SELECT COUNT(water_level) FROM h2o_feet WHERE time >= '2015-08-18T00:00:00Z' AND time <= '2015-08-18T00:30:00Z' GROUP BY time(12m),location;

We will get more advanced and group query results into 18 minute intervals and shift the preset time boundaries forward.

        Query query = select().mean("water_level").from(DATABASE,"h2o_feet")
                              .where(eq("location","coyote_creek"))
                              .and(gte("time","2015-08-18T00:06:00Z"))
                              .and(lte("time","2015-08-18T00:54:00Z"))
                              .groupBy(time(18l,MINUTE,6l,MINUTE));
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

The generated query:

SELECT MEAN(water_level) FROM h2o_feet WHERE location = 'coyote_creek' AND time >= '2015-08-18T00:06:00Z' AND time <= '2015-08-18T00:54:00Z' GROUP BY time(18m,6m);

Or group query results into 18 minute intervals and shift the preset time boundaries back.

        Query query = select().mean("water_level").from(DATABASE,"h2o_feet")
                              .where(eq("location","coyote_creek"))
                              .and(gte("time","2015-08-18T00:06:00Z"))
                              .and(lte("time","2015-08-18T00:54:00Z"))
                              .groupBy(time(18l,MINUTE,-12l,MINUTE));
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

The result would be

SELECT MEAN(water_level) FROM h2o_feet WHERE location = 'coyote_creek' AND time >= '2015-08-18T00:06:00Z' AND time <= '2015-08-18T00:54:00Z' GROUP BY time(18m,-12m);

Finally, we can group by and fill.

        Query query = select()
                .column("water_level")
                .from(DATABASE, "h2o_feet")
                .where(gt("time", op(ti(24043524l, MINUTE), SUB, ti(6l, MINUTE))))
                .groupBy("water_level")
                .fill(100);
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

The result would be

SELECT water_level FROM h2o_feet WHERE time > 24043524m - 6m GROUP BY water_level fill(100);

That’s it! We just ran some really complex group by queries against our InfluxDB database. The QueryBuilder makes it possible to create these queries using only Java.
You can find the source code on GitHub.

A guide to the InfluxDBMapper and QueryBuilder for Java: Part 2

Previously we set up an InfluxDB instance running through Docker, and we also ran our first InfluxDBMapper code against an InfluxDB database.

The next step is to execute some queries against influxdb using the QueryBuilder combined with the InfluxDBMapper.

Let’s get started and select everything from the h2o_feet measurement, mapped onto the H2OFeetMeasurement class.

private static final String DATABASE = "NOAA_water_database";

public static void main(String[] args) {
    InfluxDB influxDB = InfluxDBFactory.connect("http://localhost:8086", "root", "root");

    InfluxDBMapper influxDBMapper = new InfluxDBMapper(influxDB);

    Query query = select().from(DATABASE,"h2o_feet");
    List<H2OFeetMeasurement> h2OFeetMeasurements = influxDBMapper.query(query, H2OFeetMeasurement.class);
}
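
Since no filters were applied, the generated query should simply be:

SELECT * FROM h2o_feet;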

Let’s get more specific: we will select measurements with a water level higher than 8.

        Query query = select().from(DATABASE,"h2o_feet").where(gt("water_level",8));
        LOGGER.info("Executing query "+query.getCommand());
        List<H2OFeetMeasurement> higherThanMeasurements = influxDBMapper.query(query, H2OFeetMeasurement.class);
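
The command logged above should be:

SELECT * FROM h2o_feet WHERE water_level > 8;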

I bet you noticed the query.getCommand() detail. If you want to see the actual query that is being executed you can call the getCommand() method from the query.

Apart from where clauses, we can also perform certain operations on fields, such as calculations.

        Query query = select().op(op(cop("water_level",MUL,2),"+",4)).from(DATABASE,"h2o_feet");
        LOGGER.info("Executing query "+query.getCommand());
        QueryResult queryResult = influxDB.query(query);

We just used the cop function to multiply the water level by 2; cop creates a clause which applies an operation to a column. Then we increment the result of that operation by 4 using the op function, which creates a clause applying an operation to the two arguments given.
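
The generated query should look like this:

SELECT (water_level * 2) + 4 FROM h2o_feet;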

The next case is to select using a specific string field key-value.

        Query query = select().from(DATABASE,"h2o_feet").where(eq("location","santa_monica"));
        LOGGER.info("Executing query "+query.getCommand());
        List<H2OFeetMeasurement> h2OFeetMeasurements = influxDBMapper.query(query, H2OFeetMeasurement.class);
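
The generated query should be:

SELECT * FROM h2o_feet WHERE location = 'santa_monica';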

Things can get even more specific: we can select data matching specific field key-values and tag key-values.

        Query query = select().column("water_level").from(DATABASE,"h2o_feet")
                              .where(neq("location","santa_monica"))
                              .andNested()
                              .and(lt("water_level",-0.59))
                              .or(gt("water_level",9.95))
                              .close();
        LOGGER.info("Executing query "+query.getCommand());
        List<H2OFeetMeasurement> h2OFeetMeasurements = influxDBMapper.query(query, H2OFeetMeasurement.class);
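
The generated query should look like this:

SELECT water_level FROM h2o_feet WHERE location != 'santa_monica' AND (water_level < -0.59 OR water_level > 9.95);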

Since InfluxDB is a time series database, it is essential to be able to issue queries with time constraints.

        Query query = select().from(DATABASE,"h2o_feet")
                              .where(gt("time",subTime(7,DAY)));
        LOGGER.info("Executing query "+query.getCommand());
        List<H2OFeetMeasurement> h2OFeetMeasurements = influxDBMapper.query(query, H2OFeetMeasurement.class);
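
subTime(7, DAY) translates to now() minus seven days, so the generated query should be:

SELECT * FROM h2o_feet WHERE time > now() - 7d;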

Last but not least, we can query for specific fields only. I will create a model containing just the fields that we are going to retrieve.

package com.gkatzioura.mapper.showcase;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

@Measurement(name = "h2o_feet", timeUnit = TimeUnit.SECONDS)
public class LocationWithDescription {

    @Column(name = "time")
    private Instant time;

    @Column(name = "level description")
    private String levelDescription;

    @Column(name = "location")
    private String location;

    public Instant getTime() {
        return time;
    }

    public void setTime(Instant time) {
        this.time = time;
    }

    public String getLevelDescription() {
        return levelDescription;
    }

    public void setLevelDescription(String levelDescription) {
        this.levelDescription = levelDescription;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }
}

And now I shall query for them.

Query selectFields = select("level description","location").from(DATABASE,"h2o_feet");
List<LocationWithDescription> locationWithDescriptions = influxDBMapper.query(selectFields, LocationWithDescription.class);

As you can see, we can also map just certain fields to a model. For now, mapping to models can only be done when the data comes from a single measurement, so we shall proceed with more QueryBuilder-specific examples next time.

You can find the source code on GitHub.