Keeping track of Requests and Responses on Spring WebFlux

In any REST API based application it is only a matter of time before there is a need to intercept the requests towards the application and execute more than one action. If those actions need to apply to all requests to the application, for example security, then the usage of filters makes sense.

On Servlet based applications we used to have ContentCachingRequestWrapper and ContentCachingResponseWrapper. We are looking for the same qualities those classes provide, but in a WebFlux environment.

The equivalent solution is the set of decorator classes provided by the WebFlux package: ServerHttpRequestDecorator, ServerHttpResponseDecorator and ServerWebExchangeDecorator.

Let’s get started with a simple WebFlux based API.

First we import the dependencies

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.20</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

Then we create a simple model for a POST request.

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Info {

    private String description;

}

And the response model.

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InfoResponse {

    private boolean success;

    public static InfoResponse successful() {
        return InfoResponse.builder().success(true).build();
    }
}

Next we implement a controller that uses those models. The controller simply accepts the Info payload and responds with a successful InfoResponse.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class InfoController {


    @PostMapping("/info")
    public Mono<InfoResponse> getInfo(@RequestBody Info info) {
        return Mono.just(InfoResponse.builder().success(true).build());
    }

}

A curl POST can help us debug.

curl --location --request POST 'http://localhost:8080/info' \
--header 'Content-Type: application/json' \
--data-raw '{
"description": "Check"
}'
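
Provided the application is running with the defaults, the response should look along the lines of the following.

{"success":true}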

A typical filter on WebFlux has to implement the WebFilter interface; once registered as a Spring bean, for example through @Component, it will be picked up by the runtime.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Mono;

@Component
public class ExampleFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        return webFilterChain.filter(serverWebExchange);
    }

}

In our case we want to keep track of both the request and the response body.
Let’s start by creating a ServerHttpRequestDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;

public class BodyCaptureRequest extends ServerHttpRequestDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureRequest(ServerHttpRequest delegate) {
        super(delegate);
    }

    @Override
    public Flux<DataBuffer> getBody() {
        return super.getBody().doOnNext(this::capture);
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

As we can see in the getBody implementation, we attach a capture method which accumulates the byte chunks that flow through while the actual service reads the body.
Once the request has been consumed, the accumulated data form the full body.

The same pattern applies to the ServerHttpResponseDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BodyCaptureResponse extends ServerHttpResponseDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureResponse(ServerHttpResponse delegate) {
        super(delegate);
    }

    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        Flux<DataBuffer> buffer = Flux.from(body);
        return super.writeWith(buffer.doOnNext(this::capture));
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

Here we override the writeWith function. The data that are written and pushed down the stream are wrapped in a Flux, so that we can attach our capture method through doOnNext.

In both cases the bytes of the request and the response body are accumulated. This might work for specific use cases, for example altering the request/response. If your use case is covered by just streaming the bytes to another system, there is no need for accumulation; an altered getBody and writeWith that simply streams the data, as sketched below, will do the job.
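
What follows is a minimal sketch of such a streaming-only request decorator. It assumes a hypothetical chunkConsumer, a java.util.function.Consumer<String> that forwards each chunk to another system; it is a sketch and not part of the example project.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;

import reactor.core.publisher.Flux;

public class BodyStreamingRequest extends ServerHttpRequestDecorator {

    // hypothetical consumer that forwards each chunk to another system
    private final Consumer<String> chunkConsumer;

    public BodyStreamingRequest(ServerHttpRequest delegate, Consumer<String> chunkConsumer) {
        super(delegate);
        this.chunkConsumer = chunkConsumer;
    }

    @Override
    public Flux<DataBuffer> getBody() {
        // stream each chunk as it is read by the actual service, no accumulation
        return super.getBody()
                    .doOnNext(buffer -> chunkConsumer.accept(
                            StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString()));
    }

}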

Let’s go to our parent decorator that extends ServerWebExchangeDecorator.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;

public class BodyCaptureExchange extends ServerWebExchangeDecorator {

    private BodyCaptureRequest bodyCaptureRequest;
    private BodyCaptureResponse bodyCaptureResponse;

    public BodyCaptureExchange(ServerWebExchange exchange) {
        super(exchange);
        this.bodyCaptureRequest = new BodyCaptureRequest(exchange.getRequest());
        this.bodyCaptureResponse = new BodyCaptureResponse(exchange.getResponse());
    }

    @Override
    public BodyCaptureRequest getRequest() {
        return bodyCaptureRequest;
    }

    @Override
    public BodyCaptureResponse getResponse() {
        return bodyCaptureResponse;
    }

}

Time to focus on our filter. To keep the example simple we will print the request and response body to the console.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Mono;

@Component
public class CustomWebFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        BodyCaptureExchange bodyCaptureExchange = new BodyCaptureExchange(serverWebExchange);
        return webFilterChain.filter(bodyCaptureExchange).doOnSuccess( (se) -> {
            System.out.println("Body request "+bodyCaptureExchange.getRequest().getFullBody());
            System.out.println("Body response "+bodyCaptureExchange.getResponse().getFullBody());
        });
    }

}

If we run the curl request from above, we shall eventually have the body of the request and the response printed.
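With the payload from the curl request, the console output should look roughly like the following; the exact whitespace of the request body depends on what the client actually sent.

Body request {
"description": "Check"
}
Body response {"success":true}
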
You can find the source code on GitHub.

BigQuery Storage API: Avro

Previously we had an introduction to the BigQuery Storage API and we proceeded with reading data using the Arrow format.
In this tutorial we shall read data using the Avro format.

What applied to the previous tutorial applies here too.

We shall create a BigQuery Storage client, create a ReadSession using the Avro format and iterate over the data on each stream.

Let’s get started by importing the dependencies; this time we also import the Avro library needed.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>20.5.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquerystorage</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

Our next step is to create an Avro data reader for our rows, which have the schema col1:string, col2:int. In our case we shall just print the data to System.out.

package com.gkatzioura.bigquery.storage.api.avro;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.protobuf.ByteString;

public class AvroReader {

    private final GenericDatumReader<GenericRecord> datumReader;

    public AvroReader(AvroSchema avroSchema) {
        Schema schema = new Schema.Parser().parse(avroSchema.getSchema());
        this.datumReader = new GenericDatumReader<>(schema);
    }

    public void processRows(ByteString avroRows) throws IOException {
        try(InputStream inputStream = new ByteArrayInputStream(avroRows.toByteArray())) {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

            while (!decoder.isEnd()) {
                GenericRecord item = datumReader.read(null, decoder);

                System.out.println(item.get("col1")+","+item.get("col2"));
            }
        }
    }

}

Then on to our main class, which contains all the BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.avro;

import com.google.common.base.Preconditions;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class AvroMain {

    public static void main(String[] args) throws Exception {

        String projectId = System.getenv("PROJECT_ID");

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            String parent = String.format("projects/%s", projectId);

            String srcTable =
                    String.format(
                            "projects/%s/datasets/%s/tables/%s",
                            projectId, System.getenv("DATASET"), System.getenv("TABLE"));

            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.AVRO);


            CreateReadSessionRequest.Builder builder =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent(parent)
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1);
            ReadSession session = client.createReadSession(builder.build());

            Preconditions.checkState(session.getStreamsCount() > 0);

            String streamName = session.getStreams(0).getName();

            ReadRowsRequest readRowsRequest =
                    ReadRowsRequest.newBuilder().setReadStream(streamName).build();

            ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);

            for (ReadRowsResponse response : stream) {
                new AvroReader(session.getAvroSchema()).processRows(response.getAvroRows().getSerializedBinaryRows());
            }
        }
    }

}

A BigQuery read client is created. Then we create a session request with a maximum number of streams, specifying that the format to be used will be Avro.
Once we get a response, it will contain the initiated session, the Avro schema and the streams that we shall use to retrieve the data.
For each stream a ReadRowsRequest has to be issued in order to fetch the data.
Then we pass the data to our Avro decoder.
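
For illustration, if setMaxStreamCount were raised above one, consuming every stream of the session could look roughly like the following sketch. It reuses the client and session from the main class above and assumes an import of ReadStream from the same com.google.cloud.bigquery.storage.v1 package.

            AvroReader avroReader = new AvroReader(session.getAvroSchema());

            // one ReadRowsRequest per stream; the streams could also be consumed in parallel
            for (ReadStream readStream : session.getStreamsList()) {
                ReadRowsRequest request =
                        ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();

                for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
                    avroReader.processRows(response.getAvroRows().getSerializedBinaryRows());
                }
            }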

That’s it! We just read data from the BigQuery Storage API using both Avro and Arrow.

BigQuery Storage API: Arrow

Previously we had an introduction to the BigQuery Storage API. As explained, the BigQuery Storage API supports two formats. For this tutorial we will choose the Arrow format.

First let’s import the dependencies. The BigQuery Storage API binary does not come with a library to parse Arrow. The consumer receives the binaries in the Arrow format, and it’s up to the consumer how to consume those binaries and what libraries to use.


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>20.5.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquerystorage</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>4.0.0</version>
        </dependency>
    </dependencies>

As mentioned before, when we use Arrow we need to import a library that handles the memory allocation Arrow requires.

We shall first create a plain Arrow reader.
This reader will be BigQuery agnostic, which is one of the benefits of using a platform- and language-independent format.

An Arrow binary shall be submitted to the reader along with the schema, and the rows shall be printed in tab-separated (TSV) format.

package com.gkatzioura.bigquery.storage.api.arrow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;

public class ArrowReader implements AutoCloseable {

    private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public ArrowReader(ArrowSchema arrowSchema) throws IOException {
        Schema schema =
                MessageSerializer.deserializeSchema(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        arrowSchema.getSerializedSchema().toByteArray())));

        Preconditions.checkNotNull(schema);
        List<FieldVector> vectors = new ArrayList<>();
        for (Field field : schema.getFields()) {
            vectors.add(field.createVector(allocator));
        }

        root = new VectorSchemaRoot(vectors);
        loader = new VectorLoader(root);
    }

    public void processRows(ArrowRecordBatch batch) throws IOException {
        org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
                MessageSerializer.deserializeRecordBatch(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        batch.getSerializedRecordBatch().toByteArray())),
                        allocator);

        loader.load(deserializedBatch);
        deserializedBatch.close();
        System.out.println(root.contentToTSVString());
        root.clear();
    }

    @Override
    public void close() throws Exception {
        root.close();
        allocator.close();
    }

}

The constructor has the schema injected; then the schema root is created.
Pay attention that we receive the schema in binary form; it’s up to us and our library to decide how to read it.


        Schema schema =
                MessageSerializer.deserializeSchema(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        arrowSchema.getSerializedSchema().toByteArray())));

You can find more on reading Arrow data on this tutorial.

Then on to our main class, which contains all the BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.arrow;

import org.apache.arrow.util.Preconditions;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class ArrowMain {

    public static void main(String[] args) throws Exception {

        String projectId = System.getenv("PROJECT_ID");

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            String parent = String.format("projects/%s", projectId);

            String srcTable =
                    String.format(
                            "projects/%s/datasets/%s/tables/%s",
                            projectId, System.getenv("DATASET"), System.getenv("TABLE"));

            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.ARROW);

            CreateReadSessionRequest.Builder builder =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent(parent)
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1);
            ReadSession session = client.createReadSession(builder.build());

            try (ArrowReader reader = new ArrowReader(session.getArrowSchema())) {
                Preconditions.checkState(session.getStreamsCount() > 0);

                String streamName = session.getStreams(0).getName();

                ReadRowsRequest readRowsRequest =
                        ReadRowsRequest.newBuilder().setReadStream(streamName).build();

                ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
                for (ReadRowsResponse response : stream) {
                    Preconditions.checkState(response.hasArrowRecordBatch());
                    reader.processRows(response.getArrowRecordBatch());
                }
            }
        }
    }

}

A BigQuery read client is created. Then we create a session request with a maximum number of streams. We do have to specify that the format to be used will be Arrow.
Once we get a response, it will contain the initiated session, the Arrow schema and the streams that we shall use to retrieve the data.
For each stream a ReadRowsRequest has to be issued in order to fetch the data.
Our next example will focus on reading data in the Avro format.

BigQuery Storage API: Get Started and Comparisons

BigQuery provides us with the Storage API for fast access using an RPC-based protocol. With this option you can receive the data in a binary serialized format. The alternative ways to retrieve BigQuery data are through the REST API and bulk export.

Retrieving data through the REST API is great for small result sets. For example, if the product of an aggregation is going to have a limited amount of rows, it makes sense to use the REST API, retrieve the results and use them in an application like Grafana. However, when it comes to big result sets, retrieving results in JSON, serializing them and storing them adds extra overhead. Exporting in binary formats helps you avoid this overhead.

Bulk data export is a good solution for exporting big result sets; however, you are limited in where the data gets stored (Google Cloud Storage), and there are daily limits on exports.

Thus the Storage API combines the flexibility of using an RPC protocol, the efficiency of downloading big result sets in a binary format and the flexibility to choose where those data shall be stored.

The storage API provides two ways to stream Data, either through Avro or through Arrow.

When using the Storage API, the first step is to create a session. The format (Avro/Arrow) should be specified. A session can have more than one stream, and the maximum number of streams can be specified.
Streams will contain the data in the format specified and can be read in parallel. The session expires on its own, with no need for handling.

If a session request is successful, the response shall contain the schema of the data and the streams to use in order to download the data.
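
As a rough sketch, using classes from the com.google.cloud.bigquery.storage.v1 package, creating a session looks roughly like the following; the project and table path are illustrative placeholders.

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable("projects/my-project/datasets/my_dataset/tables/my_table") // illustrative path
                               .setDataFormat(DataFormat.ARROW); // or DataFormat.AVRO

            CreateReadSessionRequest request =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent("projects/my-project") // illustrative parent project
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1)
                                            .build();

            ReadSession session = client.createReadSession(request);
            // the session now carries the schema and the streams to read from
        }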

For the following examples we assume that the table we read data from has two columns: col1 is a string and col2 is a number. An Arrow example of this schema can be found here.

In order to test the Storage API you need an account on GCP with the BigQuery Storage API enabled and a dataset created.

Let’s continue to the Arrow example.