Java Based Akka application Part 2: Adding tests

In the previous blog post we focused on spinning up our first Akka project.
Now it’s time to add a test to our codebase.

The first step is to add the right dependencies to the existing project.

	<dependencies>
		<dependency>
			<groupId>com.typesafe.akka</groupId>
			<artifactId>akka-actor-typed_2.13</artifactId>
			<version>${akka.version}</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.16</version>
			<scope>provided</scope>
		</dependency>

		<!-- Test -->
		<dependency>
			<groupId>com.typesafe.akka</groupId>
			<artifactId>akka-actor-testkit-typed_2.13</artifactId>
			<version>${akka.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.13.1</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

What you will notice is the usage of JUnit 4 instead of JUnit 5. Some of the testing utilities, like TestKitJunitResource, need annotations like @ClassRule and are bound to JUnit 4. This is not a blocker for adopting JUnit 5; with some tweaks it is feasible to use the tools your project needs. However, in this example JUnit 4 shall be used.
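
For reference, if your project is on JUnit 5, one alternative is to manage the ActorTestKit lifecycle yourself instead of relying on the @ClassRule. The sketch below is illustrative and not part of this example; note that LogCapturing is a JUnit 4 rule, so log capturing would need different wiring.

	package com.gkatzioura;

	import akka.actor.testkit.typed.javadsl.ActorTestKit;
	import org.junit.jupiter.api.AfterAll;
	import org.junit.jupiter.api.Test;

	public class AppGuardianJUnit5Tests {

		// Created once for the test class; takes the place of TestKitJunitResource.
		private static final ActorTestKit testKit = ActorTestKit.create();

		@AfterAll
		public static void cleanup() {
			// Shut the underlying actor system down after all tests have run.
			testKit.shutdownTestKit();
		}

		@Test
		public void testSpawn() {
			// spawn works the same way as with the JUnit 4 resource.
			testKit.spawn(AppGuardian.create(), "app-guardian");
		}
	}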

Before we write the test we need to think about our code.
Notice that we send a message to our actor in a fire-and-forget fashion.

	private Behavior<GuardianMessage> receiveMessage(MessageToGuardian messageToGuardian) {
		getContext().getLog().info("Message received: {}", messageToGuardian.getMessage());
		return this;
	}

If you don’t have a way to intercept what happens inside the method, your options are limited. In those cases you can utilise the log messages and assert that the expected log events actually happen.

Before we add the unit test we need to make some Logback adjustments, which will take effect only in the logback.xml used for tests. More specifically, we need a Logback appender that captures the log events: the CapturingAppender.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <encoder>
            <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
        </encoder>
    </appender>

    <!-- Logging from tests is silenced by this appender. When there is a test failure the captured logging events are flushed to the appenders defined for the akka.actor.testkit.typed.internal.CapturingAppenderDelegate logger. -->
    <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />

    <!-- The appenders defined for this CapturingAppenderDelegate logger are used when there is a test failure and all logging events from the test are flushed to these appenders. -->
    <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
      <appender-ref ref="STDOUT"/>
    </logger>

    <root level="DEBUG">
        <appender-ref ref="CapturingAppender"/>
    </root>
</configuration>

Now it’s time to add the unit test.

package com.gkatzioura;

import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.LoggingTestKit;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class AppGuardianTests {

	@ClassRule
	public static final TestKitJunitResource testKit = new TestKitJunitResource();

	@Rule
	public final LogCapturing logCapturing = new LogCapturing();

	@Test
	public void testReceiveMessage() {
		ActorRef<AppGuardian.GuardianMessage> underTest = testKit.spawn(AppGuardian.create(), "app-guardian");

		LoggingTestKit.info("Message received: hello")
				.expect(
						testKit.system(),
						() -> {
							underTest.tell(new AppGuardian.MessageToGuardian("hello"));
							return null;
						});
	}

}

Once we run the test (for example with mvn test), the expected outcome is a pass. The actor received the message, executed a logging action, and the logging event was captured by the CapturingAppender and validated against the expected one. If the expectation fails, you probably need to check whether the test logback.xml took effect.

As always, you can find the source code on GitHub.

Java Based Akka application Part 1: Your base Project

Akka is a free, open-source toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM. Along with Akka you have akka-streams, a module that makes the ingestion and processing of streams easy, and Alpakka, a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.

In this blog post I shall focus on creating an Akka project using Java, as well as packaging it.

You already know that Akka is built with Scala, so why Java and not Scala? There are various reasons to go with Java.

  • Akka is a toolkit running on the JVM, so you don’t have to be proficient in Scala to use it.
  • You might have a team already proficient in Java but not in Scala.
  • It’s much easier to evaluate if you already have a Java codebase and the usual build tools (Maven etc.).

We shall go the simple route and download the application from the Lightbend quickstart. The project we receive will be backed by typed actors.

After some adaptation the Maven file will look like the following; take note that we shall use Lombok.

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gkatzioura</groupId>
    <artifactId>akka-java-app</artifactId>
    <version>1.0</version>

    <properties>
      <akka.version>2.6.10</akka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor-typed_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.6.0</version>
                <configuration>
                    <executable>java</executable>
                    <arguments>
                        <argument>-classpath</argument>
                        <classpath />
                        <argument>com.gkatzioura.Application</argument>
                    </arguments>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
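
With the exec-maven-plugin configured this way, you can run the application from the command line; compile first so the classpath is populated:

mvn compile exec:exec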

There is one actor responsible for managing your other actors: the top-level actor, called the guardian actor. It is created along with the ActorSystem, and when it stops the ActorSystem stops too.

In order to create an actor you define the messages the actor will receive and you specify how it will behave when it receives those messages.

package com.gkatzioura;

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import lombok.AllArgsConstructor;
import lombok.Getter;

public class AppGuardian extends AbstractBehavior<AppGuardian.GuardianMessage> {

	public interface GuardianMessage {}

	static Behavior<GuardianMessage> create() {
		return Behaviors.setup(AppGuardian::new);
	}

	@Getter
	@AllArgsConstructor
	public static class MessageToGuardian implements GuardianMessage {
		private String message;
	}

	private AppGuardian(ActorContext<GuardianMessage> context) {
		super(context);
	}

	@Override
	public Receive<GuardianMessage> createReceive() {
		return newReceiveBuilder().onMessage(MessageToGuardian.class, this::receiveMessage).build();
	}

	private Behavior<GuardianMessage> receiveMessage(MessageToGuardian messageToGuardian) {
		getContext().getLog().info("Message received: {}", messageToGuardian.getMessage());
		return this;
	}

}

Akka is message driven, so the guardian actor should be able to consume messages sent to it. Therefore, messages that implement the GuardianMessage interface are going to be processed.

When the actor is created, the createReceive method is used to register the handling of the messages that the actor should process.

Be aware that when it comes to logging, instead of spinning up a logger in the class, you should use getContext().getLog().

Behind the scenes, the log messages will have the path of the actor automatically added as the akkaSource Mapped Diagnostic Context (MDC) value.

The last step is to add the Main class.

package com.gkatzioura;

import java.io.IOException;

import akka.actor.typed.ActorSystem;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Application {

	public static final String APP_NAME = "akka-java-app";

	public static void main(String[] args) {
		final ActorSystem<AppGuardian.GuardianMessage> appGuardian = ActorSystem.create(AppGuardian.create(), APP_NAME);
		appGuardian.tell(new AppGuardian.MessageToGuardian("First Akka Java App"));

		try {
			System.out.println(">>> Press ENTER to exit <<<");
			System.in.read();
		}
		catch (IOException ignored) {
		}
		finally {
			appGuardian.terminate();
		}
	}

}

The expected outcome is for our guardian actor to print the message submitted. By pressing ENTER, the Akka application terminates through the guardian actor.
In the next blog post we will go one step further and add a unit test that validates the message received.
As always, you can find the source code on GitHub.

Run a docker PostgreSQL instance locally for Testing

Running a PostgreSQL instance ad hoc for testing is not always as straightforward as it should be. In this blog post we shall run a PostgreSQL instance reachable on your workstation’s network, and instead of using one of the popular tools like DBeaver we shall use the client that ships with the instance. We shall also run a bootstrap script to have some data pre-inserted.

Let’s get started by running the instance. On purpose I will use a non-default port: when multiple instances run on your workstation, port collisions are likely, so the workaround is to choose port 5433.

docker run --rm --name test-instance -e POSTGRES_PASSWORD=password -p 5433:5432 postgres

This will run PostgreSQL, and you shall be able to connect to it on port 5433. On CTRL-C the instance will be stopped and, because of the --rm flag, removed.
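
If you want to verify the port mapping from code rather than from the shell, a minimal JDBC check will do. This is just a sketch, assuming the org.postgresql:postgresql driver is on the classpath and the container above is running:

	import java.sql.DriverManager;
	import java.sql.SQLException;

	public class PostgresConnectionCheck {

		public static void main(String[] args) throws SQLException {
			// Port 5433 on the host is mapped to 5432 inside the container.
			String url = "jdbc:postgresql://localhost:5433/postgres";
			try (var connection = DriverManager.getConnection(url, "postgres", "password");
				 var statement = connection.createStatement();
				 var resultSet = statement.executeQuery("SELECT 1")) {
				resultSet.next();
				System.out.println("Connected, SELECT 1 returned " + resultSet.getInt(1));
			}
		}
	}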

Now, instead of using an external tool to connect, let’s use the instance itself; it comes with psql pre-installed.

docker exec -it test-instance /bin/bash
> psql postgres postgres
postgres=# \h
Available help:
  ABORT                            ALTER TRIGGER                    CREATE RULE                      DROP GROUP                       LISTEN
  ALTER AGGREGATE                  ALTER TYPE                       CREATE SCHEMA                    DROP INDEX                       LOAD
  ALTER COLLATION                  ALTER USER                       CREATE SEQUENCE                  DROP LANGUAGE                    LOCK
.....
postgres=# \q

The instance works and connections from the outside are possible.

The next step is to bootstrap a database initialization script.

#!/bin/bash
set -e
 
psql -v ON_ERROR_STOP=1 --username postgres --dbname postgres <<-EOSQL
    create schema test_schema;
 
    create table test_schema.employee(
        id  SERIAL PRIMARY KEY,
        firstname   TEXT    NOT NULL,
        lastname    TEXT    NOT NULL,
        email       TEXT    not null,
        age         INT     NOT NULL,
        salary         real,
        unique(email)
    );
 
    insert into test_schema.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 1','john1@doe.com',18,1234.23);

EOSQL

Suppose the file with the script is called init_db.sh.

Let’s run the command with the initialization script mounted.

docker run --rm --name test-instance -v /path/to/init_db.sh:/docker-entrypoint-initdb.d/init-db-script.sh -e POSTGRES_PASSWORD=password -p 5433:5432 postgres

And let’s check the results.

docker exec -it test-instance /bin/bash
> psql postgres postgres
postgres=# SELECT * FROM test_schema.employee;
 id | firstname | lastname |     email     | age | salary
----+-----------+----------+---------------+-----+---------
  1 | John      | Doe 1    | john1@doe.com |  18 | 1234.23
(1 row)

That’s it! You created a PostgreSQL database through Docker, connected to it, and added a bootstrap script with data.

Locking for multiple nodes the easy way: GCS

It happens to all of us. We develop stateless applications that can scale horizontally without any effort.
However, sometimes cases arise where you need to achieve some type of coordination.

You can go really advanced on this one. For example, you can use a framework like Akka and its cluster capabilities. Or you can go really simple, like rolling a mechanism on your own, as long as it gives you the results needed. On another note, you can just have different node groups based on the work you need them to do. The options and the solutions can change based on the problem.

If your problem admits a simple solution, one way to achieve it, provided you use Google Cloud Storage, is to use its locking capabilities.
Imagine, for example, a scenario of 4 nodes. They scale dynamically, but each time a new node registers you want to change its actions by handing it a unique configuration which does not collide with the configuration another node might have received.

The strategy can be to use a file on Google Cloud Storage for locking and a file that acts as a centralised configuration registry.

The lock file is nothing more than a file on Cloud Storage which shall be created and deleted. What gives us lock abilities is the option on GCS to create a file only if it does not already exist.
Thus a process from one node will try to create the `lock` file; this action is equivalent to obtaining the lock.
Once the process is done, it will delete the file; this action is equivalent to releasing the lock.
Other processes will in the meantime try to create the file (acquire the lock) and fail (file already exists), because another process has created the file.
Meanwhile, the process that successfully created the file (acquired the lock) will change the centralised configuration registry, and once done will delete the file (release the lock).

So let’s start with the lock object.

package com.gkatzioura.gcs.lock;

import java.util.Optional;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

public class GCSLock {

	public static final String LOCK_STRING = "_lock";
	private final Storage storage;
	private final String bucket;
	private final String keyName;

	private Optional<Blob> acquired = Optional.empty();

	GCSLock(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public boolean acquire() {
		try {
			var blobInfo = BlobInfo.newBuilder(bucket, keyName).build();
			var blob = storage.create(blobInfo, LOCK_STRING.getBytes(), Storage.BlobTargetOption.doesNotExist());
			acquired = Optional.of(blob);
			return true;
		} catch (StorageException storageException) {
			return false;
		}
	}

	public void release() {
		if(!acquired.isPresent()) {
			throw new IllegalStateException("Lock was never acquired");
		}
		storage.delete(acquired.get().getBlobId());
	}

}

As you can see, the write specifies that the object should be created only if it does not already exist. Behind the scenes this operation uses the x-goog-if-generation-match header, which enables concurrency control.
Thus only one node at a time will be able to acquire the lock and change the configuration file.
Afterwards it can delete the lock. If a StorageException is raised, the create most likely failed because the lock file already exists, meaning the lock is held by another process.
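
Also note that release throws if the lock was never acquired, so it should only run after a successful acquire. A defensive usage pattern, shown as a sketch below (not part of the original code, and assuming a com.google.cloud.storage.Storage instance named storage), wraps the critical section in try/finally so the lock file is always deleted:

	var lock = new GCSLock(storage, "bucketName", "lockFileName");
	if (lock.acquire()) {
		try {
			// mutate the shared configuration while holding the lock
		} finally {
			// always delete the lock file, even if the update throws
			lock.release();
		}
	}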

To make the example more complete, let’s add the configuration file. The configuration file will be a simple JSON file acting as a key-value map.

package com.gkatzioura.gcs.lock;

import java.util.HashMap;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.json.JSONObject;

public class GCSConfiguration {

	private final Storage storage;
	private final String bucket;
	private final String keyName;

	GCSConfiguration(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public void addProperty(String key, String value) {
		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		final JSONObject configJson;

		if(blob==null) {
			configJson = new JSONObject();
		} else {
			configJson = new JSONObject(new String(blob.getContent()));
		}

		configJson.put(key, value);

		var blobInfo = BlobInfo.newBuilder(blobId).build();
		storage.create(blobInfo, configJson.toString().getBytes());
	}

	public Map<String,String> properties() {

		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		var map = new HashMap<String,String>();

		if(blob!=null) {
			var jsonObject = new JSONObject(new String(blob.getContent()));
			for(var key: jsonObject.keySet()) {
				map.put(key, jsonObject.getString(key));
			}
		}

		return map;
	}

}

It is a simple config util backed by GCS. Eventually it could be changed to move the lock handling inside the addProperty operation; that’s up to the user and the code. For the purpose of this blog we shall just acquire the lock, change the configuration, and release the lock.
Our main class will look like this.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class Application {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();

		final String bucketName = "bucketName";
		final String lockFileName = "lockFileName";
		final String configFileName = "configFileName";

		var lock = new GCSLock(storage, bucketName, lockFileName);
		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

		var lockAcquired = lock.acquire();
		if(lockAcquired) {
			gcsConfig.addProperty("testProperty", "testValue");
			lock.release();
		}

		var config = gcsConfig.properties();

		for(var key: config.keySet()) {
			System.out.println("Key "+key+" value "+config.get(key));
		}

	}

}

Now let’s go for some multithreading. Ten threads will try to put values, and it is expected that some of their attempts to acquire the lock will fail.

package com.gkatzioura.gcs.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ApplicationConcurrent {

	private static final String bucketName = "bucketName";
	private static final String lockFileName = "lockFileName";
	private static final String configFileName = "configFileName";

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		var storage = StorageOptions.getDefaultInstance().getService();

		final int threads = 10;
		var service = Executors.newFixedThreadPool(threads);
		var futures = new ArrayList<Future<?>>(threads);

		for (var i = 0; i < threads; i++) {
			futures.add(service.submit(update(storage, "property-"+i, "value-"+i)));
		}

		for (var f : futures) {
			f.get();
		}

		service.shutdown();

		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);
		var properties = gcsConfig.properties();

		for (var i = 0; i < threads; i++) {
			System.out.println(properties.get("property-" + i));
		}
	}

	private static Runnable update(final Storage storage, String property, String value) {
		return () -> {
			var lock = new GCSLock(storage, bucketName, lockFileName);
			var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

			boolean lockAcquired = false;

			while (!lockAcquired) {
				lockAcquired = lock.acquire();
				if (!lockAcquired) {
					System.out.println("Could not acquire lock");
				}
			}

			gcsConfig.addProperty(property, value);
			lock.release();
		};
	}
}

Obviously 10 threads are enough to display the capabilities. During thread initialization and execution, several threads will try to acquire the lock simultaneously and all but one will fail, while other threads will arrive later, fail, and retry until the lock becomes available.

In the end, all of them are expected to have their values added to the configuration.
That’s it! If your problem has a simple nature, this approach might do the trick. Obviously you can use the HTTP API instead of the SDK. You can find the code on GitHub.