Executing Blocking Calls on a Reactor-Based Application

Project Reactor is a fully non-blocking foundation with back-pressure support included. Although most libraries out there provide asynchronous methods and thus make its usage easier, there are cases where a library contains complex blocking methods without an asynchronous implementation. Calling these methods inside a Reactor stream would have bad results. Instead, we need to turn those methods into async ones or find a workaround.

Provided you are short on time and it is not possible to contribute a patch to the tool in use, or you cannot work out how to reverse engineer the blocking call and implement a non-blocking version, then it makes sense to utilise some threads.

First let’s import the dependencies for our project

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.11</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

Let’s start with our blocking service

    public String get(String url) throws IOException {
        // openConnection returns a URLConnection; for an http(s) URL it is an HttpURLConnection
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        try (InputStream inputStream = connection.getInputStream()) {
            return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

We used HttpURLConnection since we know for sure that it makes blocking calls. To execute them without stalling the event loop we need a Scheduler. For blocking calls we shall use the boundedElastic scheduler. A scheduler can also be created from an existing executor service, as sketched below.
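
A minimal sketch of wrapping a pre-existing thread pool into a Reactor Scheduler (the class name, pool type and size here are arbitrary assumptions):

package com.gkatzioura.blocking;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ExecutorBackedScheduler {

    // Wraps an application-managed thread pool into a Scheduler usable with subscribeOn.
    public static Scheduler fromExistingPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        return Schedulers.fromExecutorService(executorService);
    }

}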

So let’s transform this method to a non-blocking one.

package com.gkatzioura.blocking;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingAsyncService {

    private final BlockingService blockingService;

    public BlockingAsyncService(BlockingService blockingService) {
        this.blockingService = blockingService;
    }

    public Mono<String> get(String url) {
        return Mono.fromCallable(() -> blockingService.get(url))
                .subscribeOn(Schedulers.boundedElastic());
    }

}

What we can see is a Mono created from a Callable that wraps the blocking method. The subscribeOn operator ensures that the subscription, and therefore the blocking call itself, executes on a thread of the boundedElastic scheduler instead of the caller’s thread.

Let’s have a test

package com.gkatzioura.blocking;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class BlockingAsyncServiceTest {

    private BlockingAsyncService blockingAsyncService;

    @BeforeEach
    void setUp() {
        blockingAsyncService = new BlockingAsyncService(new BlockingService());
    }

    @Test
    void name() {
        StepVerifier.create(
                            Mono.just("https://www.google.com/")
                                .flatMap(url -> blockingAsyncService.get(url))
                    )
                .consumeNextWith(s -> Assertions.assertTrue(s.startsWith("<!doctype")))
                .verifyComplete();
    }
}

That’s it! Obviously the best thing to do is to turn the blocking call into an async one, or find a workaround using one of the async libraries out there. When that’s not feasible, we can fall back on using threads.

Receive Pub/Sub messages to your Spring Application

Pub/Sub is a messaging solution provided by GCP.

Before we dive into the actual configuration we need to be aware that Spring Cloud for GCP is now managed by the Google Cloud Team. Therefore the latest code can be found here.

Our application will receive messages from Pub/Sub and expose them using an endpoint.
Let’s go for the imports first

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gkatzioura</groupId>
    <artifactId>spring-cloud-pubsub-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>spring-cloud-gcp-dependencies</artifactId>
                <version>2.0.4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-pubsub</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
    </dependencies>

</project>

Quick note: with a few tweaks you can use the PubSub emulator available from the Google Cloud Team.
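
For instance, assuming the emulator runs locally on its default port, pointing the application at it is a matter of a couple of properties (the host, port and project id below are assumptions):

spring.cloud.gcp.pubsub.emulator-host=localhost:8085
spring.cloud.gcp.project-id=test-project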

The first class will hold the received Pub/Sub messages. It is backed by a queue with a limited capacity.

package com.gkatzioura.pubsub.example;

import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

@Component
public class LatestUpdates {

    LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);

    public void addUpdate(String update) {
        // offer drops the update when the queue is full, instead of throwing like add would
        boundedQueue.offer(update);
    }

    public String fetch() {
        // returns null when no update is pending
        return boundedQueue.poll();
    }

}

The Pub/Sub configuration will set up the listener; it uses Spring Integration.

We define a message channel.

    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

Then we add the inbound channel adapter. The ack mode will be set to manual.

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "your-subscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(String.class);
        return adapter;
    }

Then we add a listener method. The way acknowledgements are handled is up to the developer. If an exception occurs in that block it will be caught and sent to an error stream; therefore messages will continue to get pulled.

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload,
                                @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
        message.ack();
    }
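
If processing fails, we can also negatively acknowledge the message so that Pub/Sub redelivers it. A sketch of that variant of the listener follows (same signature; whether nack-on-failure is the right policy depends on your use case):

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload,
                                @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        try {
            latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
            message.ack();
        } catch (RuntimeException e) {
            // nack triggers redelivery by Pub/Sub; dropping or dead-lettering are alternatives
            message.nack();
        }
    }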

The entire Pub/Sub configuration

package com.gkatzioura.pubsub.example;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;

import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;

@Configuration
public class PubSubConfiguration {

    private final LatestUpdates latestUpdates;

    public PubSubConfiguration(LatestUpdates latestUpdates) {
        this.latestUpdates = latestUpdates;
    }

    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "your-subscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(String.class);
        return adapter;
    }

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload,
                                @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        latestUpdates.addUpdate(message.getPubsubMessage().getData().toStringUtf8());
        message.ack();
    }

}

The controller will just pull from the internal queue.

package com.gkatzioura.pubsub.example;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UpdatesController {

    private LatestUpdates latestUpdates;

    public UpdatesController(LatestUpdates latestUpdates) {
        this.latestUpdates = latestUpdates;
    }

    @GetMapping("/update")
    public String getLatestUpdate() {
        return latestUpdates.fetch();
    }

}

The next step is to define the Spring Boot application.

package com.gkatzioura.pubsub.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ExampleApplication {


    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }

}

When running the application, be aware that you need at least the following property (or the equivalent environment variable) set

spring.cloud.gcp.pubsub.enabled=true

This will fall back to your local GCP configuration and will identify your credentials, as well as the project they point at.
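
To try the flow end to end, we can publish a message on the topic backing the subscription and then poll our endpoint; the topic name below is an assumption, replace it with the topic wired to your-subscription:

gcloud pubsub topics publish your-topic --message="hello pub/sub"

curl http://localhost:8080/update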

That’s it! To summarise, we managed to pull messages from Pub/Sub and expose them through an endpoint.

Using R2DBC with a Reactor Application

Since Reactor has taken over the Java world, it was inevitable that a reactive SQL library would come along.
In this blog we shall use R2DBC with H2 and Reactor.

We shall start with the dependencies needed.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
    </parent>

    <groupId>com.gkatzioura</groupId>
    <artifactId>r2dbc-reactor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

We imported Spring Data R2DBC, the H2 R2DBC driver, the H2 binary, as well as the test utilities.

Suppose this is our schema.
It is written as a PostgreSQL schema.

create table order_request (
	id uuid NOT NULL constraint or_id_pk primary key,
	created_by varchar,
	created timestamp default now()              not null,
	updated timestamp default now()              not null
);

We shall add it to test/resources/schema.sql later on, for testing purposes.

Also let’s add a new model

package com.gkatzioura.r2dbc.model;

import java.time.LocalDateTime;
import java.util.UUID;

import org.springframework.data.annotation.Id;
import org.springframework.data.domain.Persistable;
import org.springframework.data.relational.core.mapping.Table;

@Table("order_request")
public class OrderRequest implements Persistable<UUID> {

    @Id
    private UUID id;
    private String createdBy;
    private LocalDateTime created;
    private LocalDateTime updated;

    public void setId(UUID id) {
        this.id = id;
    }

    public String getCreatedBy() {
        return createdBy;
    }

    public void setCreatedBy(String createdBy) {
        this.createdBy = createdBy;
    }

    public LocalDateTime getCreated() {
        return created;
    }

    public void setCreated(LocalDateTime created) {
        this.created = created;
    }

    public LocalDateTime getUpdated() {
        return updated;
    }

    public void setUpdated(LocalDateTime updated) {
        this.updated = updated;
    }

    @Override
    public UUID getId() {
        return id;
    }

    @Override
    public boolean isNew() {
        return created == null;
    }

}

Pay attention to the isNew method. This is how the repository identifies whether the object should be inserted or updated.

Now onwards to our Repository

package com.gkatzioura.r2dbc.repository;

import java.util.UUID;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.gkatzioura.r2dbc.model.OrderRequest;

public interface OrderRepository extends ReactiveCrudRepository<OrderRequest, UUID> {
}
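
ReactiveCrudRepository already gives us save, findById, findAll and friends. As a side note, derived query methods work here too; a hypothetical finder (not used elsewhere in this example) would look like this:

package com.gkatzioura.r2dbc.repository;

import java.util.UUID;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.gkatzioura.r2dbc.model.OrderRequest;
import reactor.core.publisher.Flux;

public interface OrderRepository extends ReactiveCrudRepository<OrderRequest, UUID> {

    // Derived query: Spring Data generates SELECT ... WHERE created_by = :createdBy
    Flux<OrderRequest> findByCreatedBy(String createdBy);

}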

Let’s put some tests.

As mentioned the schema above will reside in test/resources/schema.sql

We shall add some configuration for the test H2 DB. We need to make sure that H2 picks up the PostgreSQL compatibility mode.

package com.gkatzioura.r2dbc;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.r2dbc.connection.init.CompositeDatabasePopulator;
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;

import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;

@Configuration
@EnableR2dbcRepositories
public class H2ConnectionConfiguration extends AbstractR2dbcConfiguration  {

    @Override
    public ConnectionFactory connectionFactory() {
        return new H2ConnectionFactory(
                io.r2dbc.h2.H2ConnectionConfiguration.builder()
                                                     .url("mem:testdb;MODE=PostgreSQL;DB_CLOSE_DELAY=-1;")
                                                     .build()
        );
    }

    @Bean
    public ConnectionFactoryInitializer initializer() {
        var initializer = new ConnectionFactoryInitializer();
        initializer.setConnectionFactory(connectionFactory());

        var databasePopulator = new CompositeDatabasePopulator();
        databasePopulator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));
        initializer.setDatabasePopulator(databasePopulator);
        return initializer;
    }

}

With this configuration we create an H2 database simulating a PostgreSQL DB, apply the schema, and enable the creation of the R2DBC repositories.

Also let’s add a test.

package com.gkatzioura.r2dbc.repository;

import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import com.gkatzioura.r2dbc.H2ConnectionConfiguration;
import com.gkatzioura.r2dbc.model.OrderRequest;
import reactor.test.StepVerifier;

@ExtendWith({SpringExtension.class})
@Import({H2ConnectionConfiguration.class})
class OrderRepositoryTest {

    @Autowired
    private OrderRepository orderRepository;

    @Test
    void testSave() {
        UUID id = UUID.randomUUID();
        OrderRequest orderRequest = new OrderRequest();
        orderRequest.setId(id);
        orderRequest.setCreatedBy("test-user");

        var persisted = orderRepository.save(orderRequest)
                                       .flatMap(saved -> orderRepository.findById(saved.getId()))
                                       .map(OrderRequest::getId);

        StepVerifier.create(persisted).expectNext(id).verifyComplete();
    }
}

That’s it, you can find the code on github.

Git commit id Plugin with Spring Actuator

The git commit-id plugin is very useful for depicting the state of the git repository at the time a binary was created. Imagine the case of multiple deployments to a shared staging environment using the same version: you have not cut your new version yet and multiple deployments are executed, so having that information included helps.

We will start with a simple Maven project containing a hello world application.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <artifactId>git-commit-id-example</artifactId>
    <groupId>com.gkatzioura</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

</project>

Then a main class

package com.gkatzioura.commitid;

public class Application {
    public static void main(String[] args) {
    }
}

Let’s add the plugin

    <build>
        <plugins>
            <plugin>
                <groupId>io.github.git-commit-id</groupId>
                <artifactId>git-commit-id-maven-plugin</artifactId>
                <version>5.0.0</version>
                <executions>
                    <execution>
                        <id>get-the-git-infos</id>
                        <goals>
                            <goal>revision</goal>
                        </goals>
                        <phase>initialize</phase>
                    </execution>
                </executions>
                <configuration>
                    <dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
                    <generateGitPropertiesFile>true</generateGitPropertiesFile>
                </configuration>
            </plugin>
        </plugins>
    </build>

Obviously this will work, and the file will be located at target/classes/git.properties, but we want to make it easier to retrieve that information.
It’s much easier to have an endpoint that exposes this piece of information than checking binaries.

This brings us to Actuator.
On Spring we have actuator endpoints that expose various information, like health or, in our case, info.
Eventually we can inject the git information into the info actuator endpoint.

So let’s import our Spring boot dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <artifactId>git-commit-id-example</artifactId>
    <groupId>com.gkatzioura</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>io.github.git-commit-id</groupId>
                <artifactId>git-commit-id-maven-plugin</artifactId>
                <version>5.0.0</version>
                <executions>
                    <execution>
                        <id>get-the-git-infos</id>
                        <goals>
                            <goal>revision</goal>
                        </goals>
                        <phase>initialize</phase>
                    </execution>
                </executions>
                <configuration>
                    <dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
                    <generateGitPropertiesFile>true</generateGitPropertiesFile>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Also, we shall update our main class in order to spin up our Spring Boot application

package com.gkatzioura.commitid;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Then you need to enable the info endpoint. This can be done by adding the setting to the properties or through an environment variable.

management.endpoints.web.exposure.include=health,info

Once the application is up and running, query the endpoint

curl http://localhost:8080/actuator/info

We shall be presented with the git information

{
  "git": {
    "branch": "master",
    "commit": {
      "id": "e77882e",
      "time": "2021-06-20T09:32:36Z"
    }
  }
}

This was pretty seamless, so let’s explain what happens behind the scenes.

By running mvn clean compile, the git.properties file gets generated.
By running the application, the info endpoint is enabled based on the properties.
The Spring environment picks up the git.properties file.
Actuator identifies that the file exists and exposes its contents through the info endpoint.
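
By default only a summary (branch plus abbreviated commit) is exposed. If you want the full set of generated git properties on the endpoint, Spring Boot provides a mode switch for that:

management.info.git.mode=full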

You can find the source code on github.

Keeping track of requests and Responses on Spring WebFlux

In any REST API based application it’s a matter of time before there is a need to intercept the requests towards the application and execute more than one action. If those actions need to apply to all requests to the application, then the usage of filters makes sense; security is a common example.

On Servlet based applications we used to have ContentCachingRequestWrapper and ContentCachingResponseWrapper. We are looking for the same qualities these provide, but in a WebFlux environment.

The equivalent solution is the set of decorator classes provided by the WebFlux package: ServerHttpRequestDecorator, ServerHttpResponseDecorator and ServerWebExchangeDecorator.

Let’s get started with a simple Flux based api.

First we import the dependencies

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.20</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

Then we create a simple model for a POST request.

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Info {

    private String description;

}

And the response

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InfoResponse {

    private boolean success;

    public static InfoResponse successful() {
        return InfoResponse.builder().success(true).build();
    }
}

We will implement a controller that uses the models; it simply acknowledges the request with a success response.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class InfoController {


    @PostMapping("/info")
    public Mono<InfoResponse> getInfo(@RequestBody Info info) {
        return Mono.just(InfoResponse.builder().success(true).build());
    }

}

A curl POST can help us debug.

curl --location --request POST 'http://localhost:8080/info' \
--header 'Content-Type: application/json' \
--data-raw '{
"description": "Check"
}'

Your typical filter on WebFlux has to implement the WebFilter interface; once registered as a component, it will be picked up by the runtime.

@Component
public class ExampleFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        return webFilterChain.filter(serverWebExchange);
    }

}

In our case we want to keep track of both the request and the response body.
Let’s start by creating a ServerHttpRequestDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;

public class BodyCaptureRequest extends ServerHttpRequestDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureRequest(ServerHttpRequest delegate) {
        super(delegate);
    }

    @Override
    public Flux<DataBuffer> getBody() {
        return super.getBody().doOnNext(this::capture);
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

As we can see in the getBody implementation, we attach a callback which captures the byte chunks flowing through while the actual service reads the body.
Once the request is finished, the accumulated data forms the actual body.

The same pattern applies to the ServerHttpResponseDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BodyCaptureResponse extends ServerHttpResponseDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureResponse(ServerHttpResponse delegate) {
        super(delegate);
    }

    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        Flux<DataBuffer> buffer = Flux.from(body);
        return super.writeWith(buffer.doOnNext(this::capture));
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

Here we override the writeWith function. As the data is written and pushed downstream, we wrap the publisher argument in a Flux so that we can attach a capture callback with doOnNext.

In both cases the bytes of the request and the response bodies are accumulated. This fits specific use cases, for example inspecting or altering the request/response. If your use case is covered by just streaming the bytes to another system, there is no need for accumulation; an altered getBody or writeWith that streams the data as it passes will do the work, as sketched below.
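
A minimal sketch of that streaming variant for the request side, assuming we merely forward each chunk to the console instead of accumulating it:

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;

import reactor.core.publisher.Flux;

public class BodyStreamingRequest extends ServerHttpRequestDecorator {

    public BodyStreamingRequest(ServerHttpRequest delegate) {
        super(delegate);
    }

    @Override
    public Flux<DataBuffer> getBody() {
        // Forward every chunk as it flows by; nothing is accumulated.
        return super.getBody()
                .doOnNext(buffer -> System.out.print(
                        StandardCharsets.UTF_8.decode(buffer.asByteBuffer())));
    }

}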

Let’s go to our parent decorator that extends ServerWebExchangeDecorator.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;

public class BodyCaptureExchange extends ServerWebExchangeDecorator {

    private BodyCaptureRequest bodyCaptureRequest;
    private BodyCaptureResponse bodyCaptureResponse;

    public BodyCaptureExchange(ServerWebExchange exchange) {
        super(exchange);
        this.bodyCaptureRequest = new BodyCaptureRequest(exchange.getRequest());
        this.bodyCaptureResponse = new BodyCaptureResponse(exchange.getResponse());
    }

    @Override
    public BodyCaptureRequest getRequest() {
        return bodyCaptureRequest;
    }

    @Override
    public BodyCaptureResponse getResponse() {
        return bodyCaptureResponse;
    }

}

Time to focus on our filter. To keep the example simple we will print the request and response bodies to the console.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Mono;

@Component
public class CustomWebFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        BodyCaptureExchange bodyCaptureExchange = new BodyCaptureExchange(serverWebExchange);
        return webFilterChain.filter(bodyCaptureExchange).doOnSuccess( (se) -> {
            System.out.println("Body request "+bodyCaptureExchange.getRequest().getFullBody());
            System.out.println("Body response "+bodyCaptureExchange.getResponse().getFullBody());
        });
    }

}

If we run the curl above, we shall eventually have the bodies of the request and the response printed.
You can find the source code on github.

Spring Boot and Micrometer with Prometheus Part 6: Securing metrics

Previously we successfully spun up our Spring Boot application with Prometheus. An endpoint in our Spring application exposes our metric data so that Prometheus is able to retrieve it.
The main question that comes to mind is how to secure this information.

Spring already provides us with its great security framework, so it will be fairly easy to use it for our application. The goal is to use basic authentication for the actuator/prometheus endpoints, and also to configure Prometheus to access that information using basic authentication.

So the first step is to enable security on our app, which means adding the security jar.

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>

The Spring Boot application will get secured on its own by generating a password for the default user.
However, we want control over the username and password, so we are going to use some environment variables.

By running the application with credentials for the default user, we get the prometheus endpoints secured with minimal configuration.

SPRING_SECURITY_USER_NAME=test-user SPRING_SECURITY_USER_PASSWORD=test-password mvn spring-boot:run

So now that we have the security setup on our app, it’s time to update our prometheus config.

scrape_configs:
  - job_name: 'prometheus-spring'
    scrape_interval: 1m
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['my.local.machine:8080']
    basic_auth:
      username: "test-user"
      password: "test-password"

So let’s run Prometheus again, as described previously.

To sum up, after this change Prometheus will gather metric data from our application in a secure way.

Spring Boot and Micrometer with Prometheus Part 5: Spinning up prometheus

Previously we adapted our Spring Boot application to expose the endpoints needed by Prometheus.
This blog will focus on setting up Prometheus and configuring it to scrape the Spring Boot endpoints.
So let’s get started by spinning up the Prometheus server using Docker.

Before spinning up Prometheus we need to supply a configuration file so that it pulls data from our application.
Thus we should supply a prometheus.yaml file with the following contents.

scrape_configs:
  - job_name: 'prometheus-spring'
    scrape_interval: 1m
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['my.local.machine:8080']

Let’s use the command taken from here.

Because we are running Prometheus on OSX through Docker, we need a workaround so the container can connect to the application running on the host

sudo ifconfig lo0 alias 172.16.222.111

Then we can run Prometheus directly with Docker

docker run -v /path/to/prometheus.yaml:/etc/prometheus/prometheus.yml -p 9090:9090 --add-host="my.local.machine:172.16.222.111" prom/prometheus

By doing the above we shall be able to reach our local application from inside the Docker container.

So if we navigate to http://localhost:9090/graph we shall be greeted with our Prometheus screen.
From inside the Prometheus container we are also able to communicate with our application running locally.

So let’s give it some time for the data to be collected, and then go to the Prometheus status page at http://localhost:9090/status.

We shall be greeted by the JVM information of our application.
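
To double-check that scraping works, we can also run a query on the graph page; assuming the default Micrometer JVM metrics are exported, something like the following should return series:

jvm_memory_used_bytes{job="prometheus-spring"}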

On the next blog we shall focus on securing our prometheus endpoints.

Spring Boot and Micrometer with Prometheus Part 4: The base project

In previous posts we had a look at Spring, Micrometer and InfluxDB. So you are probably going to ask: why Prometheus?
The reason is that Prometheus operates on a pull model, versus the push model of InfluxDB.

This means that if you use Micrometer with InfluxDB you will have some overhead from pushing the results to the database, and keeping InfluxDB always available to handle all those requests is one extra pain point.

So what if, instead of pushing the data, another tool pulled the data from the applications?
This is one of the things you get by using Prometheus: Prometheus asks your application for the data, your application does not have to push it anywhere.

So what we are going to do is use exactly the same project we used in the first tutorial.

The only changes needed are in the application.yaml as well as the pom.xml.

We shall start with the pom.xml and add the Micrometer registry for Prometheus.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.4.RELEASE</version>
	</parent>

	<groupId>com.gkatzioura</groupId>
	<artifactId>spring-prometheus-micrometer</artifactId>
	<version>1.0-SNAPSHOT</version>

	<properties>
		<micrometer.version>1.3.2</micrometer.version>
	</properties>

	<build>
		<defaultGoal>spring-boot:run</defaultGoal>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>8</source>
					<target>8</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<dependency>
			<groupId>io.micrometer</groupId>
			<artifactId>micrometer-core</artifactId>
			<version>${micrometer.version}</version>
		</dependency>
		<dependency>
			<groupId>io.micrometer</groupId>
			<artifactId>micrometer-registry-prometheus</artifactId>
			<version>${micrometer.version}</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.12</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>
</project>

Then we shall add the application.yaml, which exposes the prometheus endpoint.

management:
  endpoints:
    web:
      exposure:
        include: prometheus

So now we are ready to run the application.

> mvn spring-boot:run

If we try to access the actuator we are presented with the prometheus endpoint.

> curl http://localhost:8080/actuator
{
  "_links": {
    "self": {
      "href": "http://localhost:8080/actuator",
      "templated": false
    },
    "prometheus": {
      "href": "http://localhost:8080/actuator/prometheus",
      "templated": false
    }
  }
}

The http://localhost:8080/actuator/prometheus endpoint is the one our Prometheus server will use to pull data.
So our Prometheus server needs to be configured to access the data exposed by that endpoint.

On the next blog we shall deploy prometheus and view some metrics.

Apache Ignite and Spring on your Kubernetes Cluster Part 1: Spring Boot application

In a previous series of blogs we spun up an Ignite cluster on a Kubernetes cluster.
In this tutorial we shall use the Ignite cluster created previously together with a Spring Boot application.


Let’s create our project using Spring Boot. The Spring Boot application will connect to the Ignite cluster.

Let’s add our dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.gkatzioura</groupId>
	<artifactId>job-api-ignite</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>job-api-ignite</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-cache</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-kubernetes</artifactId>
			<version>2.7.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>2.7.6</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.ignite</groupId>
					<artifactId>ignite-indexing</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.12</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

As in previous tutorials, we shall use GitHub’s Jobs API.

The first step would be to add the Job model that the API responses deserialize into.

package com.gkatzioura.jobapi.model;

import java.io.Serializable;

import lombok.Data;

@Data
public class Job implements Serializable {

	private String id;
	private String type;
	private String url;
	private String createdAt;
	private String company;
	private String companyUrl;
	private String location;
	private String title;
	private String description;

}

Then we need a repository for the jobs. Beware: the class needs to be serializable, since Ignite caches data off-heap.

package com.gkatzioura.jobapi.repository;

import java.util.ArrayList;
import java.util.List;

import com.gkatzioura.jobapi.model.Job;
import lombok.Data;
import org.apache.ignite.Ignite;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;
import org.springframework.web.client.RestTemplate;

@Repository
public class GitHubJobRepository {

	private static final String JOB_API_CONSTANT = "https://jobs.github.com/positions.json?page={page}";
	public static final String GITHUBJOB_CACHE = "githubjob";

	private final RestTemplate restTemplate;
	private final Ignite ignite;

	GitHubJobRepository(Ignite ignite) {
		this.restTemplate = new RestTemplate();
		this.ignite = ignite;
	}

	@Cacheable(value = GITHUBJOB_CACHE)
	public List<Job> getJob(int page) {
		return restTemplate.getForObject(JOB_API_CONSTANT, JobList.class, page);
	}

	public List<Job> fetchFromIgnite(int page) {
		for(String cache: ignite.cacheNames()) {
			if(cache.equals(GITHUBJOB_CACHE)) {
				// read the entry that @Cacheable stored, keyed by the requested page
				return (List<Job>) ignite.getOrCreateCache(cache).get(page);
			}
		}

		return new ArrayList<>();
	}

	@Data
	private static class JobList  extends ArrayList<Job> {
	}
}

The main reason the JobList class exists is convenience for unmarshalling.
As you can see the repository carries the @Cacheable annotation. This means that our requests will be cached. The fetchFromIgnite method is a test method for the sake of this example; we shall use it to access the data cached by Ignite directly.

We shall also add the controller.

package com.gkatzioura.jobapi.controller;

import java.util.List;

import com.gkatzioura.jobapi.model.Job;
import com.gkatzioura.jobapi.repository.GitHubJobRepository;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/jobs")
public class JobsController {

	private final GitHubJobRepository gitHubJobRepository;

	JobsController(GitHubJobRepository gitHubJobRepository) {
		this.gitHubJobRepository = gitHubJobRepository;
	}

	@GetMapping("/github/{page}")
	public List<Job> gitHub(@PathVariable("page") int page) {
		return this.gitHubJobRepository.getJob(page);
	}

	@GetMapping("/github/ignite/{page}")
	public List<Job> gitHubIgnite(@PathVariable("page") int page) {
		return this.gitHubJobRepository.fetchFromIgnite(page);
	}

}

Two methods on the controller: one fetches the data as usual, caching it behind the scenes, and the other one we shall use for testing; example calls for both follow below.
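
A couple of example calls, assuming the application runs locally on port 8080:

curl http://localhost:8080/jobs/github/1

curl http://localhost:8080/jobs/github/ignite/1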

It’s time for us to configure the Ignite client that uses the nodes on our Kubernetes cluster.

package com.gkatzioura.jobapi.config;


import lombok.extern.slf4j.Slf4j;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.spring.SpringCacheManager;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder;

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
@Slf4j
public class SpringCacheConfiguration {

	@Bean
	public Ignite igniteInstance() {
		log.info("Creating ignite instance");
		TcpDiscoveryKubernetesIpFinder tcpDiscoveryKubernetesIpFinder = new TcpDiscoveryKubernetesIpFinder();
		tcpDiscoveryKubernetesIpFinder.setNamespace("default");
		tcpDiscoveryKubernetesIpFinder.setServiceName("job-cache");

		TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
		tcpDiscoverySpi.setIpFinder(tcpDiscoveryKubernetesIpFinder);

		IgniteConfiguration igniteConfiguration = new IgniteConfiguration();

		igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);
		igniteConfiguration.setClientMode(false);

		return Ignition.start(igniteConfiguration);
	}

	@Bean
	public SpringCacheManager cacheManager(Ignite ignite) {
		SpringCacheManager springCacheManager =new SpringCacheManager();
		springCacheManager.setIgniteInstanceName(ignite.name());
		return springCacheManager;
	}

}

We created our cache. It uses Kubernetes based TCP discovery; the IP finder locates the other nodes through the job-cache service in the default namespace.

The next step is to add our Main class.

package com.gkatzioura.jobapi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;

@SpringBootApplication
@EnableCaching
public class IgniteKubeClusterApplication {

	public static void main(String[] args) {
		SpringApplication.run(IgniteKubeClusterApplication.class, args);
	}

}

The next blog will be focused on shipping the solution to Kubernetes.

Spring Boot and Micrometer with InfluxDB Part 2: Adding InfluxDB

Since we added our base application it is time for us to spin up an InfluxDB instance.

We shall follow a previous tutorial and add a docker instance.

docker run --rm -p 8086:8086 --name influxdb-local influxdb

Time to add the Micrometer InfluxDB dependency to our pom

<dependencies>
...
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-core</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-influx</artifactId>
            <version>1.3.2</version>
        </dependency>
...
</dependencies>

Time to add the configuration through the application.yaml

management:
  metrics:
    export:
      influx:
        enabled: true
        db: devjobsapi
        uri: http://127.0.0.1:8086
  endpoints:
    web:
      exposure:
        include: "*"

Let’s spin up our application and do some requests; an example request follows below.
After some time we can check the database and the data it contains.
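
For example, a request towards the jobs endpoint of the base project (assuming it runs locally on port 8080):

curl http://localhost:8080/jobs/github/1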

docker exec -it influxdb-local influx
> SHOW DATABASES;
name: databases
name
----
_internal
devjobsapi
> use devjobsapi
Using database devjobsapi
> SHOW MEASUREMENTS
name: measurements
name
----
http_server_requests
jvm_buffer_count
jvm_buffer_memory_used
jvm_buffer_total_capacity
jvm_classes_loaded
jvm_classes_unloaded
jvm_gc_live_data_size
jvm_gc_max_data_size
jvm_gc_memory_allocated
jvm_gc_memory_promoted
jvm_gc_pause
jvm_memory_committed
jvm_memory_max
jvm_memory_used
jvm_threads_daemon
jvm_threads_live
jvm_threads_peak
jvm_threads_states
logback_events
process_cpu_usage
process_files_max
process_files_open
process_start_time
process_uptime
system_cpu_count
system_cpu_usage
system_load_average_1m

That’s pretty awesome. Let’s check the endpoints accessed.

> SELECT * FROM http_server_requests;
name: http_server_requests
time                count exception mean        method metric_type outcome status sum         upper       uri
----                ----- --------- ----        ------ ----------- ------- ------ ---         -----       ---
1582586157093000000 1     None      252.309331  GET    histogram   SUCCESS 200    252.309331  252.309331  /actuator
1582586157096000000 0     None      0           GET    histogram   SUCCESS 200    0           2866.531375 /jobs/github/{page}

Pretty great! The next step would be to visualise those metrics.