Apache Arrow on the JVM: Streaming Reads

Previously we wrote Arrow Data to a Stream. Now we shall read those data from a stream.

Just like on the previous blog the we shall implement the Closeable interface. This is needed to close the RootAllocator and free-up memory.

We shall pass a ReadableByteChannel and thus get the stream into read objects. 

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class DefaultEntriesReader implements Closeable {

    private final RootAllocator rootAllocator;

    public DefaultEntriesReader() {
        rootAllocator = new RootAllocator(Integer.MAX_VALUE);
    }

    public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException {
        List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>();

        try(ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) {
            var root = arrowStreamReader.getVectorSchemaRoot();

            var childVector1 = (VarCharVector)root.getVector(0);
            var childVector2 = (IntVector)root.getVector(1);

            while (arrowStreamReader.loadNextBatch()) {

                int batchSize = root.getRowCount();

                for (int i = 0; i < batchSize; i++) {
                    var strData = new String(childVector1.get(i));
                    var intData = childVector2.get(i);

                    DefaultArrowEntry defaultArrowEntry = DefaultArrowEntry.builder().col1(strData).col2(intData).build();
                    defaultArrowEntries.add(defaultArrowEntry);
                }
            }

            return defaultArrowEntries;
        }
    }

    @Override
    public void close() throws IOException {
        rootAllocator.close();
    }
}

Let’s wrap it up with a write and a Read

package com.gkatzioura.arrow;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ArrowMain {

    public static void main(String[] args) throws IOException {
        var originalEntries = IntStream.rangeClosed(0, 11)
                             .boxed()
                             .map(i -> new DefaultArrowEntry("data-"+i, i)).collect(Collectors.toList());

        var outputStream = new ByteArrayOutputStream();

        try(var arrowWriter = new DefaultEntriesWriter()) {
            arrowWriter.write(originalEntries, 10, Channels.newChannel(outputStream));
        }

        byte[] introBytes = outputStream.toByteArray();

        var inputStream = new ByteArrayInputStream(introBytes);

        try(var arrowReader = new DefaultEntriesReader()) {
            var entries =arrowReader.readBytes(Channels.newChannel(inputStream));
            for (DefaultArrowEntry entry : entries) {
                System.out.println("Read "+entry.getCol1()+" "+entry.getCol2());
            }
        }

    }

}

That’s it. To summarise we created Arrow Schemas, we wrote data to a Stream and we read data from a Stream!

Apache Arrow on the JVM: Streaming Writes

Previously we went to create some schemas on Arrow.  On this blog we will have a look on writing through streaming API.

Based on the previous post’s Schema we shall create a DTO for our classes.

package com.gkatzioura.arrow;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DefaultArrowEntry {

    private String col1;
    private Integer col2;

}

Our goal would be to transform those Java objects into a Stream of Arrow bytes.

The allocator creates DirectByteBuffer‘s.
Those buffers are off-heap. You do need to free up the memory used, but for the library user this is done by executing the close() operation on the allocator. In our case our class will implement the Closeable interface which shall do the allocator close operation.

By using the stream api, the data will be streamed to the OutPutStream submitted using the Arrow format.

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;

import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;

public class DefaultEntriesWriter implements Closeable {

    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }

        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
        try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

            boolean exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int batchCounter = 0;

            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

                batchCounter++;

                if(batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }

            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }

            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

}

To display the support of batches on Arrow a simple batch algorithm has been implemented within the function. For our example just take into account that data will be written in batches.

Let’s dive into the function.

The vector allocator discussed previously is created

    public DefaultEntriesToBytesConverter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

Then when writing to a stream, an arrow stream writer is implemented and started

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, Channels.newChannel(out));
writer.start();

We shall use the vectors in order to populated them with the data. Also reset them but let the pre-alocated buffers to exist

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

We use the setSafe operation when writing data. This way if more buffer needs to be allocated shall be done. For this example it’s done on every write, but can be avoided when the operations and the buffer size needed is taken into account.

                childVector1.setSafe(i, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());

Then we write the batch to the stream.


                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();

Last but not least we close the writer.

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

The next blog will focus on reading Arrow Data from a stream.

Apache Arrow on the JVM: Get Started and Schemas

Arrow is memory format for flat and hierarchical data. It is a popular format used by various big data tools, among them BigQuery. One of the benefits that Arrow brings is that the format of the data has the same byte representation on the languages supported. So apart from the benefits of a columnar memory format there are also the benefits of zero-copy without the serialization overhead.

Apache Arrow defines a language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware like CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead. more

Let’s import the libraries

        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>${arrow.version}</version>
        </dependency>

Before starting it is essential to understand that for Read/Write operations on Arrow, byte buffers are used. Operations like reading and writing is continuous exchange of bytes. To make this efficient Arrow comes with a buffer allocator, which can have a certain size or have an automatic expansion.
The libraries backing the allocation management is arrow-memory-netty and arrow-memory-unsafe. We shall use the netty one.

Storing Data in arrow requires a schema. Schemas can be defined programatically

package com.gkatzioura.arrow;

import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

    public static Schema DEFAULT_SCHEMA = createDefault();

    public static Schema createDefault() {
        var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
        var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

        return new Schema(List.of(strField, intField));
    }

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

    public static Schema fromJson(String jsonString) {
        try {
            return Schema.fromJSON(jsonString);
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

}

Also they have a parseable json representation.

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : {
      "name" : "utf8"
    },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : {
      "name" : "int",
      "bitWidth" : 32,
      "isSigned" : true
    },
    "children" : [ ]
  } ]
}

Plus just like Avro you can have complex schemas and embedded values on a field.

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

On the next blog, we shall use the Streaming API for Arrow

Testing with Hoverfly and Java Part 6: JSON and JsonPath matchers

Previously we used the XML and Xpath Hoverfly matchers.
On this blog we shall focus on rules that assist us with the data exchanged using Json.

The default Json matcher will compare the Json submitted with the Json expected. This means that the submitted Json shall be validated for all the elements and their value. New lines or any extra spaces as long as they don’t change the information that the JSON carries, will not prevent the request from being a success.

Let’s put our initial configuration that will make the Json match.

 


    @BeforeEach
    void setUp() {
        var simulation = SimulationSource.dsl(service("http://localhost:8085")
                .post("/json")
                .body(RequestFieldMatcher.newJsonMatcher("{\"document\":\"document-a\"}"))
                .willReturn(success(SUCCESS_RESPONSE, "application/json"))
                .post("/json/partial")
                .body(RequestFieldMatcher.newJsonPartialMatcher("{\"document\":\"document-a\"}"))
                .willReturn(success(SUCCESS_RESPONSE, "application/json"))
                .post("/jsonpath")
                .body(RequestFieldMatcher.newJsonPathMatch("$.document[1].description"))
                .willReturn(success(SUCCESS_RESPONSE, "application/json"))
        );

        var localConfig = HoverflyConfig.localConfigs().disableTlsVerification().asWebServer().proxyPort(8085);
        hoverfly = new Hoverfly(localConfig, SIMULATE);
        hoverfly.start();
        hoverfly.simulate(simulation);
    }

    @AfterEach
    void tearDown() {
        hoverfly.close();
    }

    

In our first example we will try to match the Json of our request with the Json expected.

    @Test
    void testJsonExactMatch() {
        var client = HttpClient.newHttpClient();

        var exactRequest = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/json"))
                .POST(HttpRequest.BodyPublishers.ofString("   {\"document\":    \"document-a\"}"))
                .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body)
                .join();

        Assertions.assertEquals(SUCCESS_RESPONSE, exactResponse);
    }

Also let’s make sure there is going to be a failure on an extra element.

    @Test
    void testJsonNoMatch() {
        var client = HttpClient.newHttpClient();

        var exactRequest = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/json"))
                .POST(HttpRequest.BodyPublishers.ofString("{\"doc2\":\"value\", \"document\":\"document-a\"}"))
                .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                .join();

        Assertions.assertEquals(502, exactResponse.statusCode());
    }

Now let’s see the non exact matcher.

    @Test
    void testJsonPartialMatch() {
        var client = HttpClient.newHttpClient();

        var exactRequest = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/json/partial"))
                .POST(HttpRequest.BodyPublishers.ofString("{\"doc2\":\"value\", \"document\":\"document-a\"}"))
                .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body)
                .join();

        Assertions.assertEquals(SUCCESS_RESPONSE, exactResponse);
    }

So far we checked matching the whole payload. Let’s try the Jsonpath approach. The example below does match.

    @Test
    void testJsonPathMatch() {
        var client = HttpClient.newHttpClient();

        var exactRequest = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/jsonpath"))
                .POST(HttpRequest.BodyPublishers.ofString("{\"document\":[{\"description\":\"description-1\"},{\"description\":\"description-2\"}]}"))
                .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body)
                .join();

        Assertions.assertEquals(SUCCESS_RESPONSE, exactResponse);
    }

But the example below won’t match


    @Test
    void testJsonPathNoMatch() {
        var client = HttpClient.newHttpClient();

        var exactRequest = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/jsonpath"))
                .POST(HttpRequest.BodyPublishers.ofString("{\"document\":[{\"description\":\"description-1\"}]}"))
                .build();

        var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
                .join();

        Assertions.assertEquals(502, exactResponse.statusCode());
    }

That’s it we did use the Json and JsonPath matchers for the Json based data!

Testing with Hoverfly and Java Part 5: XML and Xpath matchers

Previously we worked with some of the existing Hoverfly matchers like the regex, glob and exact.
Each one serves its purpose but we might want some rules that assist us with the format of the data exchanged through our requests.

On this blog we will focus on the matchers for xml.

 

The default xml matcher will compare the xml submitted with the xml expected. This means that the submitted xml shall be validated node by node value by value. New lines or any extra spaces as long as they don’t change the content that the xml carries will not prevent the request from being a success.

Let’s put our initial configuration that will make the xml match.

	public static final String SUCCESS_RESPONSE = "<response>"
			+ "<result>success</result>"
			+ "</response>";

	private Hoverfly hoverfly;

	@BeforeEach
	void setUp() {
		var simulation = SimulationSource.dsl(service("http://localhost:8085")
				.post("/xml")
				.body(RequestFieldMatcher.newXmlMatcher("<document type=\"xml\">"
						+ "xml-request"
						+ "</document>"))
				.willReturn(success(SUCCESS_RESPONSE, "application/xml")));

		var localConfig = HoverflyConfig.localConfigs().disableTlsVerification().asWebServer().proxyPort(8085);
		hoverfly = new Hoverfly(localConfig, SIMULATE);
		hoverfly.start();
		hoverfly.simulate(simulation);
	}

	@AfterEach
	void tearDown() {
		hoverfly.close();
	}

So in our first example we will try to match the xml of our request with the xml expected.

	@Test
	void testXmlExactMatch() {
		var client = HttpClient.newHttpClient();

		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/xml"))
				.POST(HttpRequest.BodyPublishers.ofString("  <document type=\"xml\">\n\n"
						+ "xml-request"
						+ "</document>\t"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();

		Assertions.assertEquals(SUCCESS_RESPONSE, exactResponse);
	}

As you see regardless of the new lines and the tabs, our request will be successful since the xml data do match.

Now let’s try to add a node to the xml.

	@Test
	void testXmlNoMatch() {
		var client = HttpClient.newHttpClient();

		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/xml"))
				.POST(HttpRequest.BodyPublishers.ofString("  <document type=\"xml\">\n\n"
						+ "xml-request"
						+ "</document>\t<empty-node>ok</empty-node>"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.join();

		Assertions.assertEquals(502, exactResponse.statusCode());
	}

The xml does not match thus it will fail.

Let’s focus to another problem. Since the data exchanged are dynamic, chances are that exact matches might not be possible. Also you might not need to focus on all the information submitted but just a specific section of the information exchanged. Therefore an XPath matcher becomes handy.

Will enhance the initial setup with an XPath rule.


	@BeforeEach
	void setUp() {
		var simulation = SimulationSource.dsl(service("http://localhost:8085")
				.post("/xml")
				.body(RequestFieldMatcher.newXmlMatcher("<document type=\"xml\">"
						+ "xml-request"
						+ "</document>"))
				.willReturn(success(SUCCESS_RESPONSE, "application/xml"))
				.post("/xpath")
				.body(RequestFieldMatcher.newXpathMatcher("/document/payment[amount=1]"))
				.willReturn(success(SUCCESS_RESPONSE, "application/xml"))
		);

		var localConfig = HoverflyConfig.localConfigs().disableTlsVerification().asWebServer().proxyPort(8085);
		hoverfly = new Hoverfly(localConfig, SIMULATE);
		hoverfly.start();
		hoverfly.simulate(simulation);
	}

If there is a document node with a payment node and the value on the amount node is 1 there will be a match
Let’s go for a positive scenario

	@Test
	void testXpathMatch() {
		var client = HttpClient.newHttpClient();

		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/xpath"))
				.POST(HttpRequest.BodyPublishers.ofString("  <document type=\"xml\">\n\n"
						+ "<payment><amount>142</amount></payment>"
						+ "<payment><amount>1</amount><currency>GBP</currency></payment>"
						+ "<payment>invalid</payment>"
						+ "</document>\t"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();

		Assertions.assertEquals(SUCCESS_RESPONSE, exactResponse);
	}

As expected we got a match.
Let’s go for a negative scenario.

	@Test
	void testXpathNoMatch() {
		var client = HttpClient.newHttpClient();

		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/xpath"))
				.POST(HttpRequest.BodyPublishers.ofString("  <document type=\"xml\">\n\n"
						+ "<payment><amount>142</amount></payment>"
						+ "<payment><amount>no-match</amount><currency>GBP</currency></payment>"
						+ "<payment>invalid</payment>"
						+ "</document>\t"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.join();

		Assertions.assertEquals(502, exactResponse.statusCode());
	}

That’s it we did use the xml and xpath matchers for the xml based data. The next blog shall focus on the JSON based matchers.

Testing using TestContainers

Part of our everyday ci/cd tasks involve using containers in order for the tests to take effect.
So what if you could control the containers you use through your tests and serve your scenarios better.
Also what if you could do this in a more managed way?

Testcontainers is a Java library that supports JUnit tests, providing lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container.

You pretty much can guess what is all about. Our tests can spin up the containers with the parameters needed. We will get started by using it in our tests with Junit.

It all starts with the right dependencies. Supposing we use maven for this tutorial.

	<properties>
		<junit-jupiter.version>5.4.2</junit-jupiter.version>
		<testcontainers.version>1.15.0</testcontainers.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>testcontainers</artifactId>
			<version>${testcontainers.version}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>junit-jupiter</artifactId>
			<version>${testcontainers.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

I shall use an example we already have with Hoverfly.
We can use Hoverfly on our tests either by running it using Java or having a Hoverfly container with the test cases preloaded.
On the previous blog Hoverfly was integrated in our tests through the Java binary.
For this blog we shall use the Hoverfly container.

Our end result will look like this.

package com.gkatzioura.hoverfly.docker;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public class ContainerBasedSimulation {

	private static final String SIMULATION_HOST_PATH = ContainerBasedSimulation.class.getClassLoader().getResource("simulation.json").getPath();

	@Container
	public static GenericContainer gcs = new GenericContainer("spectolabs/hoverfly")
			.withExposedPorts(8888)
			.withExposedPorts(8500)
			.withCommand("-webserver","-import","/var/hoverfly/simulation.json")
			.withClasspathResourceMapping("simulation.json","/var/hoverfly/simulation.json" ,BindMode.READ_ONLY);


	@Test
	void testHttpGet() {
		var hoverFlyHost = gcs.getHost();
		var hoverFlyPort = gcs.getMappedPort(8500);
		var client = HttpClient.newHttpClient();
		var request = HttpRequest.newBuilder()
				.uri(URI.create("http://"+hoverFlyHost+":"+ hoverFlyPort +"/user"))
				.build();
		var res = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();
		Assertions.assertEquals("{\"username\":\"test-user\"}",res);
	}

}

Let’s break it down.

The @Testcontainers annotation is needed for the Jupiter integration.

@Testcontainers
public class ContainerBasedSimulation {
}

We shall use a container image that is not preloaded among the test containers available (for example Elastic Search), thus we shall use the GenericContainer class.

@Container
public static GenericContainer gcs = new GenericContainer("spectolabs/hoverfly")

Since we want to load to the container a simulation, we need to set the path to our simulation from our host machine. By using withClasspathResourceMapping we directly specify files in our classpath, for example the test resources.

			.withClasspathResourceMapping("simulation.json","/var/hoverfly/simulation.json",BindMode.READ_ONLY);

Hoverfly needs the simulation and the admin port to be exposed so we shall instruct Testcontainers to expose those ports and map them to host.

new GenericContainer("spectolabs/hoverfly")
			.withExposedPorts(8888)
			.withExposedPorts(8500)

We need to have a simulation placed on the container. By using withFileSystemBind we specify the local path and the path on the container.

...
.withFileSystemBind(SIMULATION_HOST_PATH,"/var/hoverfly/simulation.json" ,BindMode.READ_ONLY)
...

Also docker images might need to have some extra commands, therefore we shall use .withCommand, to pass the commands needed.

...
.withCommand("-webserver","-import","/var/hoverfly/simulation.json")
...

Technically we can say we are ready to go and connect to the container, however when running test containers the container is not accessible through the port specified to do the binding. After all, if tests run on parallel there is going to be a collision. So what Testcontainers do is to map the exposed port of the container to a random local port.
This way port collisions are avoided.

	@Test
	void testHttpGet() {
		var hoverFlyHost = gcs.getHost();
		var hoverFlyPort = gcs.getMappedPort(8500);
		var client = HttpClient.newHttpClient();
		var request = HttpRequest.newBuilder()
				.uri(URI.create("http://"+hoverFlyHost+":"+ hoverFlyPort +"/user"))
				.build();
		var res = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();
		Assertions.assertEquals("{\"username\":\"test-user\"}",res);
	}

Using GenericContainer.getMappedPort(8500) we can get the port we have to use to interact with the container. Also getHost() is essential too since it won’t always direct to localhost.

Last but not least while testing if your are curious enough and do a docker ps.

docker ps 
>04a322447226        testcontainers/ryuk:0.3.0   "/app"                   3 seconds ago       Up 2 seconds        0.0.0.0:32814->8080/tcp    testcontainers-ryuk-fb60c3c6-5f31-4f4e-9ab7-ce25a00eeccc

You shall see a container running which is not the one we instructed through our unit test. The ryuk container is responsible for removing containers/networks/volumes/images by given filter after specified delay.

That’s it! We just achieved running the container we needed through our a test and we successfully migrated a previous test to one using test containers.

Static Factories are Great!

Every now and then I jump on classes with multiple constructors or classes that are rigorous to work with. Let alone not being able to mock part of their components and at the end being forced to use reflection for testing (mockito based, old school, you choose).

Imagine a Producer class that you use for Kafka. A class that provides you with some abstraction on sending messages.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private Producer<String,String> producer;
	private String topic = "test-topic";
	
	StrMessageProducer() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getProperty("bootstrap.servers"));
		kafkaProperties.put("key.serializer",System.getProperty("key.serializer"));
		kafkaProperties.put("value.serialize",System.getProperty("value.serializer"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		this.producer = kafkaProducer;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}

Apart from being an ugly class it is also very hard to change some of its components.

For example

  • I cannot use this class to post to another topic
  • I cannot use this class to use a different server configuration apart from the one on the properties
  • It’s difficult to test the functionality of the class since crucial components are created through the constructor

It’s obvious that the constructor in this case serves the purpose on creating a Kafka producer based on the system properties. But the responsibility of the class is to use that producer in order send messages in a specific way. Thus I will move the creation of the Producer from the constructor. Also because we might want to swap the topic used, I will also inject the topic instead of having it hardcoded.
By doing so we encourage dependency injection. We make it easy to swap the ingredients of the class however the execution would be the same.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}

But we still need the producer to be created somehow. This is where the factory pattern kicks in.

We shall add static factories in order to have instances of the StrMessageProducer class with different configurations.
Let’s add two factory methods
The first factory method would be based on system properties and the second on environment variables.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}

	public static StrMessageProducer createFromSystemPros() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getProperty("bootstrap.servers"));
		kafkaProperties.put("key.serializer",System.getProperty("key.serializer"));
		kafkaProperties.put("value.serialize",System.getProperty("value.serializer"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new MessageProducer(kafkaProducer, System.getProperty("main.topic"));
	}

	public static StrMessageProducer createFromEnv() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getenv("BOOTSTRAP_SERVERS"));
		kafkaProperties.put("key.serializer",System.getenv("KEY_SERIALIZER"));
		kafkaProperties.put("value.serialize",System.getenv("VALUE_SERIALIZER"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new MessageProducer(kafkaProducer, System.getProperty("MAIN_TOPIC"));
	}
}

You already see the benefits. You have a clean class ready to use as it is and you have some factory methods for convenience. Eventually you can add more static factories, some of them might also have arguments, for example the topic.

Also we can go one step further when we want to have multiple classes of MessageProducers and we want to utilise an interface. So we are going to introduce the MessageProducer interface which our StrMessageProducer class will implement. Also we are going to put the static factories to the interface.

So this will be our interface with the static factories.

package com.gkatzioura.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public interface MessageProducer {
	
	void send(String message);

	static MessageProducer createFromSystemPros() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getProperty("bootstrap.servers"));
		kafkaProperties.put("key.serializer",System.getProperty("key.serializer"));
		kafkaProperties.put("value.serialize",System.getProperty("value.serializer"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getProperty("main.topic"));
	}

	static MessageProducer createFromEnv() {
		var kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers",System.getenv("BOOTSTRAP_SERVERS"));
		kafkaProperties.put("key.serializer",System.getenv("KEY_SERIALIZER"));
		kafkaProperties.put("value.serialize",System.getenv("VALUE_SERIALIZER"));
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getProperty("MAIN_TOPIC"));
	}

}

And this would be our new StrMessageProducer class.

package com.gkatzioura.kafka.producer;

import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer implements MessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	@Override
	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		}
		catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}

}

Let’s check the benefits

  • We can have various implementations of a MessageProducer class
  • We can add as many factories we want that serve our purpose
  • We can easily test the MessageProducer implementation by passing mocks to the constructors
  • We keep our codebase cleaner

 

Kafka & Zookeeper for Development: Connecting Clients to the Cluster

Previously we achieved to have our Kafka brokers connect to a ZooKeeper ensemble. Also we brought down some brokers checked the election leadership and produced/consumed some messages.

For now we want to make sure that we will be able to connect to those nodes. The problem with connecting to the ensemble we created previously is that it is located inside the container network. When a client interacts with one of the brokers and receives the full list of the brokers he will receive a list of IPs not accessible to it.

So the initial handshake of a client will be successful but then the client will try to interact withs some unreachable hosts.

In order to tackle this we will have a combination of workarounds.

The first one would be to bind the port of each Kafka broker to a different local ip.

kafka-1 will be mapped to 127.0.0.1:9092
kafka-2 will be mapped to 127.0.0.2:9092
kafka-3 will be mapped to 127.0.0.3:9092

So let’s create the aliases of those addresses

sudo ifconfig lo0 alias 127.0.0.2
sudo ifconfig lo0 alias 127.0.0.3

Now it’s possible to do the ip binding. Let’s also put those entries to our /etc/hosts. By doing this, we achieve our local network and our docker network to be in agreement on which broker they should access.

127.0.0.1	kafka-1
127.0.0.2	kafka-2
127.0.0.3	kafka-3

The next step is also to change the KAFKA_ADVERTISED_LISTENERS on each broker. We will adapt this to the DNS entry of each broker. By setting KAFKA_ADVERTISED_LISTENERS the clients from the outside can correctly connect to it, to an address reachable to them and not an address through the internal network. Further explanations can be found on this blog.

  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "127.0.0.1:9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092"
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "127.0.0.2:9092:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-2:9092"
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "127.0.0.3:9092:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-3:9092"

We see the port binding change as well as the KAFKA_ADVERTISED_LISTENERS. Now let’s wrap everything together in our docker-compose

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "127.0.0.1:9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092"
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "127.0.0.2:9092:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-2:9092"
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "127.0.0.3:9092:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-3:9092"

Last but not least you can find the code on github.

Testing with Hoverfly and Java Part 4: Exact, Glob and Regex Matchers

Previously we used Hoverfly among its state feature.
So far our examples have been close to an absolute request match, thus on this blog we will focus on utilising the matchers.
Having a good range of matchers is very important because most API interactions are dynamic and you can’t always predict the example. Imagine a JWT signature. You can match the body but the signature might change per environment.

 

There are three type of matchers available.

  • The exact matcher: The fields headers should be an exact match
  • The glob matcher: A match that gives the ability to using the `*`
  • The regex matcher: A matcher that requires you to search again on the internet on how to make a regex
  • XML matcher: This about matching the xml as XML node by node value by value
  • Xpath matcher: Match based on a value match through Xpath
  • JSON matcher: Match the json exactly
  • JSON partial matcher: Match if the json submitted contains the Json values specified
  • JSONPath matcher: Just like the xpath match based the on the json path submitted

Let’s start with the exact matcher.

public class ExactMatcherTests {

	private Hoverfly hoverfly;

	@BeforeEach
	void setUp() {
		var simulation = SimulationSource.dsl(service("http://localhost:8085")
				.get("/exact")
				.header("Origin", RequestFieldMatcher.newExactMatcher("internal-server"))
				.willReturn(success("{\"exact\":true}", "application/json")));

		var localConfig = HoverflyConfig.localConfigs().disableTlsVerification().asWebServer().proxyPort(8085);
		hoverfly = new Hoverfly(localConfig, SIMULATE);
		hoverfly.start();
		hoverfly.simulate(simulation);
	}

	@AfterEach
	void tearDown() {
		hoverfly.close();
	}

	@Test
	void testExactMatcherSuccess() {
		var client = HttpClient.newHttpClient();
		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/exact"))
				.header("Origin","internal-server")
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();

		Assertions.assertEquals("{\"exact\":true}", exactResponse);
	}

	@Test
	void testExactMatcherFailure() {
		var client = HttpClient.newHttpClient();
		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/exact"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.join();

		Assertions.assertEquals(502, exactResponse.statusCode());
	}

}

The failures or success come based on wether the header was matching exactly or not.

We shall use the glob match for a request parameter.


public class GlobMatcher {

	private Hoverfly hoverfly;

	@BeforeEach
	void setUp() {
		var simulation = SimulationSource.dsl(service("http://localhost:8085")
				.get("/glob")
				.queryParam("userName", RequestFieldMatcher.newGlobMatcher("john*"))
				.willReturn(success("{\"glob\":true}", "application/json")));

		var localConfig = HoverflyConfig.localConfigs().disableTlsVerification().asWebServer().proxyPort(8085);
		hoverfly = new Hoverfly(localConfig, SIMULATE);
		hoverfly.start();
		hoverfly.simulate(simulation);
	}

	@AfterEach
	void tearDown() {
		hoverfly.close();
	}

	@Test
	void testGlobMatcherSuccess() {
		var client = HttpClient.newHttpClient();
		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/glob?userName=johnDoe"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();

		Assertions.assertEquals("{\"glob\":true}", exactResponse);
	}

	@Test
	void testGlobMatcherFailure() {
		var client = HttpClient.newHttpClient();
		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/glob?userName=nojohnDoe"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.join();

		Assertions.assertEquals(502, exactResponse.statusCode());
	}

}

Last let’s head for the regex matcher. The regex matcher will just check for a capital letter: ([A-Z])\w+

public class RegexMatcherTests {

	private Hoverfly hoverfly;

	@BeforeEach
	void setUp() {
		var simulation = SimulationSource.dsl(service("http://localhost:8085")
				.post("/regex")
				.body(RequestFieldMatcher.newRegexMatcher("([A-Z])\\w+"))
				.willReturn(success("{\"regex\":true}", "application/json")));

		var localConfig = HoverflyConfig.localConfigs().disableTlsVerification().asWebServer().proxyPort(8085);
		hoverfly = new Hoverfly(localConfig, SIMULATE);
		hoverfly.start();
		hoverfly.simulate(simulation);
	}

	@AfterEach
	void tearDown() {
		hoverfly.close();
	}

	@Test
	void testRegexMatcherSuccess() {
		var client = HttpClient.newHttpClient();
		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/regex"))
				.POST(HttpRequest.BodyPublishers.ofString("Contains capital letter"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();

		Assertions.assertEquals("{\"regex\":true}", exactResponse);
	}

	@Test
	void testRegexMatcherFailure() {
		var client = HttpClient.newHttpClient();
		var exactRequest = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8085/regex"))
				.POST(HttpRequest.BodyPublishers.ofString("won't match due to capital letter missing"))
				.build();

		var exactResponse = client.sendAsync(exactRequest, HttpResponse.BodyHandlers.ofString())
				.join();

		Assertions.assertEquals(502, exactResponse.statusCode());
	}

}

That’s it we did use the basic matchers for exact, glob, and regex based. The next blog shall focus on the xml based matchers.

Kafka & Zookeeper for Development: Connecting Brokers to the Ensemble

Previously we created successfully a Zookeeper ensemble, now it’s time to add some Kafka brokers that will connect to the ensemble and we shall execute some commands.

We will pick up from the same docker compose file we compiled previously. First let’s jump on the configuration that a Kafka broker needs.

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
group.initial.rebalance.delay.ms=0
socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=0
log4j.opts=-Dlog4j.configuration=file:/etc/kafka/log4j.properties
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

Will go through the ones that is essential to know.

  • offsets.topic.replication.factor: how the internal offset topic gets replicated – replication factor
  • transaction.state.log.replication.factor: how the internal transaction topic gets replicated – replication factor
  • transaction.state.log.min.isr: the minimum in sync replicas for the internal transaction topic
  • delete.topic.enable: if not true Kafka will ignore the delete topic command
  • socket.request.max.bytes: the maximum size of requests
  • log.retention.check.interval.ms: the interval to evaluate if a log should be deleted
  • log.retention.hours: how many hours a log is retained before getting deleted
  • broker.id: what is the broker id of that installation
  • log.dirs: the directories where Kafka will store the log data, can be a comma separated
  • auto.create.topics.enable: create topics if they don’t exist on sending/consuming messages or asking for topic metadata
  • num.network.threads: threads on receiving requests and sending responses from the network
  • socket.receive.buffer.bytes: buffer of the server socket
  • log.segment.bytes: the size of a log file
  • num.recovery.threads.per.data.dir: threads used for log recovery at startup and flushing at shutdown
  • num.partitions: has to do with the default number of partition a topic will have once created if partition number is not specified.
  • zookeeper.connection.timeout.ms: time needed for a client to establish connection to ZooKeeper
  • zookeeper.connect: is the list of the ZooKeeper servers

Now it’s time to create the properties for each broker. Due to the broker.id property we need to have different files with the corresponding broker.id

So our first’s brokers file would look like this (broker.id 1). Keep in mind that those brokers will run on the same docker-compose file. Therefore the zookeeper.connect property contains the internal docker compose dns names. The name of the file would be named server1.properties.

socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=1
transaction.state.log.replication.factor=1
log4j.opts=-Dlog4j.configuration\=file\:/etc/kafka/log4j.properties
group.initial.rebalance.delay.ms=0
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
offsets.topic.replication.factor=1
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper-1\:2181,zookeeper-2\:2181,zookeeper-3\:2181

The same recipe applies for the broker.id=2 as well as broker.id=3

After creating those three broker configuration files it is time to change our docker-compose configuration.

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "9093:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "9094:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties

Let’s spin up the docker-compose file.

> docker-compose -f docker-compose.yaml up

Just like the previous examples we shall run some commands through the containers.

Now that we have a proper cluster with Zookeeper and multiple Kafka brokers it is time to test them working together.
The first action is to create a topic with a replication factor of 3. The expected outcome would be for this topic to be replicated 3 kafka brokers

> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-topics --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --create --topic tutorial-topic --replication-factor 3 --partitions 1

Our topic has been created let’s check the description of the topic.

confluent@92a6d381d0db:/$ kafka-topics --describe --topic tutorial-topic --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Topic:tutorial-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: tutorial-topic	Partition: 0	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3

As we see the Leader for the partition is broker 2

Next step is putting some data to the topic recently created. Before doing so I will add a consumer listening for messages to that topic. While we post messages to the topic those will be printed by this consumer.

> docker exec -it kafka-3 /bin/bash
confluent@4042774f8802:/$ kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

Let’s add some topic data.

> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-console-producer --topic tutorial-topic --broker-list kafka-1:9092,kafka-2:9092
test1
test2
test3

As expected the consumer on the other terminal will print the messages expected.

test1
test2
test3

Due to having a cluster it would be nice to stop the leader broker and see another broker to take the leadership. While doing this the expected results will be to have all the messages replicated and no disruption on consuming and publishing the messages.

Stop the leader which is broker-2

> docker stop kafka-2

Check the leadership from another broker

confluent@92a6d381d0db:/$ kafka-topics --describe --topic tutorial-topic --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Topic:tutorial-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: tutorial-topic	Partition: 0	Leader: 1	Replicas: 2,1,3	Isr: 1,3

The leader now is kafka-1

Read the messages to see that they did got replicated.

> docker exec -it kafka-3 /bin/bash
confluent@4042774f8802:/$ kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
test1
test2
test3

As expected apart from the Leadership being in place our data have also been replicated!

If we try to post new messages, it will also be a successful action.

So to summarise we did run a Kafka cluster connected to a zookeeper ensemble. We did create a topic with replication enabled to 3 brokers and last but not least we did test what happens if one broker goes down.

On the next blog we are going to wrap it up so our local machine clients can connect to the docker compose ensemble.