Project Reactor is a fully non-blocking foundation with back-pressure support included. Most libraries out there offer asynchronous methods that fit this model, but in some cases a library contains complex blocking methods without an asynchronous implementation. Calling these methods inside a Reactor stream ties up threads that are meant to stay free and can stall the pipeline. Instead, we need to turn those methods into async ones or find a workaround.
If you are short on time and cannot contribute a patch to the tool used, or you cannot work out how to reverse engineer the blocking call and implement a non-blocking version, then it makes sense to utilise some threads.
First, let’s import the dependencies for our project:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.11</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-engine</artifactId>
        <version>5.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Let’s start with our blocking service:
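package com.gkatzioura.blocking;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import javax.net.ssl.HttpsURLConnection;

public class BlockingService {

    // Blocks the calling thread until the whole response body has been read.
    public String get(String url) throws IOException {
        HttpsURLConnection connection = (HttpsURLConnection) new URL(url).openConnection();
        // A plain GET sends no request body, so doOutput keeps its default (false).
        connection.setRequestMethod("GET");
        try (InputStream inputStream = connection.getInputStream()) {
            return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}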
We used HttpsURLConnection since we know for sure that it is a blocking call. To keep that call off the threads that drive the reactive pipeline, we need a Scheduler. For blocking calls we shall use the boundedElastic scheduler. A scheduler can also be created from an existing executor service.
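As a rough illustration of that last point, the sketch below wraps an executor service with `Schedulers.fromExecutorService`. The class name, the pool size of 4, the thread name `blocking-pool` and the `blockingCall` helper are all assumptions made up for this example; they are not part of the service above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class ExecutorSchedulerExample {

    // Hypothetical stand-in for a blocking call; it reports the thread it ran on.
    static String blockingCall() throws InterruptedException {
        Thread.sleep(100);
        return Thread.currentThread().getName();
    }

    static String runOnExecutor() {
        // Assumed pool: four threads, all named "blocking-pool".
        ExecutorService executor =
                Executors.newFixedThreadPool(4, r -> new Thread(r, "blocking-pool"));
        Scheduler scheduler = Schedulers.fromExecutorService(executor);
        try {
            return Mono.fromCallable(ExecutorSchedulerExample::blockingCall)
                    .subscribeOn(scheduler)
                    .block(); // block() is used only so the demo can return the value
        } finally {
            // Release the scheduler and the pool so the JVM can exit.
            scheduler.dispose();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking call ran on: " + runOnExecutor());
    }
}
```

This can be handy when an application already manages a dedicated pool for blocking work and you would rather reuse it than let Reactor create its own threads.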
So let’s wrap this method so that it no longer blocks the calling thread.
package com.gkatzioura.blocking;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingAsyncService {

    private final BlockingService blockingService;

    public BlockingAsyncService(BlockingService blockingService) {
        this.blockingService = blockingService;
    }

    // Public so that callers (and the test below) can reach it.
    public Mono<String> get(String url) {
        // The callable runs only on subscription, on a boundedElastic worker thread.
        return Mono.fromCallable(() -> blockingService.get(url))
                   .subscribeOn(Schedulers.boundedElastic());
    }
}
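What we can see is a Mono created from a callable, so the blocking method is not invoked until something subscribes. The subscribeOn operator makes that subscription, and therefore the callable itself, run on a worker thread of the boundedElastic scheduler instead of the thread that subscribed.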
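To see this behaviour in isolation, here is a minimal sketch that replaces the HTTP call with a callable returning the name of the thread it runs on. The class name, the `calls` counter and the `threadName` method are hypothetical, introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SubscribeOnExample {

    // Counts how many times the callable has actually been executed.
    static final AtomicInteger calls = new AtomicInteger();

    // Builds the Mono; the callable does not run until something subscribes.
    static Mono<String> threadName() {
        return Mono.fromCallable(() -> {
                    calls.incrementAndGet();
                    return Thread.currentThread().getName();
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        Mono<String> mono = threadName();
        System.out.println("calls before subscribing: " + calls.get());
        System.out.println("caller thread: " + Thread.currentThread().getName());
        // block() subscribes and waits; it is used here only to print the result.
        System.out.println("callable thread: " + mono.block());
        System.out.println("calls after subscribing: " + calls.get());
    }
}
```

The counter stays at zero until `block()` subscribes, and the callable then reports a thread whose name starts with `boundedElastic` rather than the caller's thread.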
Let’s write a test:
package com.gkatzioura.blocking;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class BlockingAsyncServiceTest {

    private BlockingAsyncService blockingAsyncService;

    @BeforeEach
    void setUp() {
        blockingAsyncService = new BlockingAsyncService(new BlockingService());
    }

    @Test
    void name() {
        StepVerifier.create(
                        Mono.just("https://www.google.com/")
                            .flatMap(blockingAsyncService::get)
                    )
                    // expectNextMatches fails the test when the predicate returns false,
                    // whereas consumeNextWith would ignore the boolean result.
                    .expectNextMatches(s -> s.startsWith("<!doctype"))
                    .verifyComplete();
    }
}
That’s it! Obviously, the best thing to do is to replace the blocking call with an async one, or to find a workaround using the async libraries out there. When that’s not feasible, we can fall back on using threads.
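For comparison, here is a sketch of what the async route could look like for this particular HTTP example. It assumes Java 11 or later and swaps in the JDK's `java.net.http.HttpClient`, whose `sendAsync` returns a `CompletableFuture`, so no extra scheduler is involved. The `AsyncHttpService` class is hypothetical and not part of the code above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import reactor.core.publisher.Mono;

class AsyncHttpService {

    private final HttpClient client = HttpClient.newHttpClient();

    Mono<String> get(String url) {
        // defer() delays building and sending the request until subscription.
        return Mono.defer(() -> {
            HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
            // sendAsync returns immediately with a CompletableFuture of the response.
            return Mono.fromFuture(client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
                       .map(HttpResponse::body);
        });
    }
}
```

Whether such a replacement is available depends entirely on the library behind the blocking call; when there is none, the boundedElastic approach shown earlier remains the fallback.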