Use Redis GeoHash with Spring Boot

One very handy data structure when it comes to Redis is the GeoHash data structure. Essentially it is a sorted set whose score is a geohash derived from the longitude and latitude of each member, which is what makes radius queries possible.

We will spin up a Redis database using Compose

services:
  redis:
    image: redis
    ports:
      - 6379:6379

It can be run like this:

docker compose up

You can find more on Compose in A Developer's Essential Guide to Docker Compose.

Let’s add our dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>location-service</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>
</project>

We shall start with our configuration. For convenient injection we shall create a GeoOperations<String, String> bean.

package org.location;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.GeoOperations;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class RedisConfiguration {

    @Bean
    public GeoOperations<String,String> geoOperations(RedisTemplate<String,String> template) {
        return template.opsForGeo();
    }

}

Our model will be the following:

package org.location;

import lombok.Data;

@Data
public class Location {

    private String name;
    private Double lat;
    private Double lng;

}

This simple service will persist venue locations and also fetch venues near a given location.

package org.location;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.data.geo.Circle;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.GeoOperations;
import org.springframework.data.redis.domain.geo.GeoLocation;
import org.springframework.stereotype.Service;

@Service
public class GeoService {

    public static final String VENUES_VISITED = "venues_visited";
    private final GeoOperations<String, String> geoOperations;

    public GeoService(GeoOperations<String, String> geoOperations) {
        this.geoOperations = geoOperations;
    }

    public void add(Location location) {
        Point point = new Point(location.getLng(), location.getLat());
        geoOperations.add(VENUES_VISITED, point, location.getName());

    }

    public List<String> nearByVenues(Double lng, Double lat, Double kmDistance) {
        Circle circle = new Circle(new Point(lng, lat), new Distance(kmDistance, Metrics.KILOMETERS));
        GeoResults<RedisGeoCommands.GeoLocation<String>> res = geoOperations.radius(VENUES_VISITED, circle);
        return res.getContent().stream()
                  .map(GeoResult::getContent)
                  .map(GeoLocation::getName)
                  .collect(Collectors.toList());
    }

}

We shall also add a controller

package org.location;

import java.util.List;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LocationController {

    private final GeoService geoService;

    public LocationController(GeoService geoService) {
        this.geoService = geoService;
    }

    @PostMapping("/location")
    public ResponseEntity<String> addLocation(@RequestBody Location location) {
        geoService.add(location);
        return ResponseEntity.ok("Success");
    }

    @GetMapping("/location/nearby")
    public ResponseEntity<List<String>> locations(Double lng, Double lat, Double km) {
        List<String> locations = geoService.nearByVenues(lng, lat, km);
        return ResponseEntity.ok(locations);
    }

}

Then let’s add an element.

curl --location --request POST 'localhost:8080/location' \
--header 'Content-Type: application/json' \
--data-raw '{
	"lng": 51.5187516,
	"lat":-0.0814374,
	"name": "liverpool-street"
}'

Let's retrieve the element through the API:

curl --location --request GET 'localhost:8080/location/nearby?lng=51.4595573&lat=0.24949&km=100'
> [
    "liverpool-street"
]

Let's also check Redis:

ZRANGE venues_visited 0 -1 WITHSCORES
1) "liverpool-street"
2) "2770072452773375"
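
The same check can be done from Java. Since the geo set is stored as a plain sorted set, the raw score can be read back through the sorted-set operations of the RedisTemplate. Below is a minimal sketch, assuming the RedisTemplate<String, String> bean that backs our GeoOperations configuration; the GeoScoreChecker class itself is hypothetical.

package org.location;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class GeoScoreChecker {

    private final RedisTemplate<String, String> redisTemplate;

    public GeoScoreChecker(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Reads the raw sorted-set score Redis derived from the member's coordinates.
    public Double scoreOf(String venue) {
        return redisTemplate.opsForZSet().score("venues_visited", venue);
    }
}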

We did it; pretty convenient for our day-to-day distance use cases.


Use JMH for your Java applications with Gradle

If you want to benchmark your code, the Java Microbenchmark Harness (JMH) is the tool of choice.
In our example we shall use the refill-rate-limiter project.

Since refill-rate-limiter uses Gradle we will use the following Gradle plugin:

plugins {
...
  id "me.champeau.gradle.jmh" version "0.5.3"
...
}

We shall place the benchmark in the jmh/java/io/github/resilience4j/ratelimiter folder.

Our Benchmark should look like this.

package io.github.resilience4j.ratelimiter;

import io.github.resilience4j.ratelimiter.internal.RefillRateLimiter;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.All)
public class RateLimiterBenchmark {

    private static final int FORK_COUNT = 2;
    private static final int WARMUP_COUNT = 10;
    private static final int ITERATION_COUNT = 10;
    private static final int THREAD_COUNT = 2;

    private RefillRateLimiter refillRateLimiter;

    private Supplier<String> refillGuardedSupplier;

    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .addProfiler(GCProfiler.class)
                .build();
        new Runner(options).run();
    }

    @Setup
    public void setUp() {

        RefillRateLimiterConfig refillRateLimiterConfig = RefillRateLimiterConfig.custom()
                                                                                 .limitForPeriod(1)
                                                                                 .limitRefreshPeriod(Duration.ofNanos(1))
                                                                                 .timeoutDuration(Duration.ofSeconds(5))
                                                                                 .build();

        refillRateLimiter = new RefillRateLimiter("refillBased", refillRateLimiterConfig);

        Supplier<String> stringSupplier = () -> {
            Blackhole.consumeCPU(1);
            return "Hello Benchmark";
        };

        refillGuardedSupplier = RateLimiter.decorateSupplier(refillRateLimiter, stringSupplier);
    }

    @Benchmark
    @Threads(value = THREAD_COUNT)
    @Warmup(iterations = WARMUP_COUNT)
    @Fork(value = FORK_COUNT)
    @Measurement(iterations = ITERATION_COUNT)
    public String refillPermission() {
        return refillGuardedSupplier.get();
    }

}

Let’s now check the elements one by one.

By using the Benchmark scope, all the threads of the benchmark share the same state object. We do so because we want to test how refill-rate-limiter performs in a multithreaded scenario.

@State(Scope.Benchmark)
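
For comparison, if we wanted each benchmark thread to work on its own state instance we could use Scope.Thread instead. A hypothetical sketch:

package io.github.resilience4j.ratelimiter;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

// Hypothetical example: with Scope.Thread every benchmark thread gets its own
// instance of this state class, so there is no contention on the counter.
@State(Scope.Thread)
public class ThreadScopedCounterBenchmark {

    private long counter;

    @Benchmark
    public long increment() {
        return ++counter;
    }
}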

We would like our results to be reported in microseconds; therefore we shall use the OutputTimeUnit annotation.

@OutputTimeUnit(TimeUnit.MICROSECONDS)

In JMH we have various benchmark modes depending on what we want to measure.

  • Throughput: when we want to measure the number of operations per unit of time.
  • AverageTime: when we want to measure the average time per operation.
  • SampleTime: when we want to sample the time of each operation, including the min and max time, not just the average.
  • SingleShotTime: when we want to measure the time of a single operation. This can help to identify how the operation performs on a cold start.

We also have the option to measure all the above.

@BenchmarkMode(Mode.All)
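
If we only care about a subset of measurements, the annotation also accepts an explicit list of modes. A hypothetical sketch:

package io.github.resilience4j.ratelimiter;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;

// Hypothetical benchmark measuring only throughput and average time.
@BenchmarkMode({Mode.Throughput, Mode.AverageTime})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class SelectiveModeBenchmark {

    @Benchmark
    public long currentTime() {
        return System.nanoTime();
    }
}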

Those options configured on the class level will apply to the benchmark methods we shall add.

Let’s also examine how the benchmark will run

We will specify the number of Threads by using the Threads annotation.

@Threads(value = THREAD_COUNT)

We also want to warm up before we run the actual benchmarks. This way our code will be initialized, JIT optimizations will take place, and our runtime will adapt to the conditions before we measure.

@Warmup(iterations = WARMUP_COUNT)

Using Fork we instruct in how many separate JVM forks the benchmark will run.

@Fork(value = FORK_COUNT)

Then we need to specify the number of iterations we want to measure.

@Measurement(iterations = ITERATION_COUNT)

We can start our test by just using

gradle jmh

The results will be saved in a file.

...
2022-10-28T09:08:44.522+0100 [QUIET] [system.out] Benchmark result is saved to /path/refill-rate-limiter/build/reports/jmh/results.txt
..

Let’s examine the results.

Benchmark                                                         Mode       Cnt      Score   Error   Units
RateLimiterBenchmark.refillPermission                            thrpt        20     13.594 ± 0.217  ops/us
RateLimiterBenchmark.refillPermission                             avgt        20      0.147 ± 0.002   us/op
RateLimiterBenchmark.refillPermission                           sample  10754462      0.711 ± 0.025   us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.00    sample                  ≈ 0           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.50    sample                0.084           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.90    sample                0.125           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.95    sample                0.125           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.99    sample                0.209           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.999   sample              139.008           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.9999  sample              935.936           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p1.00    sample            20709.376           us/op
RateLimiterBenchmark.refillPermission                               ss        20     14.700 ± 4.003   us/op

As we can see, we have the modes listed.
Cnt is the number of measured iterations. Apart from throughput, where we measure operations per unit of time, the rest report time per operation.
Throughput, Average and Single shot are straightforward; Sample also lists the percentiles. Error is the margin of error.

That's it! Happy benchmarking!

Gradle: Push to Maven Repository

If you are a developer, sharing your artifacts is a common task that needs to be in place from the start.

In most teams and companies a Maven repository is already set up. This repository is used mostly through CI/CD tasks, enabling developers to distribute the generated artifacts.

In order to make the example possible we shall spin up a Nexus repository using Docker Compose.

First let's create a default password and the directory containing the plugins Nexus will download. For now the password is clear text; it will serve us for our first actions. Once the admin password is reset on Nexus the file is removed, so there won't be a file with a clear-text password lying around.

mkdir nexus-data
echo -n 'a-test-password' > admin.password

Then onwards to the Compose file:

services:
  nexus:
    image: sonatype/nexus3
    ports:
      - 8081:8081
    environment:
      INSTALL4J_ADD_VM_PARAMS: "-Xms2703m -Xmx2703m -XX:MaxDirectMemorySize=2703m"
    volumes:
      - nexus-data:/nexus-data
      - ./admin.password:/nexus-data/admin.password
volumes:
  nexus-data:

Let's examine the file.
By using INSTALL4J_ADD_VM_PARAMS we override the JVM parameters used to run the Nexus server; this way we can instruct it to use more memory.
Because Nexus takes long to initialize, we create a Docker volume. Every time we run the Compose file the initialization will not start from scratch; instead it will reuse the results of the previous initialization. By mounting the admin password created previously we have a predefined password for the service.

By issuing the following command, the server will be up and running:

docker compose up

You can find more on Compose in A Developer's Essential Guide to Docker Compose.

After some time Nexus will be initialized and running, so we can proceed to our Gradle configuration.

We will keep track of the version in the gradle.properties file:

version=1.0-SNAPSHOT

The build.gradle file:

plugins {
    id 'java'
    id 'maven-publish'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}

publishing {
    publications {
        mavenJava(MavenPublication) {
            groupId 'org.example'
            artifactId 'gradle-push'
            version version
            from components.java
            versionMapping {
                usage('java-api') {
                    fromResolutionOf('runtimeClasspath')
                }
                usage('java-runtime') {
                    fromResolutionResult()
                }
            }
            pom {
                name = 'gradle-push'
                description = 'Gradle Push to Nexus'
                url = 'https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                licenses {
                    license {
                        name = 'The Apache License, Version 2.0'
                        url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
                    }
                }
                developers {
                    developer {
                        id = 'John'
                        name = 'John Doe'
                        email = 'an-email@gmail.com'
                    }
                }
                scm {
                    connection = 'scm:git:https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                    developerConnection = 'scm:git:https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                    url = 'https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                }
            }
        }
    }

    repositories {
        maven {
            def releasesRepoUrl = "http://localhost:8081/repository/maven-releases/"
            def snapshotsRepoUrl = "http://localhost:8081/repository/maven-snapshots/"
            url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl

            credentials {
                username "admin"
                password "a-test-password"
            }
        }
    }
}

The plugin used is the maven-publish plugin. If we examine the file, we see that the plugin generates a Maven POM file which is used to execute the deploy to Nexus. The repositories are configured in the repositories section, including the Nexus credentials we defined previously. Depending on whether the version is a SNAPSHOT or a release, the corresponding repository endpoint is picked. As we can see, a clear-text password is used; in the next blog post we will examine how we can avoid that.

Now it is sufficient to run

gradle publish

This will deploy the binary to the snapshot repository.

You can find the source code on GitHub.

refill-rate-limiter: A Resilience4j-based rate limiter

It's been a while since I started working on this project. It is a rate limiter based on the atomic rate limiter from Resilience4j. The key difference is that the atomic rate limiter gets the permissions assigned per cycle, while the refill-rate-limiter splits the permissions over a cycle and gradually refills them. In essence it simulates a token bucket algorithm; however, it uses the atomic rate limiter internals.

Using it is very simple.
Import it using Maven:

	<dependencies>
        ...
		<dependency>
			<groupId>io.github.gkatzioura</groupId>
			<artifactId>refill-rate-limiter</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>io.github.resilience4j</groupId>
			<artifactId>resilience4j-ratelimiter</artifactId>
			<version>1.7.1</version>
		</dependency>
        ...
	</dependencies>

And use it in your code base.

...
        RefillRateLimiterConfig refillRateLimiterConfig = new RefillRateLimiterConfig.Builder()
                .limitRefreshPeriod(Duration.of(2, ChronoUnit.SECONDS))
                .limitForPeriod(1)
                .permitCapacity(1)
                .build();

        refillRateLimiter = new RefillRateLimiter("default", refillRateLimiterConfig, io.vavr.collection.HashMap.empty());
...

        boolean allowed = refillRateLimiter.acquirePermission(1);
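
Since it implements the Resilience4j RateLimiter contract, the usual decorators can be applied as well. Below is a minimal sketch combining the configuration above with the decorateSupplier call we saw in the benchmark; the package and class names in the sketch are assumptions.

package io.github.example;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Supplier;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RefillRateLimiterConfig;
import io.github.resilience4j.ratelimiter.internal.RefillRateLimiter;

public class RefillRateLimiterExample {

    public static void main(String[] args) {
        RefillRateLimiterConfig config = new RefillRateLimiterConfig.Builder()
                .limitRefreshPeriod(Duration.of(2, ChronoUnit.SECONDS))
                .limitForPeriod(1)
                .permitCapacity(1)
                .build();

        RefillRateLimiter rateLimiter =
                new RefillRateLimiter("default", config, io.vavr.collection.HashMap.empty());

        // Decorate a Supplier just like with any other Resilience4j rate limiter.
        Supplier<String> guarded = RateLimiter.decorateSupplier(rateLimiter, () -> "hello");
        System.out.println(guarded.get());
    }
}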

Just like Resilience4j it is licensed under the Apache License, Version 2.0.
It spawned from a PR on Resilience4j; since merging took longer, I made it available as a standalone implementation. It would not hurt if you give the PR a thumbs up so it ends up where it belongs 😉

A description of the algorithm can be found on GitHub. Benchmarks and integrations will follow.

New Book Day: A Developer’s Essential Guide to Docker Compose

As developers nowadays we have a wide variety of software components and cloud services to use. This was a scenario we could not even imagine in the past.
I still remember when we had to set up our application servers and databases on top of bare-metal servers.
This burst of computing functionality in the form of the cloud and managed services allowed us to utilise more tools for our applications and build better products.
Issues like orchestrating workloads, isolating them and shipping them went to a whole different level.
As a result, containerization came to the rescue.
Docker took over the microservice world and became the dominant solution for deploying microservices, and in certain cases even databases and brokers.
This brings us to the development process. Production deployments are a huge chapter on their own. Platform engineers need to take care of the security, scaling and robustness of container-based deployments.
But the development process is also affected:

  • SQL/NoSQL Databases as well as purpose-built databases like InfluxDB need to be available locally
  • Scenarios of Microservices applications need to be tested
  • Other Components such as brokers need to be available for testing

A step towards the challenges mentioned above is to utilise Docker and its rich functionality. As convenient as it is to spin up containers locally, you still end up managing containers, volumes and networks. Most of the time those have been spun up ad hoc or with some scripts that a team has to maintain.

Docker Compose is one of the solutions to the problems described. With Compose you can spin up multiple containers locally, organised using YAML files.
Here are some of the benefits when used during development:

  • The containers are organised and can spin up or shut down in an organized way
  • Applications can be placed on different networks
  • Volumes can be managed and attached to containers in a managed way
  • Containers resolve each other's location through DNS automatically; manual linking is not needed.

I have been using Compose for years. It helped me make my development process much more efficient, and in certain cases it even helped me with actual production deployments. Writing this book was an opportunity for me to advocate for Docker Compose.

Thanks to the amazing people at Packt publishing it was possible to write this book and give back to the community.

The book is focused on various aspects of Compose.

In the beginning there is an extensive look at Docker Compose: how it is implemented, how it interacts with the Docker engine, the available commands, as well as the functionality it provides.

Onwards we dive deep into day-to-day development using Compose. We will spin up complex infrastructure locally, as well as simulate microservice environments using Compose. We will take this concept further and incrementally simulate things that we have to deal with in a production deployment, as well as their workarounds. Lastly we will use Compose for CI/CD jobs on popular solutions like GitHub Actions, Travis, and Bitbucket Pipelines.

The last part of the book is all about deploying to production. All the knowledge acquired previously can be used for actual production deployments. A production deployment comes with some standards:

  • Infrastructure as Code
  • Container registry
  • Networks
  • Load Balancing
  • Autoscaling

The above, in combination with the knowledge we have accumulated so far using Compose, will be used for deployments on AWS and Azure, the most popular cloud providers. This will also be done with the extra help of Infrastructure as Code, using Terraform.

Lastly since many production deployments nowadays reside on Kubernetes we shall build a bridge between Compose and Kubernetes by migrating the existing Compose deployment using Kompose.

You can find the book on Amazon as well as on the Packt portal.
Happy Learning!

Debezium Server with PostgreSQL and Redis Stream

Debezium is a great tool for capturing the row-level changes that happen on a database and streaming those changes to a broker of our choice.

Since this functionality stays within the boundaries of the database, it helps keep our applications simple. For example, there is no need for an application to emit events on its database interactions; Debezium will monitor the row changes and emit the events. Depending on the broker solution used with Debezium, a consumer can subscribe to the broker and receive the changes.

PostgreSQL, being a popular SQL database, is supported by Debezium.

Our goal is to listen for PostgreSQL changes and stream them to a Redis stream through a Debezium Server. It is common to use Debezium with Kafka; in cases where Kafka is not present in a team's tech stack we can use other brokers.

In our case we will keep things lightweight by using Redis Streams.

Redis will be set up without any extra configuration.

In order to use PostgreSQL with Debezium it is essential to alter the PostgreSQL configuration.

The configuration we shall use on PostgreSQL is the following:

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3

As we can see, we set wal_level to logical in order to use logical decoding in PostgreSQL.
From the documentation:

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

We will also create a schema and a table in PostgreSQL. The schema and the table will be the ones we listen to for changes.

#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );
EOSQL

This is the table we used in a previous PostgreSQL example.

Debezium will have to be able to interact with the PostgreSQL server as well as the Redis server.
The configuration should be the following:

debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput

By examining the configuration we can see that we have the necessary information for Debezium to communicate with the PostgreSQL database; we also specify the schema that we created previously, therefore only changes from that schema will be streamed. We could make things even more restrictive, for example by whitelisting tables.

Since this demo involves three different software components, Docker Compose will come in handy.

version: '3.1'

services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis

By using Compose we were able to spin up three different software components on the same network. The components can interact with each other by using the DNS names of the services as specified in Compose. Also, the configuration files we created previously are mounted into the Docker containers. Docker Compose V2 is out there with many good features; you can find more about it in the book I authored, A Developer's Essential Guide to Docker Compose.

In order to get the stack running we shall execute the following command

$ docker compose up

Once the stack is up and running, we can start listening for events.

We shall log in to Redis and start listening for any database updates.

$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $

This will block until we receive an event from the stream.
If we examine the stream name we see the pattern {server-name}.{schema}.{table}. This allows consumers to subscribe only to the changes of interest.
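
The same kind of read can of course be done from Java. Below is a minimal sketch using Spring Data Redis streams; the StringRedisTemplate bean, the class name and the ten-second block time are assumptions on top of the setup above.

package org.example.debezium;

import java.time.Duration;
import java.util.List;

import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;

public class EmployeeChangeReader {

    private final StringRedisTemplate redisTemplate;

    public EmployeeChangeReader(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Blocks for up to ten seconds waiting for new change events on the stream.
    public List<MapRecord<String, Object, Object>> readChanges() {
        StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(10));
        return redisTemplate.opsForStream()
                            .read(options, StreamOffset.create("tutorial.test_schema.employee", ReadOffset.latest()));
    }
}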

Onwards we will make an entry.

$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q

If we check the Redis session we should see that we received an event from the Redis stream:

127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1) "1663796657336-0"
         2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379> 

How cool is that? We can now start streaming our database changes to the broker of our choice.

You can find the source code on GitHub.

Mock GRPC Services for Unit Testing

In our day-to-day work we develop applications that interact with software components through I/O. It can be a database, a broker or some form of blob storage. Take for example the cloud components you interact with: Azure Storage Queue, SQS, Pub/Sub. The communication with those components usually happens through an SDK.

From the start, testing kicks in, thus the interaction with those components should be tackled in a testing context. One approach is to use installations (or simulators) of those components and have the code interact with an actual instance, just like it can be achieved by using test containers or by creating infrastructure for testing purposes only.
Another approach is to spin up a mock service of the component and have the tests interact with it. A good example of this is Hoverfly: a simulated HTTP service is run during testing and the test cases interact with it.
Both can be used in various situations depending on the qualities our testing process requires. We shall focus on the second approach, applied to gRPC.

It is well known that most Google Cloud components come with a gRPC API.
In our scenario we have an application publishing messages to Pub/Sub.

Let’s put the needed dependencies first

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>24.1.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-pubsub</artifactId>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-testing</artifactId>
            <version>1.43.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.api.grpc</groupId>
            <artifactId>grpc-google-cloud-pubsub-v1</artifactId>
            <version>1.97.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

Let’s start with our publisher class.

package com.egkatzioura.notification.publisher;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

public class UpdatePublisher {

    private final Publisher publisher;
    private final Executor executor;

    public UpdatePublisher(Publisher publisher, Executor executor) {
        this.publisher = publisher;
        this.executor = executor;
    }

    public CompletableFuture<String> update(String notification) {
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                                                           .setData(ByteString.copyFromUtf8(notification))
                                                                   .build();
        ApiFuture<String> apiFuture = publisher.publish(pubsubMessage);

        return toCompletableFuture(apiFuture);
    }

    private CompletableFuture<String> toCompletableFuture(ApiFuture<String> apiFuture) {
        final CompletableFuture<String> responseFuture = new CompletableFuture<>();

        ApiFutures.addCallback(apiFuture, new ApiFutureCallback<>() {
            @Override
            public void onFailure(Throwable t) {
                responseFuture.completeExceptionally(t);
            }

            @Override
            public void onSuccess(String result) {
                responseFuture.complete(result);
            }

        }, executor);
        return responseFuture;
    }

}

The publisher will send messages and return a CompletableFuture with the id of the message sent.
So let's test this class. Our goal is to send a message and get the message id back. The service to mock and simulate is Pub/Sub.
For this purpose we added the gRPC API dependency on Maven:

        <dependency>
            <groupId>com.google.api.grpc</groupId>
            <artifactId>grpc-google-cloud-pubsub-v1</artifactId>
            <version>1.97.1</version>
            <scope>test</scope>
        </dependency>

We shall mock the API for the publishing actions. The class to extend is PublisherGrpc.PublisherImplBase.

package com.egkatzioura.notification.publisher;

import java.util.UUID;

import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;

import io.grpc.stub.StreamObserver;

public class MockPublisherGrpc extends PublisherGrpc.PublisherImplBase {

    private final String prefix;

    public MockPublisherGrpc(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void publish(PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
        responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(prefix+":"+UUID.randomUUID().toString()).build());
        responseObserver.onCompleted();
    }

}

As you see the message id will have a prefix we define.

This would be the PublisherGrpc implementation on the server side. Let us proceed to our unit test. The UpdatePublisher class can have a Publisher injected. This publisher will be configured to use the PublisherGrpc.PublisherImplBase created previously.

    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

    private static final String MESSAGE_ID_PREFIX = "message";

    @Before
    public void setUp() throws Exception {
        String serverName = InProcessServerBuilder.generateName();

        Server server = InProcessServerBuilder
                .forName(serverName)
                .directExecutor()
                .addService(new MockPublisherGrpc(MESSAGE_ID_PREFIX))
                .build()
                .start();

        grpcCleanup.register(server);
        ...

Above we created a gRPC server that serves in-process requests. Then we registered the mock service created previously.
Next we create the Publisher using that service and create an instance of the class under test.

...

    private UpdatePublisher updatePublisher;

    @Before
    public void setUp() throws Exception {
        String serverName = InProcessServerBuilder.generateName();

        Server server = InProcessServerBuilder
                .forName(serverName)
                .directExecutor()
                .addService(new MockPublisherGrpc(MESSAGE_ID_PREFIX))
                .build()
                .start();

        grpcCleanup.register(server);

        ExecutorProvider executorProvider = testExecutorProvider();
        ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName).directExecutor().build();

        TransportChannel transportChannel = GrpcTransportChannel.create(managedChannel);
        TransportChannelProvider transportChannelProvider = FixedTransportChannelProvider.create(transportChannel);

        String topicName = "projects/test-project/topic/my-topic";
        Publisher publisher = Publisher.newBuilder(topicName)
                                       .setExecutorProvider(executorProvider)
                                       .setChannelProvider(transportChannelProvider)
                                       .build();

        updatePublisher = new UpdatePublisher(publisher, Executors.newSingleThreadExecutor());
        ...

We pass a Channel to our publisher which points to our InProcessServer. Requests will be routed to the service we registered. Finally we can add our test.

    @Test
    public void testPublishOrder() throws ExecutionException, InterruptedException {
        String messageId = updatePublisher.update("Some notification").get();
        assertThat(messageId, containsString(MESSAGE_ID_PREFIX));
    }

We did it! We created an in-process gRPC server in order to test our gRPC-driven services!

You can find the code on GitHub!

Add Grpc to your Spring Application

In the previous example we had a Java application spinning up an HTTP server and operating a gRPC application on top of that Java process.

If you use frameworks like Spring you might wonder how you can achieve gRPC and Spring integration.
There are libraries out there that do so; we shall use the grpc-spring-boot-starter from io.github.lognet.
We shall start with the dependencies. We also need the gRPC code-generation plugins we used in the previous example.

    <dependencies>
        <dependency>
            <groupId>io.github.lognet</groupId>
            <artifactId>grpc-spring-boot-starter</artifactId>
            <version>4.5.8</version>
        </dependency>
    </dependencies>


    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

What happens behind the scenes:

  • Spring environment spins up
  • gRPC Server starts
  • Spring services annotated with @GRpcService are picked up and registered to the gRPC server
  • Security and other filtering based components are integrated with the equivalent gRPC ServerInterceptor.

So, pretty much, instead of controllers we shall have @GRpcService beans, and ServerInterceptors instead of filters.
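
To illustrate the interceptor side, a plain gRPC ServerInterceptor looks like the following minimal sketch; how it gets registered (through the starter or a server builder) depends on your setup, so treat the class as hypothetical.

package com.gkatzioura.order.interceptor;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

// Hypothetical logging interceptor; registration is left to the starter or the server builder.
public class LoggingInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
                                                                 Metadata headers,
                                                                 ServerCallHandler<ReqT, RespT> next) {
        System.out.println("Incoming call to " + call.getMethodDescriptor().getFullMethodName());
        return next.startCall(call, headers);
    }
}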

Let’s add the proto files. We shall use the same proto of the previous example.

The location is src/main/proto/Order.proto and the contents would be

syntax = "proto3";
 
option java_multiple_files = true;
option java_package = "com.egkatzioura.order.v1";
 
service OrderService {
    rpc ExecuteOrder(OrderRequest) returns (OrderResponse) {};
}
 
message OrderRequest {
    string email = 1;
    string product = 2;
    int32 amount = 3;
}
 
message OrderResponse {
    string info = 1;
}

As expected, an mvn clean install will generate the gRPC classes. Now we should create the Spring service.

package com.gkatzioura.order.impl;

import com.egkatzioura.order.v1.OrderRequest;
import com.egkatzioura.order.v1.OrderResponse;
import com.egkatzioura.order.v1.OrderServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.lognet.springboot.grpc.GRpcService;

@GRpcService
public class OrderServiceImpl extends OrderServiceGrpc.OrderServiceImplBase{

    @Override
    public void executeOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) {
        OrderResponse response = OrderResponse.newBuilder()
                .setInfo("Hi "+request.getEmail()+", your order has been executed")
                .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

}

Also let’s add the main class

package com.gkatzioura.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
}

The Spring context is spun up, and the @GRpcService-annotated services are registered.
By default the gRPC server listens on port 6565.

Let's run the same client that we ran in the previous example.

package com.gkatzioura.order;

import com.egkatzioura.order.v1.Order;
import com.egkatzioura.order.v1.OrderRequest;
import com.egkatzioura.order.v1.OrderResponse;
import com.egkatzioura.order.v1.OrderServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class ApplicationClient {
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();

        OrderServiceGrpc.OrderServiceBlockingStub orderServiceBlockingStub
                = OrderServiceGrpc.newBlockingStub(managedChannel);

        OrderRequest orderRequest = OrderRequest.newBuilder()
                .setEmail("hello@word.com")
                .setProduct("no-name")
                .setAmount(3)
                .build();

        OrderResponse orderResponse = orderServiceBlockingStub.executeOrder(orderRequest);

        System.out.println("Received response: "+orderResponse.getInfo());

        managedChannel.shutdown();
    }
}

The response is the one expected. We connected to the server and got back a response. We did not have to manually register the services to the gRPC server, since Spring did this for us. You can find the code on GitHub.

Execute mTLS calls using Java

Previously we secured an Nginx instance using SSL and mTLS. If you are using Java, interacting with a service secured with mTLS requires some changes to your code base. In this tutorial we shall enable our Java application to use mTLS with different clients.

To get started fast, we shall spin up a server exactly the same way we did in the mTLS blog post. This will streamline things, and the client credentials will be in place.

In order to apply SSL configuration to our Java clients we first need to set up an SSLContext. This simplifies things, since the same SSLContext can be used for the various HTTP clients out there.

Since we have the client public and private keys, we need to convert the private key to PKCS#8 format so that it can be loaded from Java.

openssl pkcs8 -topk8 -inform PEM -outform PEM -in /path/to/generated/client.key -out /path/to/generated/client.key.pkcs8 -nocrypt

Since we use a local Nginx service for this example, we need to disable hostname verification.

        final Properties props = System.getProperties();
        props.setProperty("jdk.internal.httpclient.disableHostnameVerification", Boolean.TRUE.toString());

For other clients this might require setting up a HostnameVerifier that accepts all connections.

        HostnameVerifier allHostsValid = new HostnameVerifier() {
            public boolean verify(String hostname, SSLSession session) {
                return true;
            }
        };

The next step is to load the client keys in Java code and create a KeyManagerFactory.

        String privateKeyPath = "/path/to/generated/client.key.pkcs8";
        String publicKeyPath = "/path/to/generated/client.crt";

        final byte[] publicData = Files.readAllBytes(Path.of(publicKeyPath));
        final byte[] privateData = Files.readAllBytes(Path.of(privateKeyPath));

        String privateString = new String(privateData, Charset.defaultCharset())
                .replace("-----BEGIN PRIVATE KEY-----", "")
                .replaceAll(System.lineSeparator(), "")
                .replace("-----END PRIVATE KEY-----", "");

        byte[] encoded = Base64.getDecoder().decode(privateString);

        final CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        final Collection<? extends Certificate> chain = certificateFactory.generateCertificates(
                new ByteArrayInputStream(publicData));

        Key key = KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(encoded));

        KeyStore clientKeyStore = KeyStore.getInstance("jks");
        final char[] pwdChars = "test".toCharArray();
        clientKeyStore.load(null, null);
        clientKeyStore.setKeyEntry("test", key, pwdChars, chain.toArray(new Certificate[0]));

        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
        keyManagerFactory.init(clientKeyStore, pwdChars);

In the above snippet:

  • We read the bytes from the files.
  • We created a certificate chain from the public key.
  • We created a key instance using the private key.
  • Created a Keystore using the chain and keys
  • Created a KeyManagerFactory

Now that we have a KeyManagerFactory created we can use it to create an SSLContext

Since we use self-signed certificates we need a TrustManager that will accept them. In this example the TrustManager will accept all certificates presented by the server.

TrustManager[] acceptAllTrustManager = {
                new X509TrustManager() {
                    public X509Certificate[] getAcceptedIssuers() {
                        return new X509Certificate[0];
                    }

                    public void checkClientTrusted(
                            X509Certificate[] certs, String authType) {
                    }

                    public void checkServerTrusted(
                            X509Certificate[] certs, String authType) {
                    }
                }
        };

Then comes the SSLContext initialization:

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(keyManagerFactory.getKeyManagers(), acceptAllTrustManager, new java.security.SecureRandom());

Let’s use a client and see how it behaves

        HttpClient client = HttpClient.newBuilder()
                                      .sslContext(sslContext)
                                      .build();

        HttpRequest exactRequest = HttpRequest.newBuilder()
                                      .uri(URI.create("https://127.0.0.1"))
                                      .GET()
                                      .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                                  .join();
        System.out.println(exactResponse.statusCode());

We shall receive a 404 status code (the default for that Nginx installation), which means that our request had a successful mTLS handshake.

Now let’s try with another client, the old school synchronous HttpsURLConnection. Pay attention: I use the allHostsValid created previously.

        HttpsURLConnection httpsURLConnection = (HttpsURLConnection)   new URL("https://127.0.0.1").openConnection();
        httpsURLConnection.setSSLSocketFactory(sslContext.getSocketFactory());
        httpsURLConnection.setHostnameVerifier(allHostsValid);

        InputStream  inputStream = httpsURLConnection.getInputStream();
        String result =  new String(inputStream.readAllBytes(), Charset.defaultCharset());

This will result in a 404 error, which again means that the handshake took place successfully.

So whether you have an async HTTP client or a synchronous one, provided you have the right SSLContext configured, you should be able to do the handshake.
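
For completeness, the same java.net.http client can also be used synchronously with the SSLContext we built. A small sketch, assuming the checked exceptions of send are handled by the caller:

        // Synchronous call over the same mTLS-enabled SSLContext.
        HttpClient syncClient = HttpClient.newBuilder()
                                          .sslContext(sslContext)
                                          .build();

        HttpRequest request = HttpRequest.newBuilder()
                                         .uri(URI.create("https://127.0.0.1"))
                                         .GET()
                                         .build();

        HttpResponse<String> response = syncClient.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());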

Executing Blocking calls on a Reactor based Application

Project Reactor is a fully non-blocking foundation with back-pressure support included. Although most libraries out there support asynchronous methods and thus assist its usage, there are some cases where a library contains complex blocking methods without an asynchronous implementation. Calling these methods inside a reactive stream would have bad results. Instead we need to make those methods async or find a workaround.

Provided you are short on time and it is not possible to contribute a patch to the tool used, or you cannot work out how to reverse engineer the blocking call and implement a non-blocking version, then it makes sense to utilise some threads.

First let’s import the dependencies for our project

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.11</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

Let's start with our blocking service.

    public String get(String url) throws IOException {
        HttpURLConnection connection = (HttpsURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        connection.setDoOutput(true);
        try(InputStream inputStream = connection.getInputStream()) {
            return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

We used HttpsURLConnection since we know for sure that it makes a blocking call. To offload the blocking call we need a Scheduler; for blocking calls we shall use the boundedElastic scheduler. A scheduler can also be created from an existing executor service, as sketched below.
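
For instance, a Scheduler backed by an existing executor service could be created like in the following sketch (the pool size is arbitrary):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

...

        // A Scheduler backed by a dedicated, pre-existing thread pool for blocking work.
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Scheduler blockingScheduler = Schedulers.fromExecutorService(executorService);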

So let’s transform this method to a non-blocking one.

package com.gkatzioura.blocking;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingAsyncService {

    private final BlockingService blockingService;

    public BlockingAsyncService(BlockingService blockingService) {
        this.blockingService = blockingService;
    }

    private Mono<String> get(String url) {
        return Mono.fromCallable(() -> blockingService.get(url))
                .subscribeOn(Schedulers.boundedElastic());
    }

}

What we can see is a Mono created from a callable. By using subscribeOn with the boundedElastic scheduler, the subscription, and therefore the execution of the blocking call, happens on one of that scheduler's threads instead of blocking the reactive pipeline.

Let’s have a test

package com.gkatzioura.blocking;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertTrue;

class BlockingAsyncServiceTest {

    private BlockingAsyncService blockingAsyncService;

    @BeforeEach
    void setUp() {
        blockingAsyncService = new BlockingAsyncService(new BlockingService());
    }

    @Test
    void name() {
        StepVerifier.create(
                            Mono.just("https://www.google.com/")
                                .map(s -> blockingAsyncService.get(s))
                                .flatMap(s -> s)
                    )
                .consumeNextWith(s -> assertTrue(s.startsWith("<!doctype")))
                .verifyComplete();
    }
}

That's it! Obviously the best thing to do is to find a way to turn the blocking call into an async one using the async libraries out there. When that's not feasible, we can fall back on using threads.