Execute mTLS calls using Java

Previously we secured an Nginx instance using SSL and mTLS. If you are using Java, interacting with a service secured with mTLS requires some changes to your code base. In this tutorial we shall enable our Java application to use mTLS with different clients.

To get started fast, we shall spin up a server exactly the same way we did in the mTLS blog post. This keeps things streamlined, and the client credentials will already be in place.

In order to apply SSL configuration to our Java clients we first need to set up an SSLContext. This simplifies things, since the same SSLContext can be reused across the various HTTP clients out there.

Since we have the client public and private keys, we need to convert the private key to PKCS#8 format so that Java can read it.

openssl pkcs8 -topk8 -inform PEM -outform PEM -in /path/to/generated/client.key -out /path/to/generated/client.key.pkcs8 -nocrypt

Since we are using a local Nginx service for this example, we need to disable hostname verification.

        final Properties props = System.getProperties();
        props.setProperty("jdk.internal.httpclient.disableHostnameVerification", Boolean.TRUE.toString());

Other clients may instead require a HostnameVerifier that accepts all connections.

        HostnameVerifier allHostsValid = new HostnameVerifier() {
            public boolean verify(String hostname, SSLSession session) {
                return true;
            }
        };

The next step is to load the client keys in Java code and create a KeyManagerFactory.

        String privateKeyPath = "/path/to/generated/client.key.pkcs8";
        String publicKeyPath = "/path/to/generated/client.crt";

        final byte[] publicData = Files.readAllBytes(Path.of(publicKeyPath));
        final byte[] privateData = Files.readAllBytes(Path.of(privateKeyPath));

        String privateString = new String(privateData, Charset.defaultCharset())
                .replace("-----BEGIN PRIVATE KEY-----", "")
                .replaceAll(System.lineSeparator(), "")
                .replace("-----END PRIVATE KEY-----", "");

        byte[] encoded = Base64.getDecoder().decode(privateString);

        final CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        final Collection<? extends Certificate> chain = certificateFactory.generateCertificates(
                new ByteArrayInputStream(publicData));

        Key key = KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(encoded));

        KeyStore clientKeyStore = KeyStore.getInstance("jks");
        final char[] pwdChars = "test".toCharArray();
        clientKeyStore.load(null, null);
        clientKeyStore.setKeyEntry("test", key, pwdChars, chain.toArray(new Certificate[0]));

        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
        keyManagerFactory.init(clientKeyStore, pwdChars);

In the above snippet:

  • We read the bytes from the files.
  • We created a certificate chain from the public key.
  • We created a key instance using the private key.
  • We created a KeyStore using the chain and the key.
  • We created a KeyManagerFactory.

Now that we have a KeyManagerFactory we can use it to create an SSLContext.

Since we are using self-signed certificates, we need a TrustManager that will accept them. In this example the TrustManager will accept all certificates presented by the server.

        TrustManager[] acceptAllTrustManager = {
                new X509TrustManager() {
                    public X509Certificate[] getAcceptedIssuers() {
                        return new X509Certificate[0];
                    }

                    public void checkClientTrusted(
                            X509Certificate[] certs, String authType) {
                    }

                    public void checkServerTrusted(
                            X509Certificate[] certs, String authType) {
                    }
                }
        };

Then we initialize the SSL context.

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(keyManagerFactory.getKeyManagers(), acceptAllTrustManager, new java.security.SecureRandom());

Let’s use a client and see how it behaves, starting with Java’s built-in HttpClient.

        HttpClient client = HttpClient.newBuilder()
                                      .sslContext(sslContext)
                                      .build();

        HttpRequest exactRequest = HttpRequest.newBuilder()
                                      .uri(URI.create("https://127.0.0.1"))
                                      .GET()
                                      .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                                  .join();
        System.out.println(exactResponse.statusCode());

We shall receive a 404 status code (the default for that Nginx installation), which means that our request completed a successful mTLS handshake.

Now let’s try another client, the old-school synchronous HttpsURLConnection. Pay attention: we use the allHostsValid verifier created previously.

        HttpsURLConnection httpsURLConnection = (HttpsURLConnection)   new URL("https://127.0.0.1").openConnection();
        httpsURLConnection.setSSLSocketFactory(sslContext.getSocketFactory());
        httpsURLConnection.setHostnameVerifier(allHostsValid);

        InputStream  inputStream = httpsURLConnection.getInputStream();
        String result =  new String(inputStream.readAllBytes(), Charset.defaultCharset());

This will result in a 404 error, surfaced as an exception by getInputStream, which again means that the handshake took place successfully.

So whether you have an asynchronous HTTP client or a synchronous one, provided you have the right SSLContext configured you should be able to complete the handshake.
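
The same SSLContext can also be plugged into third-party clients. Below is a minimal sketch, assuming the Apache HttpComponents client (org.apache.httpcomponents:httpclient 4.5.x) is on the classpath; it reuses the sslContext and the allHostsValid verifier created above.

        // imports: org.apache.http.client.methods.CloseableHttpResponse,
        // org.apache.http.client.methods.HttpGet,
        // org.apache.http.impl.client.CloseableHttpClient,
        // org.apache.http.impl.client.HttpClients
        CloseableHttpClient httpClient = HttpClients.custom()
                .setSSLContext(sslContext)
                .setSSLHostnameVerifier(allHostsValid)
                .build();

        try (CloseableHttpResponse response = httpClient.execute(new HttpGet("https://127.0.0.1"))) {
            // a 404 from Nginx once again indicates a successful mTLS handshake
            System.out.println(response.getStatusLine().getStatusCode());
        }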

Executing Blocking calls on a Reactor-based Application

Project Reactor is a fully non-blocking foundation with back-pressure support included. Although most libraries out there provide asynchronous methods and thus fit its usage, there are cases where a library contains complex blocking methods without an asynchronous implementation. Calling such methods inside a reactor stream would have bad results. Instead we need to turn those methods into asynchronous ones, or find a workaround.

Provided you are short on time and it is not possible to contribute a patch to the tool used, or you cannot work out how to reverse engineer the blocking call and implement a non-blocking version, then it makes sense to utilise some threads.

First let’s import the dependencies for our project.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.11</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

Let’s start with our blocking service.

    public String get(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        try(InputStream inputStream = connection.getInputStream()) {
            return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

We used HttpURLConnection since we know for sure that it makes a blocking call. To wrap it we need a Scheduler; for blocking calls we shall use the boundedElastic scheduler. A Scheduler can also be created from an existing executor service.
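
For instance, a minimal sketch, assuming a fixed thread pool fits your workload:

        // imports: java.util.concurrent.ExecutorService, java.util.concurrent.Executors,
        // reactor.core.scheduler.Scheduler, reactor.core.scheduler.Schedulers
        // hypothetical pool size; size it to the number of concurrent blocking calls you expect
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        Scheduler blockingScheduler = Schedulers.fromExecutorService(executorService);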

So let’s transform this method to a non-blocking one.

package com.gkatzioura.blocking;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingAsyncService {

    private final BlockingService blockingService;

    public BlockingAsyncService(BlockingService blockingService) {
        this.blockingService = blockingService;
    }

    public Mono<String> get(String url) {
        return Mono.fromCallable(() -> blockingService.get(url))
                .subscribeOn(Schedulers.boundedElastic());
    }

}

What we can see is a Mono created from a Callable. Subscription happens on the boundedElastic scheduler, so the blocking call is executed on one of its threads instead of blocking the reactive pipeline’s threads.

Let’s have a test.

package com.gkatzioura.blocking;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class BlockingAsyncServiceTest {

    private BlockingAsyncService blockingAsyncService;

    @BeforeEach
    void setUp() {
        blockingAsyncService = new BlockingAsyncService(new BlockingService());
    }

    @Test
    void name() {
        StepVerifier.create(
                            Mono.just("https://www.google.com/")
                                .map(s -> blockingAsyncService.get(s))
                                .flatMap(s -> s)
                    )
                .consumeNextWith(s -> assertTrue(s.toLowerCase().startsWith("<!doctype")))
                .verifyComplete();
    }
}

That’s it! Obviously the best thing to do is to turn the blocking call into an asynchronous one, or find a workaround using the async libraries out there. When that’s not feasible, we can fall back on using threads.

Add mTLS to Nginx

Previously we added SSL to an Nginx server. In this example we shall enhance our security by adding mTLS to Nginx.

Apart from encrypting the traffic between client and server, SSL is also a way for the client to make sure that the server it connects to is a trusted source.

mTLS, on the other hand, is a way for the server to ensure that the client is trusted. The client accepts the SSL connection to the server, but it also has to present a certificate signed by an authority that the server accepts. By validating the certificate the client presents, the server can allow the connection.

More or less we shall build upon the previous example. The SSL certificates shall be the same; we shall only add the configuration for mTLS.

First, the server SSL certificate creation:

mkdir certs

cd certs

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

The above is sufficient to secure our Nginx with SSL, so let’s create the mTLS certificates for the clients.
In order to create a certificate for mTLS we need a certificate authority. For convenience, the certificate authority will be the same as the one we generated in the previous example.

printf test > client_passphrase.txt
openssl genrsa -des3 -passout file:client_passphrase.txt -out client.key 2048
openssl rsa -passin file:client_passphrase.txt -in client.key -out client.key
openssl req -new -key client.key -subj "/CN=*.client.hostname" -out client.csr

##Sign the certificate with the certificate authority
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt

Take note that the client common name needs to be different from the server certificate’s common name, or else your request will be rejected.
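
Before wiring the certificate into Nginx we can sanity-check that the client certificate is indeed signed by our certificate authority:

openssl verify -CAfile ca.crt client.crt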

So we have our client certificate generated.
The next step is to configure Nginx to force mTLS connections signed by a specific authority.

server {
    error_log /var/log/nginx/error.log debug;
    listen 443 ssl;
    server_name  test.your.hostname;
    ssl_password_file /etc/nginx/certs/password;
    ssl_certificate /etc/nginx/certs/tls.crt;
    ssl_certificate_key /etc/nginx/certs/tls.key;

    ssl_client_certificate /etc/nginx/mtls/ca.crt;
    ssl_verify_client on;
    ssl_verify_depth  3;

    ssl_protocols             TLSv1 TLSv1.1 TLSv1.2;

    location / {
    }

}

The ssl_client_certificate directive points to the certificate authority that the client certificates must be signed by.
Setting ssl_verify_client to on enforces mTLS connections.

Since we have all certificates generated, let’s spin up the Nginx server using Docker.

docker run --rm --name mtls-nginx -p 443:443 -v $(pwd)/certs/ca.crt:/etc/nginx/mtls/ca.crt -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.mtls.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

Our server is up and running. Let’s try to make a request using curl, without any client certificate.

curl https://localhost/ --insecure

The result shall be

<html>
<head><title>400 No required SSL certificate was sent</title></head>
<body>
<center><h1>400 Bad Request</h1></center>
<center>No required SSL certificate was sent</center>
<hr><center>nginx/1.21.3</center>
</body>
</html>

As expected, our request is rejected.
Let’s use the client certificate we generated, signed by the expected certificate authority.

curl --key certs/client.key --cert certs/client.crt https://127.0.0.1 --insecure
<html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx/1.21.3</center>
</body>
</html>

The connection was established and the client could connect to the Nginx instance.

Let’s put it all together:

mkdir certs

cd certs

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

printf test > client_passphrase.txt
openssl genrsa -des3 -passout file:client_passphrase.txt -out client.key 2048
openssl rsa -passin file:client_passphrase.txt -in client.key -out client.key
openssl req -new -key client.key -subj "/CN=*.client.hostname" -out client.csr

##Sign the certificate with the certificate authority
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt

cd ../

docker run --rm --name mtls-nginx -p 443:443 -v $(pwd)/certs/ca.crt:/etc/nginx/mtls/ca.crt -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.mtls.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

You can find the code on github.

Add SSL to Nginx

Nginx is a versatile tool with many uses; it can serve as a reverse proxy, a load balancer, etc.

A common usage is to handle SSL traffic in front of applications. Instead of handling SSL at your application layer, you can have Nginx in front.

In our example we shall generate the certificates and make Nginx do the TLS termination. The certificates will be self-signed and issued by our own CA, which shall help us in another example. In a real-world setup the certificate authority is something external, like Let’s Encrypt or GlobalSign; by creating our own certificate authority we are able to simulate them.

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

Now that we have a certificate authority, let’s create the server key and certificate. The first step is to create the key.

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

The result is a private key and a certificate signing request (CSR). The CSR needs to be signed by a certificate authority; in our case that will be the one we created previously. Take note that we did not remove the password from server.key. This was done on purpose, to show how to load a password-protected key in Nginx; if you don’t want to tackle that, remove it as shown in the certificate authority example.

So let’s sign the CSR.

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

Now we are ready to install them on Nginx. We shall use Docker for this one.
This is how the Nginx configuration should look. We shall mount the files we generated previously into our Docker container.

server {

    listen 443 ssl;
    server_name  test.your.hostname;
    ssl_password_file /etc/nginx/certs/password;
    ssl_certificate /etc/nginx/certs/tls.crt;
    ssl_certificate_key /etc/nginx/certs/tls.key;


    location / {

        error_log /var/log/front_end_errors.log;
    }

    location = /swagger.json {
        proxy_pass https://petstore.swagger.io/v2/swagger.json;
    }

}

Our Docker command, mounting the files:

docker run --rm --name some-nginx -p 443:443 -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

Since this is a self-signed certificate it cannot be accessed by a browser without tweaks, but we can use curl --insecure to inspect the results. With a trusted certificate authority this would not be the case.

curl https://localhost/ -v --insecure

Let’s put it all in a script:

mkdir certs

cd certs

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

cd ../

docker run --rm --name some-nginx -p 443:443 -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

You can find the code on github.

In the next blog post we shall configure Nginx to support mTLS.

Kubernetes pod as a Bastion Host

In Cloud Native apps, private networks, databases and services are a reality.

An infrastructure can be fully private, with only a limited number of entry points available.

Obviously the more restricted the better.

Still, there are cases where no infrastructure has been set up to reach the private services. However, if there is access through Kubernetes, HAProxy can help.

HAProxy accepts a configuration file. We can upload that file as a ConfigMap and mount the ConfigMap into a Kubernetes pod. The HAProxy pod can then spin up using that configuration and establish a proxy connection.

Let’s start with the HAProxy configuration. The target will be a MySQL database with a private IP.

apiVersion: v1
data:
  haproxy.cfg: |-
    global
    defaults
        timeout client          30s
        timeout server          30s
        timeout connect         30s

    frontend frontend
        bind    0.0.0.0:3306
        default_backend backend

    backend backend
        mode                    tcp
        server upstream 10.0.1.7:3306
kind: ConfigMap
metadata:
  creationTimestamp: null
  name: mysql-haproxy-port-forward

On the backend’s server entry we just add the IP and the port of the DB; on the frontend we specify the local address and port we shall use.

By doing the above we have a way to mount the config file to our Kubernetes pod.
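
Assuming the manifest above is saved as mysql-haproxy-port-forward.yaml, applying it is a one-liner:

kubectl apply -f mysql-haproxy-port-forward.yaml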

Now let’s create the pod.

apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: null
  labels:
    run: mysql-forward-pod
  name: mysql-forward-pod
spec:
  containers:
    - command:
      - haproxy
      - -f
      - /usr/local/etc/haproxy/haproxy.cfg
      - -V
      image: haproxy:1.7-alpine
      name: mysql-forward-pod
      resources: {}
      volumeMounts:
        - mountPath: /usr/local/etc/haproxy/
          name: mysql-haproxy-port-forward
  dnsPolicy: ClusterFirst
  restartPolicy: Always
  volumes:
    - name: mysql-haproxy-port-forward
      configMap:
        name: mysql-haproxy-port-forward
status: {}

In the volumes section we set the ConfigMap as a volume. In the containers section we mount the ConfigMap to a path, thus having access to the file.
We use a HAProxy image, and we provide the command to start HAProxy using the file we mounted before.

To test that it works, use a kubectl session that has port-forward permissions and run:

kubectl port-forward  mysql-forward-pod 3306:3306

You should be able to access MySQL from your localhost.
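
For example, assuming the mysql command line client is installed locally and you have valid database credentials:

mysql -h 127.0.0.1 -P 3306 -u your-db-user -p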

Receive Pub/Sub messages to your Spring Application

Pub/Sub is a messaging solution provided by GCP.

Before we dive into the actual configuration, we need to be aware that Spring Cloud for GCP is now managed by the Google Cloud team; therefore the latest code can be found here.

Our application will receive messages from Pub/Sub and expose them using an endpoint.
Let’s go for the imports first.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gkatzioura</groupId>
    <artifactId>spring-cloud-pubsub-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>spring-cloud-gcp-dependencies</artifactId>
                <version>2.0.4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-pubsub</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
    </dependencies>

</project>

Quick note: with a few tweaks you can use the PubSub emulator available from the Google Cloud Team.

The first class will contain the Pub/Sub messages received. It will be a queue containing a limited number of messages.

package com.gkatzioura.pubsub.example;

import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

@Component
public class LatestUpdates {

    LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);

    public void addUpdate(String update) {
        // offer() returns false instead of throwing when the bounded queue is full
        boundedQueue.offer(update);
    }

    public String fetch() {
        return boundedQueue.poll();
    }

}

The Pub/Sub configuration will set up the listener, and shall use Spring Integration.

We define a message channel.

    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

Then we add the inbound channel adapter. The ack mode will be set to manual.

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "your-subscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(String.class);
        return adapter;
    }

Then we add a listener method. The way acknowledgements are handled is up to the developer. If an exception occurs in that block it will be caught and sent to an error stream, so messages will continue to get pulled.

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload,
                                @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
        message.ack();
    }

The entire Pub/Sub configuration:

package com.gkatzioura.pubsub.example;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;

import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;

@Configuration
public class PubSubConfiguration {

    private final LatestUpdates latestUpdates;

    public PubSubConfiguration(LatestUpdates latestUpdates) {
        this.latestUpdates = latestUpdates;
    }

    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "your-subscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(String.class);
        return adapter;
    }

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload,
                                @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
        message.ack();
    }

}

The controller will just poll from the internal queue.

package com.gkatzioura.pubsub.example;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UpdatesController {

    private LatestUpdates latestUpdates;

    public UpdatesController(LatestUpdates latestUpdates) {
        this.latestUpdates = latestUpdates;
    }

    @GetMapping("/update")
    public String getLatestUpdate() {
        return latestUpdates.fetch();
    }

}
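
Once the application is running, the latest update can be fetched from the endpoint:

curl http://localhost:8080/update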

The next step is to define the Spring Boot application.

package com.gkatzioura.pubsub.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ExampleApplication {


    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }

}

When running the application, be aware that you need to have at least one environment variable or property set:

spring.cloud.gcp.pubsub.enabled=true

This will fall back to your local GCP configuration, and will pick up your credentials as well as the project they point at.
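
If the local gcloud configuration is not in place, the project and credentials can also be set explicitly. A hypothetical example using the Spring Cloud GCP properties:

spring.cloud.gcp.pubsub.enabled=true
spring.cloud.gcp.project-id=your-project-id
spring.cloud.gcp.credentials.location=file:/path/to/service-account.json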

That’s it! To summarise, we managed to pull messages from Pub/Sub and expose them on an endpoint.

Using R2DBC with a Reactor Application

Since Reactor has taken over the Java world, it was inevitable that a reactive SQL library would appear.
In this blog post we shall use R2DBC with H2 and Reactor.

We shall start with the dependencies needed.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
    </parent>

    <groupId>com.gkatzioura</groupId>
    <artifactId>r2dbc-reactor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

We imported Spring Data R2DBC, the H2 R2DBC driver, the H2 binary, as well as the test utilities.

Suppose this is our schema; it is a PostgreSQL schema.

create table order_request (
	id uuid NOT NULL constraint or_id_pk primary key,
	created_by varchar,
	created timestamp default now()              not null,
	updated timestamp default now()              not null
);

We shall add it later to test/resources/schema.sql for testing purposes.

Also, let’s add a new model.

package com.gkatzioura.r2dbc.model;

import java.time.LocalDateTime;
import java.util.UUID;

import org.springframework.data.annotation.Id;
import org.springframework.data.domain.Persistable;
import org.springframework.data.relational.core.mapping.Table;

@Table("order_request")
public class OrderRequest implements Persistable<UUID> {

    @Id
    private UUID id;
    private String createdBy;
    private LocalDateTime created;
    private LocalDateTime updated;

    public void setId(UUID id) {
        this.id = id;
    }

    public String getCreatedBy() {
        return createdBy;
    }

    public void setCreatedBy(String createdBy) {
        this.createdBy = createdBy;
    }

    public LocalDateTime getCreated() {
        return created;
    }

    public void setCreated(LocalDateTime created) {
        this.created = created;
    }

    public LocalDateTime getUpdated() {
        return updated;
    }

    public void setUpdated(LocalDateTime updated) {
        this.updated = updated;
    }

    @Override
    public UUID getId() {
        return id;
    }

    @Override
    public boolean isNew() {
        return created == null;
    }

}

Pay attention to the isNew method. This is how the repository identifies whether the object should be persisted or updated.

Now onwards to our repository.

package com.gkatzioura.r2dbc.repository;

import java.util.UUID;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.gkatzioura.r2dbc.model.OrderRequest;

public interface OrderRepository extends ReactiveCrudRepository<OrderRequest, UUID> {
}

Let’s add some tests.

As mentioned the schema above will reside in test/resources/schema.sql

We shall add some configuration for the test H2 DB. We need to make sure that H2 runs in its PostgreSQL compatibility mode.

package com.gkatzioura.r2dbc;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.r2dbc.connection.init.CompositeDatabasePopulator;
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;

import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;

@Configuration
@EnableR2dbcRepositories
public class H2ConnectionConfiguration extends AbstractR2dbcConfiguration  {

    @Override
    public ConnectionFactory connectionFactory() {
        return new H2ConnectionFactory(
                io.r2dbc.h2.H2ConnectionConfiguration.builder()
                                                     .url("mem:testdb;MODE=PostgreSQL;DB_CLOSE_DELAY=-1;")
                                                     .build()
        );
    }

    @Bean
    public ConnectionFactoryInitializer initializer() {
        var initializer = new ConnectionFactoryInitializer();
        initializer.setConnectionFactory(connectionFactory());

        var databasePopulator = new CompositeDatabasePopulator();
        databasePopulator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));
        initializer.setDatabasePopulator(databasePopulator);
        return initializer;
    }

}

With this configuration we create an H2 database simulating a PostgreSQL DB, create the schema, and enable the R2DBC repositories.
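
Outside of the test setup, when targeting an actual PostgreSQL instance, Spring Boot can auto-configure the R2DBC connection factory from properties instead. A hypothetical example:

spring.r2dbc.url=r2dbc:postgresql://localhost:5432/orders
spring.r2dbc.username=orders-user
spring.r2dbc.password=orders-password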

Also let’s add a test.

package com.gkatzioura.r2dbc.repository;

import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import com.gkatzioura.r2dbc.H2ConnectionConfiguration;
import com.gkatzioura.r2dbc.model.OrderRequest;
import reactor.test.StepVerifier;

@ExtendWith({SpringExtension.class})
@Import({H2ConnectionConfiguration.class})
class OrderRepositoryTest {

    @Autowired
    private OrderRepository orderRepository;

    @Test
    void testSave() {
        UUID id = UUID.randomUUID();
        OrderRequest orderRequest = new OrderRequest();
        orderRequest.setId(id);
        orderRequest.setCreatedBy("test-user");

        var persisted = orderRepository.save(orderRequest)
                                       .map(a -> orderRepository.findById(a.getId()))
                                       .flatMap(a -> a.map(b -> b.getId()));

        StepVerifier.create(persisted).expectNext(id).verifyComplete();
    }
}

That’s it, you can find the code on github.

Add gRPC to your Java Application

gRPC is a high-performance, open source, universal RPC framework.
There are various benefits to using gRPC:

    • It simplifies development by providing client/server code
    • It supports multiple languages

It all starts with defining a .proto file; .proto files reside in the src/main/proto directory.

Be aware that it is good practice to keep proto files in a repository with some schema versioning. This way developers from other teams can generate their SDKs by referencing them, even for other languages.

We shall create an Order service in src/main/proto/Order.proto.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.egkatzioura.order.v1";

service OrderService {
    rpc ExecuteOrder(OrderRequest) returns (OrderResponse) {};
}

message OrderRequest {
    string email = 1;
    string product = 2;
    int32 amount = 3;
}

message OrderResponse {
    string info = 1;
}

In order to work with gRPC, the following dependencies and build plugins need to be added to the pom:

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>1.39.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.39.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.39.0</version>
        </dependency>
        <dependency> <!-- necessary for Java 9+ -->
            <groupId>org.apache.tomcat</groupId>
            <artifactId>annotations-api</artifactId>
            <version>6.0.53</version>
            <scope>provided</scope>
        </dependency>
    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

By executing mvn clean install, the classes will be generated in target/classes.
Those classes are more than enough to spin up a server and run a client that communicates with it.

Therefore let’s try to spin up the server.

We shall create a service implementation.

package com.egkatzioura.order.impl;

import com.egkatzioura.order.v1.Order;
import com.egkatzioura.order.v1.OrderServiceGrpc;

import io.grpc.stub.StreamObserver;

public class OrderServiceImpl extends OrderServiceGrpc.OrderServiceImplBase {

    @Override
    public void executeOrder(Order.OrderRequest request, StreamObserver<Order.OrderResponse> responseObserver) {

        Order.OrderResponse response = Order.OrderResponse.newBuilder()
                                                          .setInfo("Hi "+request.getEmail()+", your order has been executed")
                                                          .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

Then our main class will spin up the server and serve the request.

package com.egkatzioura.order;

import java.io.IOException;

import com.egkatzioura.order.impl.OrderServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;

public class Application {

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder
                .forPort(8080)
                .addService(new OrderServiceImpl()).build();

        server.start();
        server.awaitTermination();
    }

}

While the server is running, we can run another main class which shall communicate with the server and execute a gRPC request against it.

package com.egkatzioura.order;

import com.egkatzioura.order.v1.Order;
import com.egkatzioura.order.v1.OrderServiceGrpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class ApplicationClient {
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080)
                                                      .usePlaintext()
                                                      .build();

        OrderServiceGrpc.OrderServiceBlockingStub orderServiceBlockingStub
                = OrderServiceGrpc.newBlockingStub(managedChannel);

        Order.OrderRequest orderRequest = Order.OrderRequest.newBuilder()
                                             .setEmail("hello@word.com")
                                             .setProduct("no-name")
                                             .setAmount(3)
                                             .build();

        Order.OrderResponse orderResponse = orderServiceBlockingStub.executeOrder(orderRequest);

        System.out.println("Received response: "+orderResponse.getInfo());

        managedChannel.shutdown();
    }
}

So we just auto-generated gRPC code, backed a gRPC service with an implementation, spun up a server, and a client got a response from it.

You can find the source code on github.

Git commit id Plugin with Spring Actuator

The git commit-id plugin is very useful for depicting the state of the git repository when a binary was created. Imagine the case of multiple deployments in a shared staging environment using the same version: you have not cut a new version yet and multiple deployments are executed; having that information included helps.

We will start with a simple Maven project containing a hello world application.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <artifactId>git-commit-id-example</artifactId>
    <groupId>com.gkatzioura</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

</project>

Then a main class:

package com.gkatzioura.commitid;

public class Application {
    public static void main(String[] args) {
    }
}

Let’s add the plugin:

<build>
        <plugins>
            <plugin>
                <groupId>io.github.git-commit-id</groupId>
                <artifactId>git-commit-id-maven-plugin</artifactId>
                <version>5.0.0</version>
                <executions>
                    <execution>
                        <id>get-the-git-infos</id>
                        <goals>
                            <goal>revision</goal>
                        </goals>
                        <phase>initialize</phase>
                    </execution>
                </executions>
                <configuration>
                    <dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
                    <generateGitPropertiesFile>true</generateGitPropertiesFile>
                </configuration>
            </plugin>
        </plugins>
    </build>

Obviously this will work, and the file will be located at target/classes/git.properties, but we want to make it easier to retrieve that information.
It’s much easier to have an endpoint that exposes this piece of information than to check binaries.
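
For reference, a trimmed sketch of what the generated git.properties can look like; the values below reuse the commit shown later and will differ per repository state:

git.branch=master
git.commit.id.abbrev=e77882e
git.commit.time=2021-06-20T09\:32\:36Z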

This brings us to Actuator.
In Spring we have actuator endpoints that show various information, like health or, in our case, info.
We can inject the git information into the info actuator endpoint.

So let’s import our Spring Boot dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <artifactId>git-commit-id-example</artifactId>
    <groupId>com.gkatzioura</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>io.github.git-commit-id</groupId>
                <artifactId>git-commit-id-maven-plugin</artifactId>
                <version>5.0.0</version>
                <executions>
                    <execution>
                        <id>get-the-git-infos</id>
                        <goals>
                            <goal>revision</goal>
                        </goals>
                        <phase>initialize</phase>
                    </execution>
                </executions>
                <configuration>
                    <dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
                    <generateGitPropertiesFile>true</generateGitPropertiesFile>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Also, we shall update our main class in order to spin up our Spring Boot application.

package com.gkatzioura.commitid;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
    }

}

Then you need to enable the info endpoint. This can be done by adding the setting through properties or environment variables.

management.endpoints.web.exposure.include=health,info
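
The environment variable equivalent, using Spring Boot’s relaxed binding, would be:

MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE=health,info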

Once the application is up and running, issue the following request:

curl http://localhost:8080/actuator/info

We shall be presented with the git information:

{
  "git": {
    "branch": "master",
    "commit": {
      "id": "e77882e",
      "time": "2021-06-20T09:32:36Z"
    }
  }
}

This was pretty seamless so let’s explain what happens behind the scenes.

  • By running mvn clean compile, the git.properties file gets generated.
  • By running the application, the info endpoint is enabled based on the properties.
  • The Spring environment picks up the git.properties file.
  • Actuator identifies that the file exists and exposes it through the info endpoint.

You can find the source code on github.

Keeping track of requests and Responses on Spring WebFlux

In any REST API based application, it’s only a matter of time before there is a need to intercept the requests towards the application and execute more than one action. If those actions need to apply to all requests to the application, then the usage of filters makes sense; security is a typical example.

In Servlet-based applications we used to have ContentCachingRequestWrapper and ContentCachingResponseWrapper. We look for the same qualities these provide, but in a WebFlux environment.

The equivalent solution is the decorator classes provided by the WebFlux package: ServerHttpRequestDecorator, ServerHttpResponseDecorator and ServerWebExchangeDecorator.

Let’s get started with a simple WebFlux-based API.

First we import the dependencies.

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.20</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

Then we create a simple model for a POST request.

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Info {

    private String description;

}

And the response:

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InfoResponse {

    private boolean success;

    public static InfoResponse successful() {
        return InfoResponse.builder().success(true).build();
    }
}

Next we implement a controller that uses the models. The controller will be a simple echo.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class InfoController {


    @PostMapping("/info")
    public Mono<InfoResponse> getInfo(@RequestBody Info info) {
        return Mono.just(InfoResponse.builder().success(true).build());
    }

}

A curl POST can help us debug.

curl --location --request POST 'http://localhost:8080/info' \
--header 'Content-Type: application/json' \
--data-raw '{
"description": "Check"
}'

A typical filter in WebFlux implements the WebFilter interface; when annotated as a component it will be picked up by the runtime.

@Component
public class ExampleFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        return webFilterChain.filter(serverWebExchange);
    }

}

In our case we want to keep track of both the request and the response body.
Let’s start by creating a ServerHttpRequestDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;

public class BodyCaptureRequest extends ServerHttpRequestDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureRequest(ServerHttpRequest delegate) {
        super(delegate);
    }

    public Flux<DataBuffer> getBody() {
        return super.getBody().doOnNext(this::capture);
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

As we can see, in the getBody implementation we add a hook which captures the byte chunks as they flow while the actual service reads the body.
Once the request is consumed, the accumulated data forms the full body.

The same pattern applies to the ServerHttpResponseDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BodyCaptureResponse extends ServerHttpResponseDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureResponse(ServerHttpResponse delegate) {
        super(delegate);
    }

    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        Flux<DataBuffer> buffer = Flux.from(body);
        return super.writeWith(buffer.doOnNext(this::capture));
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

Here we override the writeWith function. As the data is written and pushed down the stream, we wrap the publisher in a Flux so that we can attach a capture callback with doOnNext.

In both cases the bytes of the request and the response are accumulated. This might work for specific use cases, for example altering the request/response. If your use case is covered by just streaming the bytes to another system, there is no need for accumulation; an altered getBody and writeWith that stream the data will do the work.
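
For the streaming case, a minimal sketch of such an altered getBody, where externalSink is a hypothetical component that forwards data to another system:

    @Override
    public Flux<DataBuffer> getBody() {
        // forward each chunk as it flows instead of accumulating it in memory;
        // externalSink is a hypothetical hand-off to another system
        return super.getBody().doOnNext(buffer ->
                externalSink.push(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString()));
    }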

Let’s go to our parent decorator that extends ServerWebExchangeDecorator.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;

public class BodyCaptureExchange extends ServerWebExchangeDecorator {

    private BodyCaptureRequest bodyCaptureRequest;
    private BodyCaptureResponse bodyCaptureResponse;

    public BodyCaptureExchange(ServerWebExchange exchange) {
        super(exchange);
        this.bodyCaptureRequest = new BodyCaptureRequest(exchange.getRequest());
        this.bodyCaptureResponse = new BodyCaptureResponse(exchange.getResponse());
    }

    @Override
    public BodyCaptureRequest getRequest() {
        return bodyCaptureRequest;
    }

    @Override
    public BodyCaptureResponse getResponse() {
        return bodyCaptureResponse;
    }

}

Time to focus on our filter. To keep the example simple we will print the request and response bodies on the console.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Mono;

@Component
public class CustomWebFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        BodyCaptureExchange bodyCaptureExchange = new BodyCaptureExchange(serverWebExchange);
        return webFilterChain.filter(bodyCaptureExchange).doOnSuccess( (se) -> {
            System.out.println("Body request "+bodyCaptureExchange.getRequest().getFullBody());
            System.out.println("Body response "+bodyCaptureExchange.getResponse().getFullBody());
        });
    }

}

If we run the curl command above, we shall eventually have the request and response bodies printed.
You can find the source code on github.