Locking for multiple nodes the easy way: GCS

It happens to all of us. We develop stateless applications that can scale horizontally without any effort.
However, sometimes cases arise where you need to achieve some form of coordination.

You can go really advanced on this one. For example, you can use a framework like Akka and its cluster capabilities. Or you can go really simple and roll a mechanism of your own, as long as it gives you the results you need. Alternatively, you can just have different node groups based on the work you need them to do. The options and the solutions vary with the problem.

If your problem allows for a simple option, one way to achieve coordination, provided you use Google Cloud Storage, is to use its locking capabilities.
Imagine, for example, a scenario with 4 nodes. They scale dynamically, but each time a new node registers you want it to acquire a unique configuration which does not collide with a configuration another node might have already received.

The strategy can be to use a file on Google Cloud Storage for locking and a file that acts as a centralised configuration registry.

The lock file is nothing more than a file on Cloud Storage which shall be created and deleted. What gives us locking abilities is the GCS option to create a file only if it does not already exist.
Thus a process on one node will try to create the `lock` file; this action is equivalent to obtaining the lock.
Once the process is done, it will delete the file; this action is equivalent to releasing the lock.
Other processes in the meantime will try to create the file (acquire the lock) and fail, because the file already exists.
Meanwhile the process that successfully created the file (acquired the lock) will change the centralised configuration registry and, once done, will delete the file (release the lock).

So let’s start with the lock object.

package com.gkatzioura.gcs.lock;

import java.util.Optional;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

public class GCSLock {

	public static final String LOCK_STRING = "_lock";
	private final Storage storage;
	private final String bucket;
	private final String keyName;

	private Optional<Blob> acquired = Optional.empty();

	GCSLock(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	/**
	 * Tries to create the lock file. Creation succeeds only if the file does not already exist.
	 */
	public boolean acquire() {
		try {
			var blobInfo = BlobInfo.newBuilder(bucket, keyName).build();
			var blob = storage.create(blobInfo, LOCK_STRING.getBytes(), Storage.BlobTargetOption.doesNotExist());
			acquired = Optional.of(blob);
			return true;
		} catch (StorageException storageException) {
			// The precondition failed: most likely another process holds the lock.
			return false;
		}
	}

	/**
	 * Deletes the lock file, making the lock available to other processes.
	 */
	public void release() {
		if(!acquired.isPresent()) {
			throw new IllegalStateException("Lock was never acquired");
		}
		storage.delete(acquired.get().getBlobId());
	}

}

As you can see, the write specifies that the object should be created only if it does not already exist. Behind the scenes this operation uses the x-goog-if-generation-match header, which is GCS's mechanism for concurrency control.
Thus only one node will be able to acquire the lock and change the configuration file.
Afterwards it can delete the lock. If an exception is raised, the create operation failed, most likely because the lock has already been acquired by another process.
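
To see how these pieces are meant to be used together, here is a minimal usage sketch. The class name LockUsageExample is hypothetical and the bucket and file names are placeholders; releasing inside a finally block ensures the lock is not left behind if the critical section throws.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class LockUsageExample {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();
		var lock = new GCSLock(storage, "bucketName", "lockFileName");

		if (lock.acquire()) {
			try {
				// Critical section: only the process holding the lock gets here.
			} finally {
				// Always release, even if the critical section throws.
				lock.release();
			}
		}
	}

}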

To make the example more complete, let's create the configuration file. The configuration file will be a simple JSON file holding a key-value map of properties.

package com.gkatzioura.gcs.lock;

import java.util.HashMap;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.json.JSONObject;

public class GCSConfiguration {

	private final Storage storage;
	private final String bucket;
	private final String keyName;

	GCSConfiguration(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	/**
	 * Reads the current configuration, adds the property and writes it back.
	 * This is a read-modify-write sequence, so the caller is expected to hold the lock.
	 */
	public void addProperty(String key, String value) {
		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		final JSONObject configJson;

		if(blob==null) {
			configJson = new JSONObject();
		} else {
			configJson = new JSONObject(new String(blob.getContent()));
		}

		configJson.put(key, value);

		var blobInfo = BlobInfo.newBuilder(blobId).build();
		storage.create(blobInfo, configJson.toString().getBytes());
	}

	public Map<String,String> properties() {

		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		var map = new HashMap<String,String>();

		if(blob!=null) {
			var jsonObject = new JSONObject(new String(blob.getContent()));
			for(var key: jsonObject.keySet()) {
				map.put(key, jsonObject.getString(key));
			}
		}

		return map;
	}

}

It is a simple config util backed by GCS. It could be changed to perform the locking inside the addProperty operation; that is up to the user and the code (a sketch of that variant follows). For the purposes of this blog we shall just acquire the lock, change the configuration and release the lock.
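
For illustration, such a variant could look like the following minimal sketch; the LockedGCSConfiguration class and its busy-wait loop are hypothetical, not part of the code above.

package com.gkatzioura.gcs.lock;

public class LockedGCSConfiguration {

	private final GCSLock lock;
	private final GCSConfiguration configuration;

	LockedGCSConfiguration(GCSLock lock, GCSConfiguration configuration) {
		this.lock = lock;
		this.configuration = configuration;
	}

	/**
	 * Hypothetical variant: acquires the lock, adds the property and releases
	 * the lock, so callers do not have to coordinate on their own.
	 */
	public void addProperty(String key, String value) {
		while (!lock.acquire()) {
			// Keep retrying until the lock becomes available.
		}
		try {
			configuration.addProperty(key, value);
		} finally {
			lock.release();
		}
	}

}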
Our main class will look like this.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class Application {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();

		final String bucketName = "bucketName";
		final String lockFileName = "lockFileName";
		final String configFileName = "configFileName";

		var lock = new GCSLock(storage, bucketName, lockFileName);
		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

		var lockAcquired = lock.acquire();
		if(lockAcquired) {
			gcsConfig.addProperty("testProperty", "testValue");
			lock.release();
		}

		var config = gcsConfig.properties();

		for(var key: config.keySet()) {
			System.out.println("Key "+key+" value "+config.get(key));
		}

	}

}

Now let's go for some multithreading. Ten threads will try to put values; some of their attempts to acquire the lock are expected to fail.

package com.gkatzioura.gcs.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ApplicationConcurrent {

	private static final String bucketName = "bucketName";
	private static final String lockFileName = "lockFileName";
	private static final String configFileName = "configFileName";

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		var storage = StorageOptions.getDefaultInstance().getService();

		final int threads = 10;
		var service = Executors.newFixedThreadPool(threads);
		var futures = new ArrayList<Future>(threads);

		for (var i = 0; i < threads; i++) {
			futures.add(service.submit(update(storage, "property-"+i, "value-"+i)));
		}

		for (var f : futures) {
			f.get();
		}

		service.shutdown();

		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);
		var properties = gcsConfig.properties();

		for(var i=0; i < threads; i++) {
			System.out.println(properties.get("property-"+i));
		}
	}

	private static Runnable update(final Storage storage, String property, String value) {
		return () -> {
			var lock = new GCSLock(storage, bucketName, lockFileName);
			var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

			boolean lockAcquired = false;

			while (!lockAcquired) {
				lockAcquired = lock.acquire();
				if (!lockAcquired) {
					System.out.println("Could not acquire lock");
				}
			}

			gcsConfig.addProperty(property, value);
			lock.release();
		};
	}
}

Obviously 10 threads are enough to display the capabilities. While the threads run, several of them will try to acquire the lock simultaneously; all but one will fail, and the latecomers will keep failing and retrying until the lock becomes available.
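
Note that the acquire loop above polls GCS as fast as it can; in a real setup you would probably add a small backoff between attempts. A minimal sketch of such a loop, as a drop-in replacement for the loop inside update (the 100ms interval is an arbitrary choice):

			// Hypothetical variant of the acquire loop with a fixed backoff between attempts.
			while (!lock.acquire()) {
				System.out.println("Could not acquire lock, retrying");
				try {
					Thread.sleep(100); // back off before hitting GCS again
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt(); // restore the interrupt flag and give up
					return;
				}
			}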

In the end, all of them are expected to have their values added to the configuration.
That's it. If your problems have a simple nature, this approach might do the trick. Obviously you can use the HTTP API instead of the SDK (a sketch follows below). You can find the code on github.
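
For reference, acquiring the lock through the JSON API could look roughly like the following; a hedged sketch that assumes gcloud is installed locally for the access token, with bucketName and lockFileName as placeholders. The ifGenerationMatch=0 parameter makes the upload succeed only if no live version of the object exists.

curl -X POST --data-binary "_lock" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://storage.googleapis.com/upload/storage/v1/b/bucketName/o?uploadType=media&name=lockFileName&ifGenerationMatch=0"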

Pub/Sub local emulator

Pub/Sub is a nice tool provided by GCP. It is really handy and can help you with the messaging challenges your application might face. Actually, if you work with GCP, it is the managed messaging solution that you can use.

As expected, working with the actual Pub/Sub service comes with quotas, so for development it is essential to use something which is not going to cost you.

In these cases you can use the Pub/Sub emulator. To get started with the emulator you need to install it:

gcloud components install pubsub-emulator

It is indeed convenient, however, to have a docker image, since it is way more portable. Unfortunately there is no official image for that from Google Cloud, but you can use one of the solutions available on Docker Hub.

Now let's run it:

gcloud beta emulators pubsub start --project=test-project

After that your application can connect to the Pub/Sub emulator. The default port is 8085.
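
If you prefer your application to pick the endpoint up from the environment instead of wiring it manually, the emulator can print the variables for you; evaluating the output sets PUBSUB_EMULATOR_HOST (the unit test below wires the channel explicitly instead):

$(gcloud beta emulators pubsub env-init)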

I will use a Java unit test as an example for this one.

package org.gkatzioura.pubsub;

import java.io.IOException;
import java.nio.charset.Charset;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class LocalPubSubTest {

    private static final String PROJECT = "test-project";
    private static final String SUBSCRIPTION_NAME = "SUBSCRIBER";
    private static final String TOPIC_NAME = "test-topic-id";

    private static final String hostPort = "127.0.0.1:8085";

    private ManagedChannel channel;
    private TransportChannelProvider channelProvider;
    private TopicAdminClient topicAdmin;

    private Publisher publisher;
    private SubscriberStub subscriberStub;
    private SubscriptionAdminClient subscriptionAdminClient;

    private ProjectTopicName topicName = ProjectTopicName.of(PROJECT, TOPIC_NAME);
    private ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT, SUBSCRIPTION_NAME);

    private Subscription subscription;

    @Before
    public void setUp() throws Exception {
        // Plaintext channel pointing at the local emulator, so no TLS is involved.
        channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
        channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

        CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

        topicAdmin = createTopicAdmin(credentialsProvider);
        topicAdmin.createTopic(topicName);

        publisher = createPublisher(credentialsProvider);
        subscriberStub = createSubscriberStub(credentialsProvider);
        subscriptionAdminClient = createSubscriptionAdmin(credentialsProvider);
        // The last argument is the ack deadline in seconds; 0 falls back to the default.
        subscription = subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
    }

    @After
    public void tearDown() throws Exception {
        topicAdmin.deleteTopic(topicName);
        subscriptionAdminClient.deleteSubscription(subscription.getName());
        channel.shutdownNow();
    }

    @Test
    public void testLocalPubSub() throws Exception {
        final String messageText = "text";
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                                                   .setData(ByteString.copyFrom(messageText, Charset.defaultCharset()))
                                                   .build();
        publisher.publish(pubsubMessage).get();

        PullRequest pullRequest = PullRequest.newBuilder()
                                             .setMaxMessages(1)
                                             .setReturnImmediately(true) // return immediately if messages are not available
                                             .setSubscription(subscription.getName())
                                             .build();

        PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
        String receiveMessageText = pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8();

        Assert.assertEquals(messageText, receiveMessageText);
    }

    private TopicAdminClient createTopicAdmin(CredentialsProvider credentialsProvider) throws IOException {
        return TopicAdminClient.create(
                TopicAdminSettings.newBuilder()
                                  .setTransportChannelProvider(channelProvider)
                                  .setCredentialsProvider(credentialsProvider)
                                  .build()
        );
    }

    private SubscriptionAdminClient createSubscriptionAdmin(CredentialsProvider credentialsProvider) throws IOException {
        SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
                                                                                       .setCredentialsProvider(credentialsProvider)
                                                                                       .setTransportChannelProvider(channelProvider)
                                                                                       .build();
        return SubscriptionAdminClient.create(subscriptionAdminSettings);
    }

    private Publisher createPublisher(CredentialsProvider credentialsProvider) throws IOException {
        return Publisher.newBuilder(topicName)
                        .setChannelProvider(channelProvider)
                        .setCredentialsProvider(credentialsProvider)
                        .build();
    }

    private SubscriberStub createSubscriberStub(CredentialsProvider credentialsProvider) throws IOException {
        SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
                                                                              .setTransportChannelProvider(channelProvider)
                                                                              .setCredentialsProvider(credentialsProvider)
                                                                              .build();
        return GrpcSubscriberStub.create(subscriberStubSettings);
    }

}

That's it. Now you can have some cost-efficient unit tests!

Host your maven artifacts on the cloud using CloudStorageMaven

One of the major issues when dealing with large codebases in our teams has to do with artifact sharing and artifact storage.

There are various options out there that provide many features, such as JFrog Artifactory, Nexus, Archiva, etc.

I have been using them, setting them up and configuring them, and they certainly provide many features. Also, having your own repository installation gives you a lot of flexibility. Furthermore, docker has made things a lot easier, so setting them up takes almost no time.

Now, if you use a cloud provider like Amazon, Azure or Google, there is a more lightweight option that is pretty easy to set up. These providers give you cheap and easy access to storage, and their storage options can also be used to host your private artifacts, or even your public ones.

To do so you need to use a maven wagon which is capable of communicating with the storage options of your cloud provider, and this is exactly what the CloudStorageMaven project deals with.

The CloudStorageMaven project provides you with wagons interacting with Amazon S3, Azure Blob Storage and Google Cloud Storage.

If you already use one of these cloud services, hosting your artifacts on them seems like a no-brainer, and these wagons make it a lot easier to do so, as the sketch below indicates.
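
As an indication of what this looks like for Google Cloud Storage, here is a minimal pom.xml sketch; treat the version, bucket name and repository ids as placeholder assumptions and check the CloudStorageMaven project for the exact coordinates.

<build>
  <extensions>
    <!-- Placeholder coordinates: verify the artifact and version against the project. -->
    <extension>
      <groupId>com.gkatzioura.maven.cloud</groupId>
      <artifactId>google-storage-wagon</artifactId>
      <version>1.0</version>
    </extension>
  </extensions>
</build>

<distributionManagement>
  <snapshotRepository>
    <id>my-repo-bucket-snapshot</id>
    <url>gs://your-maven-bucket/snapshot</url>
  </snapshotRepository>
  <repository>
    <id>my-repo-bucket-release</id>
    <url>gs://your-maven-bucket/release</url>
  </repository>
</distributionManagement>

With this in place, mvn deploy uploads your artifacts straight to the bucket.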

I have compiled some tutorials on how to get started with each one of them.

Happy coding!