Static Factories are Great!

Every now and then I stumble upon classes with multiple constructors or classes that are cumbersome to work with. On top of that, parts of their components cannot be mocked, so in the end I am forced to use reflection for testing (Mockito based, old school, you choose).

Imagine a producer class that you use for Kafka, one that provides you with some abstraction over sending messages.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private Producer<String,String> producer;
	private String topic = "test-topic";
	
	StrMessageProducer() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getProperty("bootstrap.servers"));
		kafkaProperties.put("key.serializer",System.getProperty("key.serializer"));
		kafkaProperties.put("value.serializer",System.getProperty("value.serializer"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		this.producer = kafkaProducer;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}

Apart from being ugly, this class makes it very hard to change any of its components.

For example

  • I cannot use this class to post to another topic
  • I cannot use this class with a server configuration other than the one in the system properties
  • It’s difficult to test the functionality of the class since crucial components are created inside the constructor

It’s obvious that the constructor in this case serves the purpose of creating a Kafka producer based on the system properties. But the responsibility of the class is to use that producer in order to send messages in a specific way. Thus I will move the creation of the Producer out of the constructor. Also, because we might want to swap the topic used, I will inject the topic instead of having it hardcoded.
By doing so we encourage dependency injection. We make it easy to swap the ingredients of the class while the execution stays the same.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}

But we still need the producer to be created somehow. This is where the factory pattern kicks in.

We shall add static factories in order to create instances of the StrMessageProducer class with different configurations.
Let’s add two factory methods.
The first factory method is based on system properties and the second on environment variables.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}

	public static StrMessageProducer createFromSystemPros() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getProperty("bootstrap.servers"));
		kafkaProperties.put("key.serializer",System.getProperty("key.serializer"));
		kafkaProperties.put("value.serializer",System.getProperty("value.serializer"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getProperty("main.topic"));
	}

	public static StrMessageProducer createFromEnv() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getenv("BOOTSTRAP_SERVERS"));
		kafkaProperties.put("key.serializer",System.getenv("KEY_SERIALIZER"));
		kafkaProperties.put("value.serializer",System.getenv("VALUE_SERIALIZER"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getenv("MAIN_TOPIC"));
	}
}

You can already see the benefits. You have a clean class ready to use as it is, and you have some factory methods for convenience. Eventually you can add more static factories; some of them might also take arguments, for example the topic.
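
As one possible shape for such a factory helper, the sketch below builds the Kafka `Properties` from any lookup function, so the system-property and environment variants could share one code path. `KafkaProps`, `fromLookup` and `envKeys` are hypothetical names that are not part of the class above, and the sketch uses only the JDK. It also fails fast on a missing setting, since `Properties.put` throws a `NullPointerException` for a null value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.UnaryOperator;

// Hypothetical helper (not in the original class): builds producer settings
// from any source that can resolve a name to a value.
final class KafkaProps {

	private KafkaProps() {
	}

	// kafkaKeyToSourceKey maps each Kafka property name to the name used by the lookup source.
	static Properties fromLookup(Map<String, String> kafkaKeyToSourceKey, UnaryOperator<String> lookup) {
		var properties = new Properties();
		for (var entry : kafkaKeyToSourceKey.entrySet()) {
			var value = lookup.apply(entry.getValue());
			if (value == null) {
				// Readable failure instead of the NullPointerException
				// that Properties.put would throw for a null value.
				throw new IllegalStateException("Missing setting: " + entry.getValue());
			}
			properties.put(entry.getKey(), value);
		}
		return properties;
	}

	// Key mapping for the environment variable names used in createFromEnv.
	static Map<String, String> envKeys() {
		var keys = new LinkedHashMap<String, String>();
		keys.put("bootstrap.servers", "BOOTSTRAP_SERVERS");
		keys.put("key.serializer", "KEY_SERIALIZER");
		keys.put("value.serializer", "VALUE_SERIALIZER");
		return keys;
	}
}
```

A factory that takes the topic as an argument could then call `KafkaProps.fromLookup(KafkaProps.envKeys(), System::getenv)` and hand the result to the `KafkaProducer` constructor.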

We can also go one step further when we want multiple MessageProducer classes and want to utilise an interface. So we are going to introduce the MessageProducer interface, which our StrMessageProducer class will implement. We are also going to move the static factories to the interface.

So this will be our interface with the static factories.

package com.gkatzioura.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public interface MessageProducer {
	
	void send(String message);

	static MessageProducer createFromSystemPros() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getProperty("bootstrap.servers"));
		kafkaProperties.put("key.serializer",System.getProperty("key.serializer"));
		kafkaProperties.put("value.serializer",System.getProperty("value.serializer"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getProperty("main.topic"));
	}

	static MessageProducer createFromEnv() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getenv("BOOTSTRAP_SERVERS"));
		kafkaProperties.put("key.serializer",System.getenv("KEY_SERIALIZER"));
		kafkaProperties.put("value.serializer",System.getenv("VALUE_SERIALIZER"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getenv("MAIN_TOPIC"));
	}

}

And this would be our new StrMessageProducer class.

package com.gkatzioura.kafka.producer;

import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer implements MessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	@Override
	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}

}

Let’s check the benefits

  • We can have various implementations of the MessageProducer interface
  • We can add as many factories as we want that serve our purpose
  • We can easily test a MessageProducer implementation by passing mocks to its constructor
  • We keep our codebase cleaner
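
To illustrate the testing benefit without needing Kafka on the classpath, the sketch below mirrors the design with JDK types only. `RecordSender`, `TopicMessageProducer` and `RecordingSender` are hypothetical stand-ins for Kafka's `Producer`, the `StrMessageProducer` class and a mock respectively; they are assumptions for illustration, not code from the project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Kafka Producer dependency.
interface RecordSender {
	void send(String topic, String message);
}

// Mirrors StrMessageProducer: collaborators arrive through the constructor.
final class TopicMessageProducer {

	private final RecordSender sender;
	private final String topic;

	TopicMessageProducer(RecordSender sender, String topic) {
		this.sender = sender;
		this.topic = topic;
	}

	void send(String message) {
		sender.send(topic, message);
	}
}

// Hand-written fake that records what was sent so a test can inspect it.
final class RecordingSender implements RecordSender {

	final List<String> sent = new ArrayList<>();

	@Override
	public void send(String topic, String message) {
		sent.add(topic + ":" + message);
	}
}
```

A test creates `new TopicMessageProducer(fake, "orders")`, calls `send("hello")` and checks that the fake recorded `orders:hello`. With the real class the same idea applies with a Mockito mock of `Producer` passed to the constructor, with no reflection involved.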

 

Locking for multiple nodes the easy way: GCS

It happens to all of us. We develop stateless applications that can scale horizontally without any effort.
However sometimes cases arise where you need to achieve some type of coordination.

You can go really advanced on this one. For example you can use a framework like Akka and its cluster capabilities. Or you can go really simple and roll a mechanism of your own, as long as it gives you the results needed. On another note, you can just have different node groups based on the work you need them to do. The options and the solutions change based on the problem.

If your problem can be solved with a simple option, one way to do so, provided you use Google Cloud Storage, is to use its lock capabilities.
Imagine for example a scenario of 4 nodes that scale dynamically, where each time a new node registers you want it to acquire a unique configuration that does not collide with a configuration another node might have received.

The strategy can be to use a file on Google Cloud Storage for locking and a file that acts as a centralised configuration registry.

The lock file is nothing more than a file on cloud storage which shall be created and deleted. What gives us lock abilities is the option on GCS to create a file only if it does not exist.
Thus a process on one node will try to create the `lock` file; this action is equivalent to obtaining the lock.
Once the process is done it will delete the file; this action is equivalent to releasing the lock.
Other processes in the meantime will try to create the file (acquire the lock) and fail (file already exists) because another process has created the file.
Meanwhile the process that has successfully created the file (acquired the lock) will change the centralised configuration registry and once done will delete the file (release the lock).
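
The protocol above can be simulated in memory to make the semantics concrete. In this sketch a `ConcurrentMap` plays the role of the bucket and `putIfAbsent` plays the role of "create only if the object does not exist". `InMemoryBucketLock` is a hypothetical name and nothing here talks to GCS; it is only a model of the idea.

```java
import java.util.concurrent.ConcurrentMap;

// In-memory model of the lock file protocol (assumption: the map is the bucket).
final class InMemoryBucketLock {

	private final ConcurrentMap<String, String> bucket;
	private final String lockFile;

	InMemoryBucketLock(ConcurrentMap<String, String> bucket, String lockFile) {
		this.bucket = bucket;
		this.lockFile = lockFile;
	}

	// Creating the entry succeeds for exactly one caller; everyone else finds it already there.
	boolean acquire(String owner) {
		return bucket.putIfAbsent(lockFile, owner) == null;
	}

	// Deleting the entry releases the lock; the entry is removed only if this owner created it.
	boolean release(String owner) {
		return bucket.remove(lockFile, owner);
	}
}
```

Two instances sharing the same map behave like two nodes sharing a bucket: the second `acquire` returns false until the first owner calls `release`.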

So let’s start with the lock object.

package com.gkatzioura.gcs.lock;

import java.util.Optional;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

public class GCSLock {

	public static final String LOCK_STRING = "_lock";
	private final Storage storage;
	private final String bucket;
	private final String keyName;

	private Optional<Blob> acquired = Optional.empty();

	GCSLock(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public boolean acquire() {
		try {
			var blobInfo = BlobInfo.newBuilder(bucket, keyName).build();
			var blob = storage.create(blobInfo, LOCK_STRING.getBytes(), Storage.BlobTargetOption.doesNotExist());
			acquired = Optional.of(blob);
			return true;
		} catch (StorageException storageException) {
			return false;
		}
	}

	public void release() {
		if(!acquired.isPresent()) {
			throw new IllegalStateException("Lock was never acquired");
		}
		storage.delete(acquired.get().getBlobId());
	}

}

As you can see, the write specifies that the object should be written only if it does not exist. Behind the scenes this operation relies on the generation-match precondition (the x-goog-if-generation-match header, with a value of 0), which GCS offers for concurrency control.
Thus one node will be able to acquire the lock and change the configuration files.
Afterwards it can delete the lock. If an exception is raised, the create operation has most likely failed because the lock is already held.

To make the example more complete let’s create the configuration file. The configuration file will be a simple JSON file holding a map of keys to values.

package com.gkatzioura.gcs.lock;

import java.util.HashMap;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.json.JSONObject;

public class GCSConfiguration {

	private final Storage storage;
	private final String bucket;
	private final String keyName;

	GCSConfiguration(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public void addProperty(String key, String value) {
		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		final JSONObject configJson;

		if(blob==null) {
			configJson = new JSONObject();
		} else {
			configJson = new JSONObject(new String(blob.getContent()));
		}

		configJson.put(key, value);

		var blobInfo = BlobInfo.newBuilder(blobId).build();
		storage.create(blobInfo, configJson.toString().getBytes());
	}

	public Map<String,String> properties() {

		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		var map = new HashMap<String,String>();

		if(blob!=null) {
			var jsonObject = new JSONObject(new String(blob.getContent()));
			for(var key: jsonObject.keySet()) {
				map.put(key, jsonObject.getString(key));
			}
		}

		return map;
	}

}

It is a simple config util backed by GCS. Eventually the locking could be moved inside the addProperty operation; it’s up to the user and the code. For the purpose of this blog we shall just acquire the lock, change the configuration and release the lock.
Our main class will look like this.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class Application {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();

		final String bucketName = "bucketName";
		final String lockFileName = "lockFileName";
		final String configFileName = "configFileName";

		var lock = new GCSLock(storage, bucketName, lockFileName);
		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

		var lockAcquired = lock.acquire();
		if(lockAcquired) {
			gcsConfig.addProperty("testProperty", "testValue");
			lock.release();
		}

		var config = gcsConfig.properties();

		for(var key: config.keySet()) {
			System.out.println("Key "+key+" value "+config.get(key));
		}

	}

}
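
The acquire, update, release sequence in the main method can be folded into a small helper that releases the lock in a `finally` block, so a failure while updating the configuration does not leave the lock file behind. This is a sketch under assumptions: `SimpleLock` and `Locked.run` are hypothetical names, and `SimpleLock` only mirrors the `acquire`/`release` shape of `GCSLock`.

```java
// Hypothetical interface with the same shape as GCSLock's public methods.
interface SimpleLock {
	boolean acquire();

	void release();
}

final class Locked {

	private Locked() {
	}

	// Runs the action only if the lock was obtained and always releases it afterwards.
	// Returns false when the lock is held by someone else and the action was skipped.
	static boolean run(SimpleLock lock, Runnable action) {
		if (!lock.acquire()) {
			return false;
		}
		try {
			action.run();
			return true;
		} finally {
			lock.release();
		}
	}
}
```

With a thin adapter around `GCSLock`, the body of the main method could become `Locked.run(lock, () -> gcsConfig.addProperty("testProperty", "testValue"))`.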

Now let’s go for some multithreading. Ten threads will try to put values; some of their attempts to acquire the lock are expected to fail.

package com.gkatzioura.gcs.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ApplicationConcurrent {

	private static final String bucketName = "bucketName";
	private static final String lockFileName = "lockFileName";
	private static final String configFileName = "configFileName";

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		var storage = StorageOptions.getDefaultInstance().getService();

		final int threads = 10;
		var service = Executors.newFixedThreadPool(threads);
		var futures = new ArrayList<Future<?>>(threads);

		for (var i = 0; i < threads; i++) {
			futures.add(service.submit(update(storage, "property-"+i, "value-"+i)));
		}

		for (var f : futures) {
			f.get();
		}

		service.shutdown();

		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);
		var properties = gcsConfig.properties();

		for(var i=0; i < threads; i++) {
			System.out.println(properties.get("property-"+i));
		}
	}

	private static Runnable update(final Storage storage, String property, String value) {
		return () -> {
			var lock = new GCSLock(storage, bucketName, lockFileName);
			var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

			boolean lockAcquired = false;

			while (!lockAcquired) {
				lockAcquired = lock.acquire();
				// Report only the attempts that actually failed
				if (!lockAcquired) {
					System.out.println("Could not acquire lock");
				}
			}

			gcsConfig.addProperty(property, value);
			lock.release();
		};
	}
}

Obviously 10 threads are enough to display the capabilities. During thread initialization and execution some threads will try to acquire the lock simultaneously and all but one will fail, while other threads will arrive late, fail, and wait until the lock is available.
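
The retry loop in the example polls as fast as it can, which means a storage request per iteration. A gentler variant, sketched here under the assumption that a bounded number of attempts with exponential backoff is acceptable, pauses between attempts. `LockRetry.acquireWithBackoff` and its parameters are hypothetical; in the example the `BooleanSupplier` would be `lock::acquire`.

```java
import java.util.function.BooleanSupplier;

final class LockRetry {

	private LockRetry() {
	}

	// Tries up to maxAttempts times, doubling the pause after each failed attempt.
	// Returns true as soon as tryAcquire succeeds, false if every attempt failed
	// or the waiting thread was interrupted.
	static boolean acquireWithBackoff(BooleanSupplier tryAcquire, int maxAttempts, long initialDelayMillis) {
		long delay = initialDelayMillis;
		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
			if (tryAcquire.getAsBoolean()) {
				return true;
			}
			if (attempt < maxAttempts) {
				try {
					Thread.sleep(delay);
				} catch (InterruptedException e) {
					// Preserve the interrupt flag for the caller and give up.
					Thread.currentThread().interrupt();
					return false;
				}
				delay *= 2;
			}
		}
		return false;
	}
}
```

The caller has to decide what to do when the method returns false, for example skip the update or report an error, since the lock was never obtained.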

In the end all of them are expected to have their values added to the configuration.
That’s it. If your problems have a simple nature this approach might do the trick. Obviously you can use the HTTP API instead of the SDK. You can find the code on GitHub.

Spring Boot and Micrometer with Prometheus Part 6: Securing metrics

Previously we successfully spun up our Spring Boot application with Prometheus. An endpoint in our Spring application exposes our metric data so that Prometheus is able to retrieve it.
The main question that comes to mind is how to secure this information.

Spring already provides us with its great security framework, so it will be fairly easy to use it for our application. The goal is to use basic authentication for the actuator/prometheus endpoint and also to configure Prometheus to access that information using basic authentication.

So the first step is to enable security on our app by adding the security starter dependency.

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>

The Spring Boot application will get secured on its own by generating a password for the default user.
However we do want to have control over the username and password, so we are going to use some environment variables.

By running the application with the credentials for the default user we have the Prometheus endpoint secured with minimal configuration.

SPRING_SECURITY_USER_NAME=test-user SPRING_SECURITY_USER_PASSWORD=test-password mvn spring-boot:run

So now that we have security set up on our app, it’s time to update our Prometheus config.

scrape_configs:
  - job_name: 'prometheus-spring'
    scrape_interval: 1m
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['my.local.machine:8080']
    basic_auth:
      username: "test-user"
      password: "test-password"
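
With `basic_auth` configured, each scrape request carries a standard HTTP Basic `Authorization` header. The sketch below shows how such a header value is formed, which can help when checking the secured endpoint by hand. `BasicAuthHeader` is a hypothetical helper written for illustration; it is not part of Spring or Prometheus.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class BasicAuthHeader {

	private BasicAuthHeader() {
	}

	// HTTP Basic authentication: "Basic " followed by base64("username:password").
	static String valueFor(String username, String password) {
		var credentials = username + ":" + password;
		var encoded = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
		return "Basic " + encoded;
	}
}
```

Calling `BasicAuthHeader.valueFor("test-user", "test-password")` yields the value to send in the `Authorization` header of a manual request to `/actuator/prometheus`.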

So let’s run Prometheus again as described previously.

To sum up, after this change Prometheus will gather metrics data for our application in a secure way.

My language will overtake yours

From time to time I stumble upon articles comparing one programming language to another or articles which claim that ‘one language will rule them all’.
Most of these comparisons depend largely on popularity metrics based on searches and hits (for example TIOBE).

The question is not whether the data is accurate but whether comparisons like these are still relevant to the current landscape of software development.

In the past, languages were developed in order to replace other languages and make developers more comfortable. Some languages became obsolete, and some were overtaken by others.
However as the industry continued to evolve, languages started to be associated with certain fields of our industry. For example when it comes to machine learning and deep learning, Python is the first language that comes to mind. As the fields of the industry evolved, the languages associated with them evolved too, leading to the development of tools and frameworks. As a result we now have programming languages with huge ecosystems. The tools and the frameworks were not built in a day; it took years and a lot of effort, skill and experience with the pretty specific problems they had to tackle. As the years pass these tools mature and evolve.

Even though there are many good options out there, the major factors in adoption are not based on the language by itself but more on the tools that come with it.
Let’s take for example Java and the Java EE ecosystem. Although there is a great number of articles discussing the death of Java, Java continues to be the first choice, especially when it comes to enterprise development. There are certainly languages with better syntax and more convenient tools, but Java comes along with a huge ecosystem.
I believe that the discussion should shift from comparing languages to comparing ecosystems based on the field.

Another fact that has to be considered is the boom that happens in several industries. Certain industries require pretty specific solutions and tools, thus leading to a boost in the adoption of those tools and frameworks. It is not that a language just got more popular among developers; instead the industry just got bigger.

Finally we should take universities into consideration. Computer science departments play a leading role in shaping the language popularity landscape.
For example many of the C language hits are due to university assignments. Furthermore R&D finds its way to the industry, implemented with different tools for environments much different from the original one.

All in all I believe that language comparison is no longer relevant. The industry has evolved a lot and the applications that we develop are largely different from the ones we used to develop in the past. In the past we used to develop console applications and it made sense to compare syntax and extra features. Nowadays we develop large scale applications in clustered environments employing various architectures and we need more than just a language to tackle these problems.