Keeping Track of Requests and Responses on Spring WebFlux

In any REST API based application it is only a matter of time before there is a need to intercept the requests towards the application and execute more than one action. If those actions need to apply to all requests to the application, then using filters makes sense; security is a typical example.

On Servlet-based applications we used to have ContentCachingRequestWrapper and ContentCachingResponseWrapper. We are looking for the same capabilities in a WebFlux environment.

The equivalent solution is the set of decorator classes provided by the WebFlux package: ServerHttpRequestDecorator, ServerHttpResponseDecorator and ServerWebExchangeDecorator.

Let’s get started with a simple WebFlux-based API.

First we import the dependencies

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.20</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

Then we create a simple model for a POST request.

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Info {

    private String description;

}

And the response

package com.gkatzioura.reactor.fluxfiltercapture;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InfoResponse {

    private boolean success;

    public static InfoResponse successful() {
        return InfoResponse.builder().success(true).build();
    }
}

Next we implement a controller that uses those models. The controller simply acknowledges each request with a successful response.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class InfoController {


    @PostMapping("/info")
    public Mono<InfoResponse> getInfo(@RequestBody Info info) {
        return Mono.just(InfoResponse.builder().success(true).build());
    }

}

A curl POST can help us debug.

curl --location --request POST 'http://localhost:8080/info' \
--header 'Content-Type: application/json' \
--data-raw '{
"description": "Check"
}'
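
The controller replies with the serialized InfoResponse; with the default Jackson setup the payload should look roughly like this:

{"success":true}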

A typical filter on WebFlux implements the WebFilter interface; if it is registered as a component, it will be picked up by the runtime.

@Component
public class ExampleFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        return webFilterChain.filter(serverWebExchange);
    }

}

In our case we want to keep track of both the request and the response body.
Let’s start by creating a ServerHttpRequestDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;

public class BodyCaptureRequest extends ServerHttpRequestDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureRequest(ServerHttpRequest delegate) {
        super(delegate);
    }

    @Override
    public Flux<DataBuffer> getBody() {
        return super.getBody().doOnNext(this::capture);
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

As we can see in the getBody implementation, we hook in a method that captures the byte chunks as they flow through while the actual service reads the body.
Once the request has been consumed, the accumulated data forms the full body.

The same pattern applies to the ServerHttpResponseDecorator implementation.

package com.gkatzioura.reactor.fluxfiltercapture;

import java.nio.charset.StandardCharsets;

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BodyCaptureResponse extends ServerHttpResponseDecorator {

    private final StringBuilder body = new StringBuilder();

    public BodyCaptureResponse(ServerHttpResponse delegate) {
        super(delegate);
    }

    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        Flux<DataBuffer> buffer = Flux.from(body);
        return super.writeWith(buffer.doOnNext(this::capture));
    }

    private void capture(DataBuffer buffer) {
        this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString());
    }

    public String getFullBody() {
        return this.body.toString();
    }

}

Here we override the writeWith function. The data written and pushed down the stream is wrapped in a Flux so that we can attach the capture method through doOnNext.

In both cases the bytes of the request and response bodies are accumulated. This suits specific use cases, for example inspecting or altering the request/response. If your use case only requires streaming the bytes to another system, there is no need for accumulation; an adjusted getBody or writeWith that streams the data as it passes through will do the work.
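
For instance, a request decorator that only forwards each chunk to another sink, with no accumulation, could look like the sketch below; the SLF4J logger here is just an illustration of "another system" and is not part of the example project (imports mirror those of BodyCaptureRequest plus the slf4j api).

public class BodyStreamingRequest extends ServerHttpRequestDecorator {

    private static final Logger LOGGER = LoggerFactory.getLogger(BodyStreamingRequest.class);

    public BodyStreamingRequest(ServerHttpRequest delegate) {
        super(delegate);
    }

    @Override
    public Flux<DataBuffer> getBody() {
        // forward every chunk as it flows through; nothing is accumulated in memory
        return super.getBody()
                .doOnNext(buffer -> LOGGER.info("chunk of {} bytes", buffer.readableByteCount()));
    }

}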

Let’s go to our parent decorator that extends ServerWebExchangeDecorator.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;

public class BodyCaptureExchange extends ServerWebExchangeDecorator {

    private BodyCaptureRequest bodyCaptureRequest;
    private BodyCaptureResponse bodyCaptureResponse;

    public BodyCaptureExchange(ServerWebExchange exchange) {
        super(exchange);
        this.bodyCaptureRequest = new BodyCaptureRequest(exchange.getRequest());
        this.bodyCaptureResponse = new BodyCaptureResponse(exchange.getResponse());
    }

    @Override
    public BodyCaptureRequest getRequest() {
        return bodyCaptureRequest;
    }

    @Override
    public BodyCaptureResponse getResponse() {
        return bodyCaptureResponse;
    }

}

Time to focus on our filter. To keep the example simple, we will print the request and response bodies to the console.

package com.gkatzioura.reactor.fluxfiltercapture;

import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Mono;

@Component
public class CustomWebFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {
        BodyCaptureExchange bodyCaptureExchange = new BodyCaptureExchange(serverWebExchange);
        return webFilterChain.filter(bodyCaptureExchange).doOnSuccess(se -> {
            System.out.println("Body request "+bodyCaptureExchange.getRequest().getFullBody());
            System.out.println("Body response "+bodyCaptureExchange.getResponse().getFullBody());
        });
    }

}

If we run the curl request above, the request and response bodies will eventually be printed to the console.
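
As an alternative to curl, a WebTestClient test can exercise the endpoint and trigger the filter. A minimal sketch, assuming the default spring-boot-starter-test setup (the test class name is illustrative):

package com.gkatzioura.reactor.fluxfiltercapture;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;

import static org.junit.jupiter.api.Assertions.assertTrue;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class InfoControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @Test
    void shouldReturnSuccess() {
        webTestClient.post()
                .uri("/info")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(Info.builder().description("Check").build())
                .exchange()
                .expectStatus().isOk()
                .expectBody(InfoResponse.class)
                // the filter prints the captured bodies to the console while this assertion checks the payload
                .value(response -> assertTrue(response.isSuccess()));
    }

}
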
You can find the source code on GitHub.

Apache Arrow on the JVM: Streaming Reads

Previously we wrote Arrow data to a stream. Now we shall read that data back from a stream.

Just like in the previous blog post, we shall implement the Closeable interface. This is needed to close the RootAllocator and free up memory.

We shall pass a ReadableByteChannel and read the stream back into objects.
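
A ReadableByteChannel can be obtained from any InputStream or directly from a file; for example (the byte array and file path below are just illustrative):

// from an in-memory byte array
ReadableByteChannel fromBytes = Channels.newChannel(new ByteArrayInputStream(arrowBytes));

// or straight from a file
ReadableByteChannel fromFile = FileChannel.open(Paths.get("/tmp/entries.arrow"), StandardOpenOption.READ);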

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class DefaultEntriesReader implements Closeable {

    private final RootAllocator rootAllocator;

    public DefaultEntriesReader() {
        rootAllocator = new RootAllocator(Integer.MAX_VALUE);
    }

    public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException {
        List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>();

        try(ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) {
            var root = arrowStreamReader.getVectorSchemaRoot();

            var childVector1 = (VarCharVector)root.getVector(0);
            var childVector2 = (IntVector)root.getVector(1);

            while (arrowStreamReader.loadNextBatch()) {

                int batchSize = root.getRowCount();

                for (int i = 0; i < batchSize; i++) {
                    var strData = new String(childVector1.get(i), StandardCharsets.UTF_8);
                    var intData = childVector2.get(i);

                    DefaultArrowEntry defaultArrowEntry = DefaultArrowEntry.builder().col1(strData).col2(intData).build();
                    defaultArrowEntries.add(defaultArrowEntry);
                }
            }

            return defaultArrowEntries;
        }
    }

    @Override
    public void close() throws IOException {
        rootAllocator.close();
    }
}

Let’s wrap it up with a write and a read.

package com.gkatzioura.arrow;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ArrowMain {

    public static void main(String[] args) throws IOException {
        var originalEntries = IntStream.rangeClosed(0, 11)
                             .boxed()
                             .map(i -> new DefaultArrowEntry("data-"+i, i)).collect(Collectors.toList());

        var outputStream = new ByteArrayOutputStream();

        try(var arrowWriter = new DefaultEntriesWriter()) {
            arrowWriter.write(originalEntries, 10, Channels.newChannel(outputStream));
        }

        byte[] introBytes = outputStream.toByteArray();

        var inputStream = new ByteArrayInputStream(introBytes);

        try(var arrowReader = new DefaultEntriesReader()) {
            var entries = arrowReader.readBytes(Channels.newChannel(inputStream));
            for (DefaultArrowEntry entry : entries) {
                System.out.println("Read "+entry.getCol1()+" "+entry.getCol2());
            }
        }

    }

}

That’s it. To summarise: we created Arrow schemas, wrote data to a stream, and read that data back from a stream!

Apache Arrow on the JVM: Streaming Writes

Previously we created some Arrow schemas. In this blog post we will have a look at writing through the streaming API.

Based on the previous post’s schema, we shall create a DTO for our entries.

package com.gkatzioura.arrow;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DefaultArrowEntry {

    private String col1;
    private Integer col2;

}

Our goal is to transform those Java objects into a stream of Arrow bytes.

The allocator creates DirectByteBuffers. Those buffers are off-heap, so the memory used needs to be freed; for the library user this is done by calling the close() operation on the allocator. In our case our class implements the Closeable interface, which takes care of closing the allocator.

By using the streaming API, the data will be streamed to the WritableByteChannel submitted, using the Arrow format.

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;

import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;

public class DefaultEntriesWriter implements Closeable {

    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }

        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
        try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

            boolean exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int batchCounter = 0;

            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

                batchCounter++;

                if(batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }

            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }

            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

}

To demonstrate Arrow’s support for batches, a simple batching algorithm has been implemented within the function. For our example, just take into account that data will be written in batches.

Let’s dive into the function.

The allocator and the vector schema root discussed previously are created in the constructor.

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

Then, when writing to a stream, an ArrowStreamWriter is created and started.

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out);
writer.start();

We fetch the vectors in order to populate them with the data. We also reset them, but keep the pre-allocated buffers in place.

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

We use the setSafe operation when writing data; this way, if more buffer space needs to be allocated, it will be. For this example it happens on every write, but it can be avoided when the number of operations and the buffer size needed are taken into account (a pre-allocation sketch follows the snippet below).

                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());
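
If we wanted to avoid the reallocation checks, a rough sketch would be to reserve capacity on the vectors before the loop and then use plain set; the capacity below is an illustrative assumption, not part of the example code.

// reserve room for a full batch up front, outside the loop
childVector1.setInitialCapacity(batchSize);
childVector1.allocateNew();
childVector2.allocateNew(batchSize);

// with enough capacity reserved, set can replace setSafe inside the loop
childVector1.set(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.set(batchCounter, defaultArrowEntries.get(i).getCol2());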

Then we write the batch to the stream.


                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();

Last but not least, we close the vector schema root and the allocator; the writer itself is closed by the try-with-resources block.

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

The next blog will focus on reading Arrow Data from a stream.

Apache Arrow on the JVM: Get Started and Schemas

Arrow is a memory format for flat and hierarchical data. It is a popular format used by various big data tools, among them BigQuery. One of the benefits Arrow brings is that the data has the same byte representation across all the supported languages. So, apart from the benefits of a columnar memory format, there is also the benefit of zero-copy reads without serialization overhead.

Apache Arrow defines a language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware like CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead.

Let’s import the libraries

        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>${arrow.version}</version>
        </dependency>

Before starting, it is essential to understand that read/write operations on Arrow use byte buffers; reading and writing is a continuous exchange of bytes. To make this efficient, Arrow comes with a buffer allocator, which can have a fixed size or expand automatically.
The libraries backing the allocation management are arrow-memory-netty and arrow-memory-unsafe. We shall use the netty one.
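
For instance, a root allocator with a hard limit and a child allocator carved out of it could look like the following sketch (the names and sizes are illustrative assumptions):

// root allocator capped at 1GB of off-heap memory
try (RootAllocator rootAllocator = new RootAllocator(1024L * 1024 * 1024)) {
    // a child allocator with its own budget, handy for scoping memory per operation
    try (BufferAllocator childAllocator = rootAllocator.newChildAllocator("schemas", 0, 64L * 1024 * 1024)) {
        // vectors created with this allocator draw from the child's budget
    }
}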

Storing data in Arrow requires a schema. Schemas can be defined programmatically.

package com.gkatzioura.arrow;

import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

    public static final Schema DEFAULT_SCHEMA = createDefault();

    public static Schema createDefault() {
        var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
        var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

        return new Schema(List.of(strField, intField));
    }

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

    public static Schema fromJson(String jsonString) {
        try {
            return Schema.fromJSON(jsonString);
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

}

They also have a parseable JSON representation.

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : {
      "name" : "utf8"
    },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : {
      "name" : "int",
      "bitWidth" : 32,
      "isSigned" : true
    },
    "children" : [ ]
  } ]
}
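
The JSON form can be produced from a programmatically defined schema and parsed back; a quick round-trip sketch using the factory above:

// serialize the schema defined programmatically to its JSON representation
String json = SchemaFactory.DEFAULT_SCHEMA.toJson();

// and parse it back into a Schema instance
Schema parsed = SchemaFactory.fromJson(json);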

Plus, just like Avro, you can have complex schemas with embedded values on a field.

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

In the next blog post, we shall use the streaming API for Arrow.