In any rest-api based application it’s a matter of time when there is going to be the need to intercept the requests towards the application and execute more than one actions. If those actions, are actions that need to apply towards all requests to the application then the usage of filters makes sense, for example security.
On Servlet based applications we used to have ContentCachingRequestWrapper and ContentCachingResponseWrapper. We look for the same qualities the above give but in a WebFlux environment.
The equivalent solution are the decorator classes provided by the webflux package: ServerHttpRequestDecorator, ServerHttpResponeDecorator, ServerWebExchangeDecorator.
Let’s get started with a simple Flux based api.
First we import the dependencies
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
The we create a simple model for a post request.
package com.gkatzioura.reactor.fluxfiltercapture; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data @Builder @AllArgsConstructor @NoArgsConstructor public class Info { private String description; }
And the response
package com.gkatzioura.reactor.fluxfiltercapture; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data @Builder @AllArgsConstructor @NoArgsConstructor public class InfoResponse { private boolean success; public static InfoResponse successful() { return InfoResponse.builder().success(true).build(); } }
A controller that uses the models will be implemented. The controller would be a simple echo.
package com.gkatzioura.reactor.fluxfiltercapture; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; @RestController public class InfoController { @PostMapping("/info") public Mono<InfoResponse> getInfo(@RequestBody Info info) { return Mono.just(InfoResponse.builder().success(true).build()); } }
A curl POST can help us debug.
curl --location --request POST 'http://localhost:8080/info' \ --header 'Content-Type: application/json' \ --data-raw '{ "description": "Check" }'
Your typical filter on Webflux has to implement the WebFilter interface and then if annotated will be picked up by the runtime.
@Component public class ExampleFilter implements WebFilter { @Override public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) { return webFilterChain.filter(serverWebExchange); } }
In our case we want to keep track both of the response and the request body.
Let’s start by creating a ServerHttpRequestDecorator implementation.
package com.gkatzioura.reactor.fluxfiltercapture; import java.nio.charset.StandardCharsets; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import reactor.core.publisher.Flux; public class BodyCaptureRequest extends ServerHttpRequestDecorator { private final StringBuilder body = new StringBuilder(); public BodyCaptureRequest(ServerHttpRequest delegate) { super(delegate); } public Flux<DataBuffer> getBody() { return super.getBody().doOnNext(this::capture); } private void capture(DataBuffer buffer) { this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString()); } public String getFullBody() { return this.body.toString(); } }
As we can see on the getBody implementation we add a method which will capture the byte chunks that flow while the actual service reads the body.
Once the request is finished the accumulated data will form the actual body.
Same pattern will apply for the ServerHttpResponeDecorator implementation.
package com.gkatzioura.reactor.fluxfiltercapture; import java.nio.charset.StandardCharsets; import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponseDecorator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class BodyCaptureResponse extends ServerHttpResponseDecorator { private final StringBuilder body = new StringBuilder(); public BodyCaptureResponse(ServerHttpResponse delegate) { super(delegate); } @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { Flux<DataBuffer> buffer = Flux.from(body); return super.writeWith(buffer.doOnNext(this::capture)); } private void capture(DataBuffer buffer) { this.body.append(StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString()); } public String getFullBody() { return this.body.toString(); } }
Here we override the writeWith function. Those data are are written and pushed down the stream we decorate the argument with a Flux in order to be able to use a method on doOnNext.
In both cases the bytes of the body and the response are accumulated. This might work for specific use cases, for example altering the request/response. If your use case is covered by just streaming the bytes to another system there is no need for accumulation, just an altered function on getBody and writeWith that streams the data will do the work.
Let’s go to our parent decorator that extends ServerWebExchangeDecorator.
package com.gkatzioura.reactor.fluxfiltercapture; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchangeDecorator; public class BodyCaptureExchange extends ServerWebExchangeDecorator { private BodyCaptureRequest bodyCaptureRequest; private BodyCaptureResponse bodyCaptureResponse; public BodyCaptureExchange(ServerWebExchange exchange) { super(exchange); this.bodyCaptureRequest = new BodyCaptureRequest(exchange.getRequest()); this.bodyCaptureResponse = new BodyCaptureResponse(exchange.getResponse()); } @Override public BodyCaptureRequest getRequest() { return bodyCaptureRequest; } @Override public BodyCaptureResponse getResponse() { return bodyCaptureResponse; } }
Time to focus on our filter. To make the example simple we will print on the console the request and response body.
package com.gkatzioura.reactor.fluxfiltercapture; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; @Component public class CustomWebFilter implements WebFilter { @Override public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) { BodyCaptureExchange bodyCaptureExchange = new BodyCaptureExchange(serverWebExchange); return webFilterChain.filter(bodyCaptureExchange).doOnSuccess( (se) -> { System.out.println("Body request "+bodyCaptureExchange.getRequest().getFullBody()); System.out.println("Body response "+bodyCaptureExchange.getResponse().getFullBody()); }); } }
If we run the Curl above eventually we shall have the body of the request and response printed.
You can find the source code on github.
Thanks to you I am now able to log the request body.
But not sure why this way works. When I tried doOnNext/subscribe on ServerRequest/ServerResponse via route().before(), route().after(), filter, interceptor, aop(@aspect), etc …. I got the same error — “Only one connection receive subscriber allowed” — and that is pretty understandable by the nature of reactors.
But your way is absolutely working and I don’t understand why it is. Can you add a few hints for me, please?
Again, Thank you very much. It is really precise and concise post.
Thank you for, Able to log server response where as uploading multipart not able to capture parts with getMultipart method, and doOnnext not getting triggered.
Please help me is there any tweaks required to log multipart data.
Is there any way to Retrieve request body on exchange.filter().doFirst(() -> //hrere) instead of .doOnSuccess ?