Pub/Sub is a nice tool provided by GCP. It is really handy and can help you with the messaging challenges you application might face. Actually if you work with GCP it is the managed messaging solution that you can use.
As expected working with the actual Pub/Sub solution comes with some quota, so for
development it is essential to use something which is not going to cost you.
In these cases you can use the Pub/Sub emulator. To get started with the emulator you need to install it
gcloud components install pubsub-emulator
It is indeed convenient however to have a docker image since it is way more portable. Unfortunately there is no official image for that from google cloud. In any case you can use one of the solutions available on Docker Hub.
Now let’s run it
gcloud beta emulators pubsub start --project=test-project
After that your application can connect to the pub/sub emulator. The default port is 8085
I will use a Java unit test as an example for this one.
package org.gkatzioura.pubsub; import java.io.IOException; import java.nio.charset.Charset; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.pubsub.v1.TopicAdminSettings; import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; public class LocalPubSubTest { private static final String PROJECT = "test-project"; private static final String SUBSCRIPTION_NAME = "SUBSCRIBER"; private static final String TOPIC_NAME = "test-topic-id"; private static final String hostPort = "127.0.0.1:8085"; private ManagedChannel channel; private TransportChannelProvider channelProvider; private TopicAdminClient topicAdmin; private Publisher publisher; private SubscriberStub subscriberStub; private SubscriptionAdminClient subscriptionAdminClient; private ProjectTopicName topicName = ProjectTopicName.of(PROJECT, TOPIC_NAME); private ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT, SUBSCRIPTION_NAME); private Subscription subscription; @Before public void setUp() throws Exception { channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build(); channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); topicAdmin = createTopicAdmin(credentialsProvider); topicAdmin.createTopic(topicName); publisher = createPublisher(credentialsProvider); subscriberStub = createSubscriberStub(credentialsProvider); subscriptionAdminClient = createSubscriptionAdmin(credentialsProvider); subscription = subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0); } @After public void tearDown() throws Exception { topicAdmin.deleteTopic(topicName); subscriptionAdminClient.deleteSubscription(subscription.getName()); channel.shutdownNow(); } @Test public void testLocalPubSub() throws Exception { final String messageText = "text"; PubsubMessage pubsubMessage = PubsubMessage.newBuilder() .setData(ByteString.copyFrom(messageText, Charset.defaultCharset())) .build(); publisher.publish(pubsubMessage).get(); PullRequest pullRequest = PullRequest.newBuilder() .setMaxMessages(1) .setReturnImmediately(true) // return immediately if messages are not available .setSubscription(subscription.getName()) .build(); PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest); String receiveMessageText = pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8(); Assert.assertEquals(messageText, receiveMessageText); } private TopicAdminClient createTopicAdmin(CredentialsProvider credentialsProvider) throws IOException { return TopicAdminClient.create( TopicAdminSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .build() ); } private SubscriptionAdminClient createSubscriptionAdmin(CredentialsProvider credentialsProvider) throws IOException { SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder() .setCredentialsProvider(credentialsProvider) .setTransportChannelProvider(channelProvider) .build(); return SubscriptionAdminClient.create(subscriptionAdminSettings); } private Publisher createPublisher(CredentialsProvider credentialsProvider) throws IOException { return Publisher.newBuilder(topicName) .setChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .build(); } private SubscriberStub createSubscriberStub(CredentialsProvider credentialsProvider) throws IOException { SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .build(); return GrpcSubscriberStub.create(subscriberStubSettings); } }
That’s it. Now you can have some cost efficient unit tests!
Did you run into the following error in the emulator stack trace when executing the above code?
[pubsub] Oct 13, 2020 2:38:38 PM io.gapi.emulators.netty.NotFoundHandler handleRequest
[pubsub] INFO: Unknown request URI: /bad-request
[pubsub] Oct 13, 2020 2:40:47 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Oct 13, 2020 2:40:47 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.