Kafka & Zookeeper for Development: Connecting Clients to the Cluster

Previously we managed to connect our Kafka brokers to a ZooKeeper ensemble. We also brought down a broker, checked the leader election, and produced/consumed some messages.

Now we want to make sure that we can connect to those nodes from our local machine. The problem with the cluster we created previously is that it lives inside the container network. When a client contacts one of the brokers and receives the full list of brokers, that list contains addresses that are not reachable from the client.

So the initial handshake of a client succeeds, but then the client tries to interact with unreachable hosts.

To tackle this we will combine a couple of workarounds.

The first one is to bind the port of each Kafka broker to a different loopback IP:

kafka-1 will be mapped to 127.0.0.1:9092
kafka-2 will be mapped to 127.0.0.2:9092
kafka-3 will be mapped to 127.0.0.3:9092

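So let's create aliases for those addresses on the loopback interface. The commands below use the macOS syntax; on Linux the whole 127.0.0.0/8 range is already routed to the loopback interface, so this step is usually not needed: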

sudo ifconfig lo0 alias 127.0.0.2
sudo ifconfig lo0 alias 127.0.0.3

Now the IP binding is possible. Let's also add those entries to our /etc/hosts. This way each broker name leads to the same broker both from our local machine and from inside the Docker network.

127.0.0.1	kafka-1
127.0.0.2	kafka-2
127.0.0.3	kafka-3

The next step is to change KAFKA_ADVERTISED_LISTENERS on each broker and set it to the broker's DNS name. KAFKA_ADVERTISED_LISTENERS is the address a broker hands out to clients, so by setting it, clients from the outside connect to an address reachable to them rather than an address on the internal network. Further explanations can be found on this blog.

  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "127.0.0.1:9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092"
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "127.0.0.2:9092:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-2:9092"
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "127.0.0.3:9092:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-3:9092"

Note the changed port bindings as well as the KAFKA_ADVERTISED_LISTENERS entries. Now let's put everything together in our docker-compose file:

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "127.0.0.1:9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092"
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "127.0.0.2:9092:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-2:9092"
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "127.0.0.3:9092:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-3:9092"
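To see why the advertised address matters, here is a small Java sketch of the check a client effectively performs after receiving the broker list: parse each advertised listener and see whether its host resolves to an address reachable from the local machine. The hosts map stands in for /etc/hosts, 172.18.0.3 is a hypothetical internal Docker address, and treating only loopback addresses as reachable is a simplification of the port bindings above.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

// Sketch: can a client on the host use the addresses the brokers advertise?
// Assumption: only loopback addresses (published through the port bindings) are reachable.
class ListenerCheck {

    // Parses a listener such as "PLAINTEXT://kafka-1:9092", resolves its host
    // through the given hosts map (a stand-in for /etc/hosts) and checks it is loopback.
    static boolean reachableFromHost(String listener, Map<String, String> hosts) {
        String host = URI.create(listener).getHost();
        // IP literals are used as-is; an unmapped hostname would trigger a real DNS lookup.
        String ip = hosts.getOrDefault(host, host);
        try {
            return InetAddress.getByName(ip).isLoopbackAddress();
        } catch (UnknownHostException e) {
            return false; // an unresolvable host is unreachable as well
        }
    }

    public static void main(String[] args) {
        // Mirrors the /etc/hosts entries added earlier.
        Map<String, String> hosts = Map.of(
                "kafka-1", "127.0.0.1",
                "kafka-2", "127.0.0.2",
                "kafka-3", "127.0.0.3");

        List<String> advertised = List.of(
                "PLAINTEXT://kafka-2:9092",      // what KAFKA_ADVERTISED_LISTENERS now hands out
                "PLAINTEXT://172.18.0.3:9092");  // hypothetical internal container IP

        for (String listener : advertised) {
            System.out.println(listener + " reachable: " + reachableFromHost(listener, hosts));
        }
    }
}
```

Run as-is, it reports the advertised hostname as reachable and the internal container IP as unreachable, which is the situation the advertised listeners fix.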

Last but not least, you can find the code on GitHub.

Kafka & Zookeeper for Development: Connecting Brokers to the Ensemble

Previously we successfully created a ZooKeeper ensemble. Now it's time to add some Kafka brokers that connect to the ensemble and execute some commands.

We will pick up from the same docker-compose file we put together previously. First, let's look at the configuration a Kafka broker needs.

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
group.initial.rebalance.delay.ms=0
socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=0
log4j.opts=-Dlog4j.configuration=file:/etc/kafka/log4j.properties
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

Let's go through the ones that are essential to know.

  • offsets.topic.replication.factor: the replication factor of the internal offsets topic
  • transaction.state.log.replication.factor: the replication factor of the internal transaction state topic
  • transaction.state.log.min.isr: the minimum number of in-sync replicas for the internal transaction topic
  • delete.topic.enable: if not true, Kafka will ignore delete topic commands
  • socket.request.max.bytes: the maximum size of a request in bytes
  • log.retention.check.interval.ms: the interval at which Kafka checks whether logs are eligible for deletion
  • log.retention.hours: how many hours a log is retained before getting deleted
  • broker.id: the id of this broker, which must be unique within the cluster
  • log.dirs: the directories where Kafka stores the log data; can be a comma-separated list
  • auto.create.topics.enable: create topics that don't exist when sending/consuming messages or asking for topic metadata
  • num.network.threads: the number of threads that receive requests from the network and send responses
  • socket.receive.buffer.bytes: the receive buffer size of the server socket
  • log.segment.bytes: the maximum size of a single log segment file
  • num.recovery.threads.per.data.dir: the number of threads per data directory used for log recovery at startup and flushing at shutdown
  • num.partitions: the default number of partitions a topic gets when it is created without specifying a partition count
  • zookeeper.connection.timeout.ms: the maximum time the broker waits to establish a connection to ZooKeeper
  • zookeeper.connect: the list of ZooKeeper servers (host:port) the broker connects to

Now it's time to create the properties for each broker. Since broker.id must be different on every broker, we need a separate file per broker with the corresponding broker.id.

So our first broker's file (broker.id 1) looks like this. Keep in mind that those brokers will run in the same docker-compose file, therefore the zookeeper.connect property contains the internal Docker Compose DNS names. The file will be named server1.properties.

socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=1
transaction.state.log.replication.factor=1
log4j.opts=-Dlog4j.configuration\=file\:/etc/kafka/log4j.properties
group.initial.rebalance.delay.ms=0
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
offsets.topic.replication.factor=1
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper-1\:2181,zookeeper-2\:2181,zookeeper-3\:2181

The same recipe applies to broker.id=2 (server2.properties) and broker.id=3 (server3.properties).
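Since the three files differ only in broker.id, they could also be generated from one base configuration. A minimal Java sketch using java.util.Properties; the class name and the shortened base configuration are illustrative. Properties.store() escapes ':' and '=' inside values, which produces the same escaped form seen in server1.properties (zookeeper-1\:2181), and Properties.load() reads escaped and unescaped values identically.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: derive serverN.properties from one shared base configuration.
class BrokerConfigs {

    // Shortened base configuration (illustrative); use the full list shown earlier.
    static final String BASE = String.join("\n",
            "log.dirs=/var/lib/kafka",
            "num.partitions=1",
            "offsets.topic.replication.factor=1",
            "zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181");

    // Loads the base configuration and sets the broker-specific id.
    static Properties forBroker(int brokerId) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(BASE));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        props.setProperty("broker.id", Integer.toString(brokerId));
        return props;
    }

    // Renders the properties as file content; store() escapes ':' and '=' in values.
    static String render(Properties props) {
        StringWriter out = new StringWriter();
        try {
            props.store(out, null); // also writes a leading timestamp comment
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        for (int id = 1; id <= 3; id++) {
            System.out.println("# server" + id + ".properties");
            System.out.print(render(forBroker(id)));
        }
    }
}
```

Writing each rendered string to serverN.properties (for example with java.nio.file.Files) gives the three files the compose setup mounts.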

After creating those three broker configuration files, it is time to change our docker-compose configuration:

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "9093:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "9094:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties

Let’s spin up the docker-compose file.

> docker-compose -f docker-compose.yaml up

Just like in the previous examples, we shall run some commands inside the containers.

Now that we have a proper cluster with ZooKeeper and multiple Kafka brokers, it is time to test them working together.
The first action is to create a topic with a replication factor of 3. The expected outcome is for this topic to be replicated across the 3 Kafka brokers.

> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-topics --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --create --topic tutorial-topic --replication-factor 3 --partitions 1

Our topic has been created; let's check its description.

confluent@92a6d381d0db:/$ kafka-topics --describe --topic tutorial-topic --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Topic:tutorial-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: tutorial-topic	Partition: 0	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3

As we can see, the leader for the partition is broker 2.
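If you want to script checks like this one, the partition line of the describe output can be split into its fields. A minimal Java sketch, assuming the tab-separated "Key: value" layout shown above; other Kafka versions may format this output differently, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: parse one partition line of the `kafka-topics --describe` output.
class DescribeLine {

    // Splits "Topic: t<TAB>Partition: 0<TAB>Leader: 2<TAB>..." into key/value pairs.
    static Map<String, String> fields(String line) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String part : line.trim().split("\t")) {
            int colon = part.indexOf(':');
            if (colon > 0) {
                result.put(part.substring(0, colon).trim(), part.substring(colon + 1).trim());
            }
        }
        return result;
    }

    // Turns a comma-separated broker list such as "2,1,3" into integers.
    static List<Integer> brokers(String csv) {
        List<Integer> ids = new ArrayList<>();
        for (String id : csv.split(",")) {
            ids.add(Integer.parseInt(id.trim()));
        }
        return ids;
    }

    public static void main(String[] args) {
        String line = "\tTopic: tutorial-topic\tPartition: 0\tLeader: 2\tReplicas: 2,1,3\tIsr: 2,1,3";
        Map<String, String> f = fields(line);
        System.out.println("leader=" + f.get("Leader")
                + " replicas=" + brokers(f.get("Replicas"))
                + " isr=" + brokers(f.get("Isr")));
    }
}
```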

The next step is to put some data into the newly created topic. Before doing so, I will start a consumer listening for messages on that topic. While we post messages to the topic, this consumer will print them.

> docker exec -it kafka-3 /bin/bash
confluent@4042774f8802:/$ kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

Let's produce some messages to the topic.

> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-console-producer --topic tutorial-topic --broker-list kafka-1:9092,kafka-2:9092
test1
test2
test3

As expected, the consumer in the other terminal prints the messages.

test1
test2
test3

Since we have a cluster, let's stop the leader broker and watch another broker take over the leadership. We expect all messages to remain available, since they were replicated, and no disruption in consuming and publishing messages.

Stop the leader, which is kafka-2:

> docker stop kafka-2

Check the leadership again from another broker:

confluent@92a6d381d0db:/$ kafka-topics --describe --topic tutorial-topic --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Topic:tutorial-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: tutorial-topic	Partition: 0	Leader: 1	Replicas: 2,1,3	Isr: 1,3

The leader now is broker 1 (kafka-1), and broker 2 has dropped out of the ISR (in-sync replicas).
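Why broker 1? In a clean leader election, Kafka's controller picks the first broker in the partition's replica assignment that is both alive and in the ISR. The following is a simplified Java sketch of that rule, not Kafka's actual controller code; with Replicas: 2,1,3 and broker 2 stopped, it selects broker 1, matching the output above.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Simplified sketch of clean leader election for one partition
// (illustrative only, not Kafka's controller implementation).
class LeaderElection {

    // Returns the first replica, in assignment order, that is alive and in sync.
    static Optional<Integer> electLeader(List<Integer> replicas, Set<Integer> isr, Set<Integer> live) {
        return replicas.stream()
                .filter(live::contains)
                .filter(isr::contains)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Integer> replicas = List.of(2, 1, 3); // "Replicas: 2,1,3"
        Set<Integer> isr = Set.of(2, 1, 3);        // ISR before the failure
        Set<Integer> live = Set.of(1, 3);          // kafka-2 has been stopped

        System.out.println("new leader: " + electLeader(replicas, isr, live).orElse(-1));
    }
}
```

An empty result means no in-sync replica is alive; in that case the partition stays offline unless unclean leader election is enabled.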

Read the messages to verify that they got replicated.

> docker exec -it kafka-3 /bin/bash
confluent@4042774f8802:/$ kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
test1
test2
test3

As expected, apart from a new leader being in place, our data has also been replicated!

If we try to post new messages, that will succeed as well.

So to summarise, we ran a Kafka cluster connected to a ZooKeeper ensemble, created a topic replicated to 3 brokers and, last but not least, tested what happens if one broker goes down.

In the next blog we are going to wrap it up so that clients on our local machine can connect to the Docker Compose cluster.

Kafka & Zookeeper for Development: Zookeeper Ensemble

Previously we spun up ZooKeeper and Kafka locally and also through Docker. What comes next is spinning up more than just one Kafka and ZooKeeper node to create a 3-node cluster. To do this the easy way locally, we will use docker-compose. Instead of spinning up various instances in the cloud, or running various Java processes and altering configs, docker-compose will greatly help us bootstrap a ZooKeeper ensemble and the Kafka brokers with everything needed preconfigured.

The first step is to create the ZooKeeper ensemble, but before going there let's check the ports needed.
ZooKeeper needs three ports:

  • 2181 is the client port. In the previous example it was the port our clients used to communicate with the server.
  • 2888 is the peer port. This is the port ZooKeeper nodes use to talk to each other (followers connect to the leader through it).
  • 3888 is the leader election port. This is the port nodes use to talk to each other during leader election.

With docker-compose our nodes share the same network, and each container name is also an internal DNS entry.
The ZooKeeper nodes will be named zookeeper-1, zookeeper-2 and zookeeper-3.

Our next goal is to give each ZooKeeper node a configuration that enables the nodes to discover each other.

This is the typical ZooKeeper configuration expected:

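  • tickTime is the unit of time in milliseconds ZooKeeper uses for heartbeats; the minimum session timeout is twice the tickTime.
  • dataDir is the location where ZooKeeper stores the in-memory database snapshots.
  • initLimit and syncLimit are used for ZooKeeper synchronization: initLimit is the number of ticks followers have to connect and sync to the leader, and syncLimit is how many ticks a follower may lag behind the leader.
  • server.X entries list the nodes that have to communicate with each other, in the form server.id=host:peerPort:electionPort.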

zookeeper.properties

clientPort=2181
dataDir=/var/lib/zookeeper
syncLimit=2
DATA.DIR=/var/log/zookeeper
initLimit=5
tickTime=2000
server.1=zookeeper-1:2888:3888
server.2=zookeeper-2:2888:3888
server.3=zookeeper-3:2888:3888

On each node, the server entry that refers to the node itself should be bound to `0.0.0.0`. Thus we need a different ZooKeeper properties file per node.

For example, for the node with id 1 the file should be the following:
zookeeper1.properties

clientPort=2181
dataDir=/var/lib/zookeeper
syncLimit=2
DATA.DIR=/var/log/zookeeper
initLimit=5
tickTime=2000
server.1=0.0.0.0:2888:3888
server.2=zookeeper-2:2888:3888
server.3=zookeeper-3:2888:3888

The next question that arises is the id file of ZooKeeper: how does a ZooKeeper instance identify its own id?

Based on the documentation, we need to specify the server ids using the myid file.

The myid file is a plain text file located in a node's dataDir, containing only a single number: the server id.

So three myid files will be created, each containing the id of the server:

myid_1.txt

1

myid_2.txt

2

myid_3.txt

3
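Creating these files can also be scripted. A minimal Java sketch; the class name is illustrative and the output directory (the current directory in main) is an assumption. Each file contains only the id followed by a newline.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: write myid_1.txt .. myid_N.txt, each containing only the server id.
class MyIdFiles {

    static void write(Path dir, int ensembleSize) throws IOException {
        for (int id = 1; id <= ensembleSize; id++) {
            Path file = dir.resolve("myid_" + id + ".txt");
            Files.write(file, (id + "\n").getBytes(StandardCharsets.US_ASCII));
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: run from the directory that holds the docker-compose file.
        write(Paths.get("."), 3);
    }
}
```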

It's time to spin up the ZooKeeper ensemble using the files specified above. Different client ports are mapped to the host to avoid collisions.

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - type: bind
        source: ./zookeeper1.properties
        target: /conf/zoo.cfg
      - type: bind
        source: ./myid_1.txt
        target: /data/myid
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    volumes:
    - type: bind
      source: ./zookeeper2.properties
      target: /conf/zoo.cfg
    - type: bind
      source: ./myid_2.txt
      target: /data/myid
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    volumes:
      - type: bind
        source: ./zookeeper3.properties
        target: /conf/zoo.cfg
      - type: bind
        source: ./myid_3.txt
        target: /data/myid

Mounting all those files is cumbersome, so a simpler option would be better. Fortunately, the image we use lets us specify the ids and servers using environment variables (ZOO_MY_ID and ZOO_SERVERS).

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
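Since the nodes differ only in which entry points to 0.0.0.0, the ZOO_SERVERS values can be generated rather than copied by hand, which also guards against copy-paste mismatches between nodes. A minimal Java sketch; the class name is illustrative, and the zookeeper-N host names and ports follow the setup above. Passing false for the client port flag yields the zoo.cfg-style lines used earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: build the server list for one ZooKeeper node, binding its own entry to 0.0.0.0.
class ZooServers {

    static List<String> entries(int myId, int ensembleSize, boolean withClientPort) {
        List<String> entries = new ArrayList<>();
        for (int id = 1; id <= ensembleSize; id++) {
            // The node's own entry binds to all interfaces; peers use their container names.
            String host = (id == myId) ? "0.0.0.0" : "zookeeper-" + id;
            String entry = "server." + id + "=" + host + ":2888:3888";
            // ";2181" appends the client port, as in the ZOO_SERVERS format.
            entries.add(withClientPort ? entry + ";2181" : entry);
        }
        return entries;
    }

    public static void main(String[] args) {
        for (int myId = 1; myId <= 3; myId++) {
            // ZOO_SERVERS expects the entries separated by spaces.
            System.out.println("ZOO_SERVERS for zookeeper-" + myId + ": "
                    + String.join(" ", entries(myId, 3, true)));
        }
    }
}
```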

Now let's check the status of our ZooKeeper nodes.

Let's use the zkServer.sh script to see which node is the leader and which are followers:

> docker exec -it zookeeper-1 /bin/bash
./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

And

> docker exec -it zookeeper-3 /bin/bash
root@81c2dc476127:/apache-zookeeper-3.6.2-bin# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

So in this tutorial we created a ZooKeeper ensemble using docker-compose and also saw a leader election take place. This recipe can be adapted to a set of VMs or a container orchestration engine.

In the next tutorial we shall add some Kafka brokers that connect to the ensemble.

Kafka & Zookeeper for Development: Local and Docker

Kafka's popularity keeps growing every day as it takes over the streaming world. It is already provided out of the box by cloud providers like AWS, Azure and IBM Cloud.
For local development, however, it can be a bit tricky to set up, since it requires various moving parts.

This blog will focus on making it easy for a developer to spin up some Kafka instances on a local machine without having to spin up VMs on the cloud.

We shall start with the usual ZooKeeper and Kafka setup. The example below fetches a specific version, so after some time it is good to check the Apache website for a newer one.

> wget https://www.mirrorservice.org/sites/ftp.apache.org/kafka/2.6.0/kafka_2.13-2.6.0.tgz
> tar xvf kafka_2.13-2.6.0.tgz
> cd kafka_2.13-2.6.0

We just downloaded Kafka locally, and now it is time to spin it up.

First we should spin up ZooKeeper:

> ./bin/zookeeper-server-start.sh config/zookeeper.properties

Then spin up the Kafka instance:

> ./bin/kafka-server-start.sh config/server.properties

As you can see, we only spun up one instance of Kafka and ZooKeeper. This is very different from production, where ZooKeeper servers should be deployed on multiple nodes; more specifically, 2n + 1 ZooKeeper servers where n > 0. An odd number of servers lets the ensemble perform majority elections for leadership, and an ensemble of 2n + 1 servers keeps working as long as no more than n servers fail.
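The arithmetic behind the 2n + 1 rule fits in a few lines of Java: a quorum is a strict majority, so an ensemble can lose every server beyond that majority and keep working. A minimal sketch (the class name is illustrative):

```java
// Sketch: quorum size and tolerated failures for a ZooKeeper ensemble.
class Quorum {

    // A quorum is a strict majority of the ensemble.
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Servers that can fail while a majority is still available.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int size = 1; size <= 6; size++) {
            System.out.printf("ensemble=%d majority=%d tolerated=%d%n",
                    size, majority(size), toleratedFailures(size));
        }
    }
}
```

For 3 servers the majority is 2 and one failure is tolerated; adding a fourth server raises the majority to 3 while still tolerating only one failure, which is why odd ensemble sizes are preferred.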

In our case, for local development, one Kafka broker and one ZooKeeper instance are enough to create a topic and consume from it.

Let's push some messages to a topic. There is no need to create the topic first: pushing a message will create it, since automatic topic creation (auto.create.topics.enable) is enabled by default.

bin/kafka-console-producer.sh --topic tutorial-topic --bootstrap-server localhost:9092
>a
>b
>c

Then let's read them. Pay attention to the --from-beginning flag: all the messages submitted since the beginning shall be read.

bin/kafka-console-consumer.sh --topic tutorial-topic --from-beginning --bootstrap-server localhost:9092
a
b
c

Now let's try and do this using Docker. The advantage of Docker is that we can run Kafka on a local Docker network, add as many machines as needed, and establish a ZooKeeper ensemble the easy way.

Start ZooKeeper first:

docker run --rm --name zookeeper -p 2181:2181 confluent/zookeeper

Then start the Kafka container, linking it to the ZooKeeper container:

docker run --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper confluent/kafka

Let's create the messages through Docker. As with most Docker images, the tools needed are already bundled inside the image.
So the publish command is very close to the command we executed previously.

> docker exec -it kafka /bin/bash
kafka-console-producer --topic tutorial-topic --broker-list localhost:9092
a
b
c

The same applies to the consume command.

> docker exec -it kafka /bin/bash
kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper:2181
a
b
c

That's it! We just ran Kafka locally for development, seamlessly!

Run a docker PostgreSQL instance locally for Testing

Running a PostgreSQL instance ad hoc for testing is not always as straightforward as it should be. In this blog we will run a PostgreSQL instance that connects to your workstation's network, and instead of using one of the popular tools like DBeaver, we shall use the client that comes with the instance. We shall also run a bootstrap script to have some data pre-inserted.

Let's get started by running the instance. I will use a different port on purpose: with multiple instances running on your workstation, collisions on the default port 5432 are likely, so we map the container to port 5433 instead.

docker run --rm --name test-instance -e POSTGRES_PASSWORD=password -p 5433:5432 postgres

This will run PostgreSQL, and you will be able to connect to it on port 5433. On CTRL-C the instance is stopped and, because of the --rm flag, the container is removed.

Now instead of using an external tool to connect let’s use the instance itself, it comes with psql pre-installed.

docker exec -it test-instance /bin/bash
> psql postgres postgres
postgres=# \h
Available help:
  ABORT                            ALTER TRIGGER                    CREATE RULE                      DROP GROUP                       LISTEN
  ALTER AGGREGATE                  ALTER TYPE                       CREATE SCHEMA                    DROP INDEX                       LOAD
  ALTER COLLATION                  ALTER USER                       CREATE SEQUENCE                  DROP LANGUAGE                    LOCK
.....
postgres=# \q

The instance works, and since port 5433 is published, connections from outside the container are possible too.

The next step is to add a database initialization script.

#!/bin/bash
set -e
 
psql -v ON_ERROR_STOP=1 --username postgres --dbname postgres <<-EOSQL
    create schema test_schema;
 
    create table test_schema.employee(
        id  SERIAL PRIMARY KEY,
        firstname   TEXT    NOT NULL,
        lastname    TEXT    NOT NULL,
        email       TEXT    not null,
        age         INT     NOT NULL,
        salary         real,
        unique(email)
    );
 
    insert into test_schema.employee (firstname,lastname,email,age,salary)
    values ('John','Doe 1','john1@doe.com',18,1234.23);

EOSQL

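Suppose the file with the script is called init_db.sh.

Let's run the instance with the initialization script mounted. The official postgres image executes the scripts found in /docker-entrypoint-initdb.d when it initializes an empty data directory.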

docker run --rm --name test-instance -v /path/to/init_db.sh:/docker-entrypoint-initdb.d/init-db-script.sh -e POSTGRES_PASSWORD=password -p 5433:5432 postgres

And let’s check the results.

docker exec -it test-instance /bin/bash
>psql postgres postgres
postgres=# SELECT * FROM test_schema.employee;
 id | firstname | lastname |     email     | age | salary
----+-----------+----------+---------------+-----+---------
  1 | John      | Doe 1    | john1@doe.com |  18 | 1234.23
(1 row)

That's it! You created a PostgreSQL database through Docker, connected to it, and added a bootstrap script with data.

Apache Ignite and Spring on your Kubernetes Cluster Part 3: Testing the application

In the previous blog we created the Kubernetes deployment files for our Ignite application. In this blog we shall deploy our Ignite application on Kubernetes. I will use minikube for this.

Let's build first:

mvn clean install

I shall create a simple Docker image, thus a Dockerfile is needed.
Let’s add a Dockerfile to the root of our project.

FROM adoptopenjdk/openjdk11

COPY target/job-api-ignite-0.0.1-SNAPSHOT.jar app.jar

ENTRYPOINT ["java","-jar","app.jar"]

Now we want to deploy this to our local Kubernetes. Follow this guide on how to use local images on Kubernetes.

Then let's build the Docker image of our app:

docker build -f Dockerfile -t job-api:1.0 .

Time to apply our Kubernetes yaml files.

kubectl apply -f job-cache-rbac.yaml
kubectl apply -f job-api-deployment.yaml
kubectl apply -f job-api-service.yaml

Give it some time and check your pods

> kubectl get pods
NAME                                  READY   STATUS    RESTARTS   AGE
job-api-deployment-86f54c9d75-dpnsc   1/1     Running   0          11m
job-api-deployment-86f54c9d75-xj267   1/1     Running   0          11m

Let's issue a request through the first pod. This request will reach GitHub and then cache the results in memory.

kubectl exec -it job-api-deployment-86f54c9d75-dpnsc -- curl localhost:8080/jobs/github/1

Then we use the other endpoint, through the second pod, to fetch the data straight from Ignite.

kubectl exec -it job-api-deployment-86f54c9d75-xj267 -- curl localhost:8080/jobs/github/ignite/1

The request succeeds, which means that our Ignite cluster is running in our Kubernetes workloads: the data is cached and shared between the nodes.

You can find the code on GitHub.

Apache Ignite and Spring on your Kubernetes Cluster Part 2: Kubernetes deployment

Previously we successfully created our first Spring Boot application powered by Apache Ignite.

In this blog we shall focus on what needs to be done on the Kubernetes side in order to spin up our application.

As described in a previous blog, we need to have our Kubernetes RBAC policies in place.

We need a role, a service account and the binding:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: job-cache
rules:
  - apiGroups:
    - ""
    resources:
    - pods
    - endpoints
    verbs:
    - get
    - list
    - watch
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: job-cache
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: job-cache
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: job-cache
subjects:
  - kind: ServiceAccount
    name: job-cache
    namespace: "default"

Our service account will be job-cache, which means that our Ignite-based workloads should run with the job-cache service account. The role grants read access to pods and endpoints, which the Ignite nodes need in order to discover each other through the Kubernetes API.

The next step is to create the deployment. The configuration is not very different from the StatefulSet explained in a previous post.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: job-api-deployment
  labels:
    app: job-api
spec:
  replicas: 2
  selector:
    matchLabels:
      app: job-api
  template:
    metadata:
      labels:
        app: job-api
    spec:
      containers:
        - name: job-api
          image: job-api:1.0
          env:
            - name: IGNITE_QUIET
              value: "false"
            - name: IGNITE_CACHE_CLIENT
              value: "false"
          ports:
            - containerPort: 11211
              protocol: TCP
            - containerPort: 47100
              protocol: TCP
            - containerPort: 47500
              protocol: TCP
            - containerPort: 49112
              protocol: TCP
            - containerPort: 10800
              protocol: TCP
            - containerPort: 8080
              protocol: TCP
            - containerPort: 10900
              protocol: TCP
      serviceAccount: job-cache
      serviceAccountName: job-cache

This is simpler, since the Ignite configuration has been done through Java code.
The image is the dockerised Java application we worked on before.
The next big step is to define the services. Instead of one service for everything, I will create one service for the cache and one service that exposes our API.

apiVersion: v1
kind: Service
metadata:
  labels:
    app: job-cache
  name: job-cache
spec:
  ports:
    - name: jdbc
      port: 11211
      protocol: TCP
      targetPort: 11211
    - name: spi-communication
      port: 47100
      protocol: TCP
      targetPort: 47100
    - name: spi-discovery
      port: 47500
      protocol: TCP
      targetPort: 47500
    - name: jmx
      port: 49112
      protocol: TCP
      targetPort: 49112
    - name: sql
      port: 10800
      protocol: TCP
      targetPort: 10800
    - name: rest
      port: 8080
      protocol: TCP
      targetPort: 8080
    - name: thin-clients
      port: 10900
      protocol: TCP
      targetPort: 10900
  selector:
    app: job-api
  type: ClusterIP

Without getting into Kubernetes details, the Ignite nodes synchronize using the job-cache internal DNS entry, so we rely on Kubernetes' internal DNS capabilities to communicate with the Ignite cluster.

The next step is to create the service for the actual job api application.

apiVersion: v1
kind: Service
metadata:
  labels:
    app: job-api
  name: job-api
spec:
  ports:
    - name: rest-api
      port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    app: job-api
  sessionAffinity: None
  type: ClusterIP

In the following blog we shall apply our configurations to Kubernetes and test our codebase.

Apache Ignite and Spring on your Kubernetes Cluster Part 1: Spring Boot application

In a previous series of blogs we spun up an Ignite cluster on a Kubernetes cluster.
In this tutorial we shall use the Ignite cluster created previously with a Spring Boot application.


Let’s create our project using Spring Boot. The Spring Boot application will connect to the Ignite cluster.

Let’s add our dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.gkatzioura</groupId>
	<artifactId>job-api-ignite</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>job-api-ignite</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-cache</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-kubernetes</artifactId>
			<version>2.7.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>2.7.6</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.ignite</groupId>
					<artifactId>ignite-indexing</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.12</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

As in previous tutorials, we will use GitHub’s Jobs API.

The first step is to add the Job model that the API response is deserialized into.

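package com.gkatzioura.jobapi.model;

import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

@Data
public class Job implements Serializable {

	private static final long serialVersionUID = 1L;

	private String id;
	private String type;
	private String url;
	// The API returns snake_case names for these two fields
	@JsonProperty("created_at")
	private String createdAt;
	private String company;
	@JsonProperty("company_url")
	private String companyUrl;
	private String location;
	private String title;
	private String description;

}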
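Since Job objects are meant to end up in Ignite's cache, a quick way to check that a model honours the Serializable contract is a round trip through plain Java serialization. The sketch uses a hand-written stand-in, JobSnapshot (a hypothetical name), instead of the Lombok class so that it compiles on its own. Keep in mind that Ignite's default binary marshaller is a different mechanism, so this only exercises java.io serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Lombok-based Job model, trimmed to two fields
final class JobSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    final String id;
    final String title;

    JobSnapshot(String id, String title) {
        this.id = id;
        this.title = title;
    }
}

final class SerializationCheck {

    private SerializationCheck() {
    }

    // Writes the object with Java serialization and reads it back.
    // A failure (e.g. NotSerializableException from a non-serializable field)
    // is reported as IllegalStateException.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        JobSnapshot copy = (JobSnapshot) roundTrip(new JobSnapshot("42", "Java developer"));
        System.out.println(copy.id + " / " + copy.title);
    }
}
```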

Then we need a repository for the jobs. Beware: the Job class needs to be serializable, since Ignite caches the data off-heap.

package com.gkatzioura.jobapi.repository;

import java.util.ArrayList;
import java.util.List;

import com.gkatzioura.jobapi.model.Job;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;
import org.springframework.web.client.RestTemplate;

@Repository
public class GitHubJobRepository {

	private static final String JOB_API_URL = "https://jobs.github.com/positions.json?page={page}";
	public static final String GITHUBJOB_CACHE = "githubjob";

	private final RestTemplate restTemplate;
	private final Ignite ignite;

	GitHubJobRepository(Ignite ignite) {
		this.restTemplate = new RestTemplate();
		this.ignite = ignite;
	}

	// The result is stored in the githubjob cache; with a single argument the key is the page itself
	@Cacheable(value = GITHUBJOB_CACHE)
	public List<Job> getJob(int page) {
		return restTemplate.getForObject(JOB_API_URL, JobList.class, page);
	}

	// Test helper: reads the cached entry for the requested page directly from Ignite
	public List<Job> fetchFromIgnite(int page) {
		IgniteCache<Integer, List<Job>> cache = ignite.cache(GITHUBJOB_CACHE);
		if (cache == null) {
			// The cache is created on first use, so nothing has been cached yet
			return new ArrayList<>();
		}

		List<Job> jobs = cache.get(page);
		return jobs != null ? jobs : new ArrayList<>();
	}

	// Concrete list type so RestTemplate can deserialize the JSON array into Job objects.
	// No Lombok @Data here: without fields, its generated equals/hashCode would ignore the list contents.
	private static class JobList extends ArrayList<Job> {
	}
}

The JobList class exists only for convenience when unmarshalling the JSON array.
As you can see, the getJob method of the repository is annotated with @Cacheable. This means that its results will be cached. The fetchFromIgnite method is a test method for the sake of this example; we will use it to access the data cached by Ignite directly.
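Conceptually, @Cacheable wraps getJob in a cache-aside check: look the key up, and call the GitHub API only on a miss. For a single-argument method, Spring's default SimpleKeyGenerator uses the argument itself as the key, so the page number is the key in the githubjob cache, which is what lets fetchFromIgnite read entries by page. The plain-Java sketch below mimics that behaviour with a ConcurrentHashMap and a hypothetical loader standing in for the REST call; it illustrates the idea and is not Spring's proxy implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Plain-Java illustration of the cache-aside behaviour behind @Cacheable on getJob(int page).
// The loader is a hypothetical stand-in for the RestTemplate call.
final class CacheAsideJobs {

    // Plays the role of the "githubjob" cache; the page number is the key
    private final Map<Integer, List<String>> cache = new ConcurrentHashMap<>();
    private final IntFunction<List<String>> loader;

    CacheAsideJobs(IntFunction<List<String>> loader) {
        this.loader = loader;
    }

    List<String> getJob(int page) {
        // On a miss the loader runs and its result is stored; on a hit the stored value is returned
        return cache.computeIfAbsent(page, loader::apply);
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();
        CacheAsideJobs jobs = new CacheAsideJobs(page -> {
            remoteCalls.incrementAndGet();
            return Arrays.asList("job-" + page);
        });

        jobs.getJob(1);
        jobs.getJob(1); // served from the cache, no remote call
        jobs.getJob(2);
        System.out.println("remote calls: " + remoteCalls.get()); // remote calls: 2
    }
}
```

With Ignite behind the SpringCacheManager, the map is replaced by a distributed cache, so a page fetched by one instance is also a hit for the others.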

We shall also add the controller.

package com.gkatzioura.jobapi.controller;

import java.util.List;

import com.gkatzioura.jobapi.model.Job;
import com.gkatzioura.jobapi.repository.GitHubJobRepository;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/jobs")
public class JobsController {

	private final GitHubJobRepository gitHubJobRepository;

	JobsController(GitHubJobRepository gitHubJobRepository) {
		this.gitHubJobRepository = gitHubJobRepository;
	}

	@GetMapping("/github/{page}")
	public List<Job> gitHub(@PathVariable("page") int page) {
		return this.gitHubJobRepository.getJob(page);
	}

	@GetMapping("/github/ignite/{page}")
	public List<Job> gitHubIgnite(@PathVariable("page") int page) {
		return this.gitHubJobRepository.fetchFromIgnite(page);
	}

}

The controller has two methods: one fetches the data as usual and caches it behind the scenes, and the other is the one we will use for testing.

It’s time to configure the Ignite node that runs inside our application and joins the nodes on our Kubernetes cluster.

package com.gkatzioura.jobapi.config;


import lombok.extern.slf4j.Slf4j;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.spring.SpringCacheManager;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder;

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
@Slf4j
public class SpringCacheConfiguration {

	@Bean
	public Ignite igniteInstance() {
		log.info("Creating ignite instance");
		TcpDiscoveryKubernetesIpFinder tcpDiscoveryKubernetesIpFinder = new TcpDiscoveryKubernetesIpFinder();
		tcpDiscoveryKubernetesIpFinder.setNamespace("default");
		tcpDiscoveryKubernetesIpFinder.setServiceName("job-cache");

		TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
		tcpDiscoverySpi.setIpFinder(tcpDiscoveryKubernetesIpFinder);

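		IgniteConfiguration igniteConfiguration = new IgniteConfiguration();

		igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);
		// false: the application starts as an Ignite server node and joins the cluster;
		// set it to true to join as a client node that holds no cache data
		igniteConfiguration.setClientMode(false);

		return Ignition.start(igniteConfiguration);
	}

	@Bean
	public SpringCacheManager cacheManager(Ignite ignite) {
		SpringCacheManager springCacheManager = new SpringCacheManager();
		// Reuse the Ignite instance started above for Spring's cache abstraction
		springCacheManager.setIgniteInstanceName(ignite.name());
		return springCacheManager;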
	}

}

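With this configuration Spring's caching is backed by Ignite, and the Ignite node finds the rest of the cluster through Kubernetes TCP discovery, looking up the job-cache service in the default namespace.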

The next step is to add our Main class.

package com.gkatzioura.jobapi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;

@SpringBootApplication
@EnableCaching
public class IgniteKubeClusterApplication {

	public static void main(String[] args) {
		SpringApplication.run(IgniteKubeClusterApplication.class, args);
	}

}

The next blog post will focus on shipping the solution to Kubernetes.

Apache Ignite on your Kubernetes Cluster Part 4: Deployment explained

Previously we saw the Ignite configuration that comes with the Kubernetes installation.
The default configuration does not have persistence enabled, so we won’t focus on any storage classes provided by the Helm chart.

The default installation uses a StatefulSet. You can find more information on StatefulSets in the Kubernetes documentation.

> kubectl get statefulset ignite-cache -o yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  creationTimestamp: 2020-04-09T12:29:04Z
  generation: 1
  labels:
    app.kubernetes.io/instance: ignite-cache
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: ignite
    helm.sh/chart: ignite-1.0.1
  name: ignite-cache
  namespace: default
  resourceVersion: "281390"
  selfLink: /apis/apps/v1/namespaces/default/statefulsets/ignite-cache
  uid: fcaa7bef-84cd-4e7c-aa33-a4312a1d47a9
spec:
  podManagementPolicy: OrderedReady
  replicas: 2
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: ignite-cache
  serviceName: ignite-cache
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: ignite-cache
    spec:
      containers:
      - env:
        - name: IGNITE_QUIET
          value: "false"
        - name: JVM_OPTS
          value: -Djava.net.preferIPv4Stack=true
        - name: OPTION_LIBS
          value: ignite-kubernetes,ignite-rest-http
        image: apacheignite/ignite:2.7.6
        imagePullPolicy: IfNotPresent
        name: ignite
        ports:
        - containerPort: 11211
          protocol: TCP
        - containerPort: 47100
          protocol: TCP
        - containerPort: 47500
          protocol: TCP
        - containerPort: 49112
          protocol: TCP
        - containerPort: 10800
          protocol: TCP
        - containerPort: 8080
          protocol: TCP
        - containerPort: 10900
          protocol: TCP
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/ignite/apache-ignite/config
          name: config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: ignite-cache
      serviceAccountName: ignite-cache
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: ignite-config.xml
            path: default-config.xml
          name: ignite-cache-configmap
        name: config-volume
  updateStrategy:
    rollingUpdate:
      partition: 0
    type: RollingUpdate
status:
  replicas: 0

As you can see, the Ignite configuration is mounted from the ConfigMap through the config-volume volume. You can also see that the pod uses a dedicated service account, ignite-cache.
Through the environment variables certain optional libraries are enabled (OPTION_LIBS: ignite-kubernetes and ignite-rest-http), which provide extra features to the Ignite cluster. Finally, the container ports needed for communication over the various protocols are specified.
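The volume definition above projects the ignite-config.xml key of ignite-cache-configmap as a file named default-config.xml inside the mount path, and defaultMode is printed in decimal. This small sketch resolves the resulting file path with java.nio and converts the mode to the usual octal notation; the values are copied from the StatefulSet output and the helper names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Where a ConfigMap item is projected inside the container:
// <volumeMounts[].mountPath>/<volumes[].configMap.items[].path>
final class ConfigMapMount {

    private ConfigMapMount() {
    }

    static Path projectedFile(String mountPath, String itemPath) {
        return Paths.get(mountPath).resolve(itemPath);
    }

    // defaultMode appears in decimal in the YAML output; file modes are usually read in octal
    static String modeAsOctal(int mode) {
        return Integer.toOctalString(mode);
    }

    public static void main(String[] args) {
        // Values copied from the ignite-cache StatefulSet above
        System.out.println(projectedFile("/opt/ignite/apache-ignite/config", "default-config.xml"));
        System.out.println(modeAsOctal(420)); // 644, i.e. rw-r--r--
    }
}
```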

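The last piece is the service. All the Ignite nodes sit behind this Kubernetes service. Note that it is a headless service (clusterIP: None): instead of load balancing through a single virtual IP, it exposes the addresses of the individual Ignite pods.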

> kubectl get svc ignite-cache -o yaml
apiVersion: v1
kind: Service
metadata:
  creationTimestamp: 2020-04-09T12:29:04Z
  labels:
    app: ignite-cache
  name: ignite-cache
  namespace: default
  resourceVersion: "281389"
  selfLink: /api/v1/namespaces/default/services/ignite-cache
  uid: 5be68e28-a57c-4cb5-b610-b708bff80da7
spec:
  clusterIP: None
  ports:
  - name: jdbc
    port: 11211
    protocol: TCP
    targetPort: 11211
  - name: spi-communication
    port: 47100
    protocol: TCP
    targetPort: 47100
  - name: spi-discovery
    port: 47500
    protocol: TCP
    targetPort: 47500
  - name: jmx
    port: 49112
    protocol: TCP
    targetPort: 49112
  - name: sql
    port: 10800
    protocol: TCP
    targetPort: 10800
  - name: rest
    port: 8080
    protocol: TCP
    targetPort: 8080
  - name: thin-clients
    port: 10900
    protocol: TCP
    targetPort: 10900
  selector:
    app: ignite-cache
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}

Whether you add a new server node or an Ignite client node, your Ignite cluster will be reached through this Kubernetes service. Apart from that, depending on how you expose the Kubernetes service, you can make this cache public or keep it internal.
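Because the service has clusterIP: None and the StatefulSet's serviceName points at it, each Ignite pod also gets a stable DNS name of the form <pod>.<service>.<namespace>.svc.<cluster-domain>, with pods named <statefulset>-0 up to <statefulset>-(replicas - 1). The sketch lists those names for the ignite-cache StatefulSet with its two replicas, assuming the default cluster.local domain; StatefulSetDns is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Stable DNS names of StatefulSet pods governed by a headless service.
// Assumption: the cluster domain is "cluster.local".
final class StatefulSetDns {

    private StatefulSetDns() {
    }

    static List<String> podHostnames(String statefulSet, String service, String namespace, int replicas) {
        List<String> names = new ArrayList<>();
        for (int ordinal = 0; ordinal < replicas; ordinal++) {
            // Pods are named <statefulset>-<ordinal>, starting at 0
            names.add(statefulSet + "-" + ordinal + "." + service + "." + namespace + ".svc.cluster.local");
        }
        return names;
    }

    public static void main(String[] args) {
        // replicas: 2 and serviceName: ignite-cache in the StatefulSet above
        podHostnames("ignite-cache", "ignite-cache", "default", 2).forEach(System.out::println);
    }
}
```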

Apache Ignite on your Kubernetes Cluster Part 3: Configuration explained

Previously we had a look at the RBAC needed for an Ignite cluster in Kubernetes.
This blog post focuses on the deployment and the configuration of the cache.

The default Ignite installation uses an XML-based configuration, and such files are easy to mount using ConfigMaps.

> kubectl get configmap ignite-cache-configmap
NAME                     DATA   AGE
ignite-cache-configmap   1      32d
> kubectl get configmap ignite-cache-configmap -o yaml
apiVersion: v1
data:
  ignite-config.xml: "....\n"
kind: ConfigMap
metadata:
  creationTimestamp: 2020-03-07T22:23:50Z
  name: ignite-cache-configmap
  namespace: default
  resourceVersion: "137521"
  selfLink: /api/v1/namespaces/default/configmaps/ignite-cache-configmap
  uid: ff530e3d-10d6-4708-817f-f9845886c1b0

Since viewing the XML inside the ConfigMap is cumbersome, here is the actual XML:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
		       http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd">
	<bean class="org.apache.ignite.configuration.IgniteConfiguration">
		<property
				name="peerClassLoadingEnabled" value="false"/>
		<property name="dataStorageConfiguration">
			<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
			</bean>
		</property>
		<property
				name="discoverySpi">
			<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
				<property name="ipFinder">
					<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder">
						<property name="namespace" value="default"/>
						<property
								name="serviceName" value="ignite-cache"/>
					</bean>
				</property>
			</bean>
		</property>
	</bean>
</beans>

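The default DataStorageConfiguration is used, so persistence is not enabled.
What differs from other Ignite installations is the discovery configuration: TcpDiscoverySpi uses TcpDiscoveryKubernetesIpFinder, which looks up the other nodes through the ignite-cache service in the default namespace.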
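As far as we understand the Ignite 2.x sources, TcpDiscoveryKubernetesIpFinder does not rely on DNS: it asks the Kubernetes API server for the Endpoints object of the configured service, authenticating with the pod's service account token, which is why the RBAC setup matters. The sketch only builds that request; the default master URL is an assumption based on our reading of the 2.7 IP finder, so check it against your Ignite version. The token path is the standard Kubernetes service account mount.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch of the API request the Kubernetes IP finder issues to discover Ignite pods.
// Assumption: DEFAULT_MASTER matches the IP finder's default master URL.
final class EndpointsRequest {

    static final String DEFAULT_MASTER = "https://kubernetes.default.svc.cluster.local:443";
    // Standard location of the service account token inside a pod
    static final Path TOKEN_PATH = Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token");

    private EndpointsRequest() {
    }

    // Endpoints object of the service, listing the IPs of its ready pods
    static String endpointsUrl(String master, String namespace, String serviceName) {
        return master + "/api/v1/namespaces/" + namespace + "/endpoints/" + serviceName;
    }

    // Authorization header value, or null when no token is mounted (i.e. not inside a pod)
    static String bearerHeader() {
        try {
            String token = new String(Files.readAllBytes(TOKEN_PATH), StandardCharsets.UTF_8).trim();
            return "Bearer " + token;
        } catch (IOException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("GET " + endpointsUrl(DEFAULT_MASTER, "default", "ignite-cache"));
        System.out.println(bearerHeader() != null ? "token found" : "no service account token mounted");
    }
}
```

For this call to succeed, the ignite-cache service account needs permission to read endpoints in the namespace.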

The next blog focuses on the services and the deployment.