Add Zipkin to your Spring application

If your application consists of multiple services interacting with each other, the need for distributed tracing increases. A call towards one application may trigger calls towards other applications, and in certain cases the application to be accessed next differs per request. You need to trace the request end to end and identify what happened to the call.
Zipkin is a distributed tracing system. Essentially, by using Zipkin in our system we can track how a call spans across various microservices.

Zipkin comes with various database options. In our case we shall use Elasticsearch.

We will set up our Zipkin server using Compose.

Let’s start with our Compose file:

services:
  redis:
    image: redis
    ports:
      - 6379:6379
  elasticsearch:
    image: elasticsearch:7.17.7
    ports:
      - 9200:9200
      - 9300:9300
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9200/_cat/health"]
      interval: 20s
      timeout: 10s
      retries: 5
      start_period: 5s
    environment:
      - discovery.type=single-node
    restart: always
  zipkin:
    image: openzipkin/zipkin-slim
    ports:
      - 9411:9411
    environment:
      - STORAGE_TYPE=elasticsearch
      - ES_HOSTS=http://elasticsearch:9200
      - JAVA_OPTS=-Xms1G -Xmx1G -XX:+ExitOnOutOfMemoryError
    depends_on:
      - elasticsearch
    restart: always

We can run the above using

docker compose up

You can find more on Compose in A Developer's Essential Guide to Docker Compose.

Let's build our applications. They will be servlet based.

We shall use a service for locations; essentially this service persists locations in a Redis database using the GeoHash data structure.
You can find the service implementation in a previous blog post.

We would like to add some extra dependencies so that Zipkin integration is possible.

...
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2021.0.5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
...
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin</artifactId>
        </dependency>
...

Spring Cloud Sleuth adds distributed tracing to our Spring application. Once Sleuth is on the classpath, tracing data is generated automatically for instrumented components such as servlet filters and RestTemplate beans. Provided the Zipkin reporter (spring-cloud-sleuth-zipkin) is included, the generated data is dispatched to the Zipkin collector specified through spring.zipkin.baseUrl.
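
Most spans come from that built-in instrumentation, but Sleuth also exposes a Tracer abstraction in case we want to wrap a custom piece of work in its own span. A minimal sketch (the class and the span name below are made up for illustration):

package org.location;

import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.stereotype.Component;

@Component
public class CustomSpanExample {

    private final Tracer tracer;

    public CustomSpanExample(Tracer tracer) {
        this.tracer = tracer;
    }

    public void runInSpan(Runnable work) {
        // Create a child span of the current trace for a custom unit of work
        Span span = tracer.nextSpan().name("custom-work");
        try (Tracer.SpanInScope ws = tracer.withSpan(span.start())) {
            work.run();
        } finally {
            // The finished span is reported to the collector configured through spring.zipkin.baseUrl
            span.end();
        }
    }

}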

Let’s also make our entry point application. This application will execute requests towards the location service we implemented previously.
The dependencies will be the following.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>european-venue</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2021.0.5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin</artifactId>
        </dependency>
    </dependencies>

</project>

We shall create a service interacting with the location service.

The location model shall be the same:

package org.landing;

import lombok.Data;

@Data
public class Location {

    private String name;
    private Double lat;
    private Double lng;

}

And the service:

package org.landing;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;


@Service
public class LocationService {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${location.endpoint}")
    private String locationEndpoint;

    public void checkIn(Location location) {
        restTemplate.postForLocation(locationEndpoint, location);
    }

}

Following we will add the controller:

package org.landing;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import lombok.AllArgsConstructor;

@RestController
@AllArgsConstructor
public class CheckinController {

    private final LocationService locationService;

    @PostMapping("/checkIn")
    public ResponseEntity<String> checkIn(@RequestBody Location location) {
        locationService.checkIn(location);
        return ResponseEntity.ok("Success");
    }

}

We also need a RestTemplate bean to be configured. Registering RestTemplate as a bean matters: Sleuth instruments RestTemplate beans so that trace headers are propagated on outgoing calls.

package org.landing;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfiguration {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

Last but not least the main method:

package org.landing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LocationApplication {

    public static void main(String[] args) {
        SpringApplication.run(LocationApplication.class);
    }

}
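
For the trace data to reach the collector from the Compose file, and for the curl example below to hit port 8081, a few properties need values: spring.application.name, spring.zipkin.baseUrl, server.port, and the location.endpoint consumed by LocationService. These would normally live in application.properties; as a sketch, the same defaults can be set programmatically (the values below are assumptions for a local setup, and java.util.Map has to be imported):

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(LocationApplication.class);
        // Assumed local defaults; normally these live in application.properties
        application.setDefaultProperties(Map.of(
                "spring.application.name", "european-venue",           // service name shown in the Zipkin UI
                "spring.zipkin.baseUrl", "http://localhost:9411",       // Zipkin collector from the Compose file
                "spring.sleuth.sampler.probability", "1.0",             // sample every request for the demo
                "location.endpoint", "http://localhost:8080/location",  // where the location service listens
                "server.port", "8081"));                                // matches the curl example below
        application.run(args);
    }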

Now that everything is up and running, let's put this into action.

curl --location --request POST 'localhost:8081/checkIn/' \
--header 'Content-Type: application/json' \
--data-raw '{
	"name":"Liverpool Street",
	"lat": 51.517336,
	"lng": -0.082966
}'
> Success

Let's now navigate to the Zipkin dashboard and search traces for the european-venue service.
By expanding the calls we end up with a trace like the following.

Essentially we have end-to-end tracing for our applications. We can see the entry point, which is the european-venue check-in endpoint.
Because the location service is called, we also have data points for the call it received, as well as data points for the call towards the Redis database.
Essentially, by adding Sleuth to our application, beans that act as ingress and egress points are wrapped so that trace data can be reported.
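
If we prefer to verify the traces without the UI, the Zipkin collector also exposes a read API. A minimal sketch using the JDK HTTP client, assuming the default port and the european-venue service name used above:

package org.landing;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ZipkinTraceCheck {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Zipkin's v2 read API returns a JSON array of traces for the given service
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9411/api/v2/traces?serviceName=european-venue&limit=5"))
                .build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println(body);
    }

}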

You can find the code on GitHub.

Use Redis GeoHash with Spring Boot

One very handy data structure when it comes to Redis is the GeoHash data structure. Essentially it is a sorted set where the score is generated from the longitude and latitude.

We will spin up a Redis database using Compose

services:
  redis:
    image: redis
    ports:
      - 6379:6379

It can be run like this:

docker compose up

You can find more on Compose in A Developer's Essential Guide to Docker Compose.

Let’s add our dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>location-service</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>
</project>

We shall start with our configuration. For convenience when injecting, we shall create a GeoOperations<String,String> bean.

package org.location;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.GeoOperations;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class RedisConfiguration {

    @Bean
    public GeoOperations<String,String> geoOperations(RedisTemplate<String,String> template) {
        return template.opsForGeo();
    }

}

Our model will be the following:

package org.location;

import lombok.Data;

@Data
public class Location {

    private String name;
    private Double lat;
    private Double lng;

}

This simple service will persist venue locations and also fetch venues near a given location.

package org.location;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.data.geo.Circle;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.GeoOperations;
import org.springframework.data.redis.domain.geo.GeoLocation;
import org.springframework.stereotype.Service;

@Service
public class GeoService {

    public static final String VENUS_VISITED = "venues_visited";
    private final GeoOperations<String, String> geoOperations;

    public GeoService(GeoOperations<String, String> geoOperations) {
        this.geoOperations = geoOperations;
    }

    public void add(Location location) {
        Point point = new Point(location.getLng(), location.getLat());
        geoOperations.add(VENUS_VISITED, point, location.getName());

    }

    public List<String> nearByVenues(Double lng, Double lat, Double kmDistance) {
        Circle circle = new Circle(new Point(lng, lat), new Distance(kmDistance, Metrics.KILOMETERS));
        GeoResults<RedisGeoCommands.GeoLocation<String>> res = geoOperations.radius(VENUS_VISITED, circle);
        return res.getContent().stream()
                  .map(GeoResult::getContent)
                  .map(GeoLocation::getName)
                  .collect(Collectors.toList());
    }

}

We shall also add a controller

package org.location;

import java.util.List;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LocationController {

    private final GeoService geoService;

    public LocationController(GeoService geoService) {
        this.geoService = geoService;
    }

    @PostMapping("/location")
    public ResponseEntity<String> addLocation(@RequestBody Location location) {
        geoService.add(location);
        return ResponseEntity.ok("Success");
    }

    @GetMapping("/location/nearby")
    public ResponseEntity<List<String>> locations(Double lng, Double lat, Double km) {
        List<String> locations = geoService.nearByVenues(lng, lat, km);
        return ResponseEntity.ok(locations);
    }

}

Then let’s add an element.

curl --location --request POST 'localhost:8080/location' \
--header 'Content-Type: application/json' \
--data-raw '{
	"lng": 51.5187516,
	"lat":-0.0814374,
	"name": "liverpool-street"
}'

Let's retrieve the element from the API.

curl --location --request GET 'localhost:8080/location/nearby?lng=51.4595573&lat=0.24949&km=100'
> [
    "liverpool-street"
]

Let's also check Redis.

ZRANGE venues_visited 0 -1 WITHSCORES
1) "liverpool-street"
2) "2770072452773375"

We did it: pretty convenient for our day-to-day distance use cases.

Gradle: Push to Maven Repository

If you are a developer, sharing your artifacts is a common task that needs to be in place from the start.

In most teams and companies a Maven repository is already set up. This repository is used mostly through CI/CD tasks, enabling developers to distribute the generated artifacts.

In order to make the example possible we shall spin up a Nexus repository using Docker Compose.

First let's create a default password file and the directory Nexus will use for its data. For now the password is in clear text; it will serve us for our first action. Once the admin password is reset on Nexus, the file is removed, thus there won't be a file with a clear-text password.

mkdir nexus-data
printf a-test-password > admin.password

Then onwards to the Compose file:

services:
  nexus:
    image: sonatype/nexus3
    ports:
      - 8081:8081
    environment:
      INSTALL4J_ADD_VM_PARAMS: "-Xms2703m -Xmx2703m -XX:MaxDirectMemorySize=2703m"
    volumes:
      - nexus-data:/nexus-data
      - ./admin.password:/nexus-data/admin.password
volumes:
  nexus-data:

Let's examine the file.
By using INSTALL4J_ADD_VM_PARAMS we override the JVM parameters used to run the Nexus server; this way we can instruct it to use more memory.
Because Nexus takes long to initialize, we create a Docker volume. Every time we run the Compose file, the initialization does not happen from scratch; instead it reuses the results of the previous initialization. By mounting the admin password created previously we have a predefined password for the service.

By issuing the following command, the server will be up and running:

 
docker compose up

You can find more on Compose in A Developer's Essential Guide to Docker Compose.

After some time Nexus will be initialized and running, so we can proceed to our Gradle configuration.

We will keep track of the version on the gradle.properties file

version=1.0-SNAPSHOT

The build.gradle file:

plugins {
    id 'java'
    id 'maven-publish'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
    useJUnitPlatform()
}

publishing {
    publications {
        mavenJava(MavenPublication) {
            groupId 'org.example'
            artifactId 'gradle-push'
            version version
            from components.java
            versionMapping {
                usage('java-api') {
                    fromResolutionOf('runtimeClasspath')
                }
                usage('java-runtime') {
                    fromResolutionResult()
                }
            }
            pom {
                name = 'gradle-push'
                description = 'Gradle Push to Nexus'
                url = 'https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                licenses {
                    license {
                        name = 'The Apache License, Version 2.0'
                        url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
                    }
                }
                developers {
                    developer {
                        id = 'John'
                        name = 'John Doe'
                        email = 'an-email@gmail.com'
                    }
                }
                scm {
                    connection = 'scm:git:https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                    developerConnection = 'scm:git:https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                    url = 'https://github.com/gkatzioura/egkatzioura.wordpress.com.git'
                }
            }
        }
    }

    repositories {
        maven {
            def releasesRepoUrl = "http://localhost:8081/repository/maven-releases/"
            def snapshotsRepoUrl = "http://localhost:8081/repository/maven-snapshots/"
            url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl

            credentials {
                username "admin"
                password "a-test-password"
            }
        }
    }
}

The plugin used is the maven-publish plugin. If we examine the file, we can see that the plugin generates a Maven pom file, which is used in order to execute the deploy to Nexus. The repositories are configured in the repositories section, including the Nexus credentials we defined previously. Depending on whether the version is a SNAPSHOT or a release, the corresponding repository endpoint is picked. As we can see, a clear text password is used; on the next blog we will examine how we can avoid that.

Now it is sufficient to run

gradle publish

This will deploy the binary to the snapshot repository.

You can find the source code on GitHub.

New Book Day: A Developer’s Essential Guide to Docker Compose

As developers nowadays we have a wide variety of software components and cloud services to use. This was a scenario we could not even imagine in the past.
I still remember when we had to set up our application servers and databases on top of bare-metal servers.
This burst of computing functionality in the form of the cloud and managed services allowed us to utilise more tools for our applications and build better products.
Issues like orchestrating workloads, isolating and shipping them went to a whole different level.
As a result containerization came to the rescue.
Docker took over the microservice world and became the dominant solution to deploy microservices, and in certain cases even to deploy databases and brokers.
This brings us to the development process. Production deployments are a huge chapter of their own: platform engineers need to take care of the security, the scaling and the robustness of container-based deployments.
But the development process is also affected:

  • SQL/NoSQL Databases as well as purpose-built databases like InfluxDB need to be available locally
  • Scenarios of Microservices applications need to be tested
  • Other Components such as brokers need to be available for testing

A step forward for the challenges mentioned above is to utilise Docker and its rich functionality. As convenient as it is to spin up containers locally, you still end up managing containers, volumes and networks. Most of the time those have been spun up ad hoc or with some scripts that a team has to maintain.

Docker Compose is one of the solutions to the problems described. With Compose you can spin up multiple containers locally, organised using yaml files.
Here are some of the benefits when it is used during development:

  • The containers are organised and can spin up or shut down in an organized way
  • Applications can be placed on different networks
  • Volumes can be managed and attached to containers in a managed way
  • Containers resolve each other's location automatically through DNS; manual linking is not needed.

I have been using Compose for years. It has helped me make my development process much more efficient, and in certain cases it has even helped with actual production deployments. Writing this book was an opportunity for me to advocate for Docker Compose.

Thanks to the amazing people at Packt publishing it was possible to write this book and give back to the community.

The book is focused on various aspects of Compose.

In the beginning there is an extensive look at Docker Compose: how it is implemented, how it interacts with the Docker engine, the available commands, as well as the functionality it provides.

Onwards we dive deep into day-to-day development using Compose. We spin up complex infrastructure locally, as well as simulate microservice environments using Compose. We take this concept further and incrementally simulate the issues we have to deal with in a production deployment, along with workarounds. Lastly we use Compose for CI/CD jobs on popular solutions like GitHub Actions, Travis, and Bitbucket Pipelines.

The last part of the book is all about deploying to production. All the knowledge acquired previously can be used for actual production deployments. A production deployment comes with some standards:

  • Infrastructure as Code
  • Container registry
  • Networks
  • Load Balancing
  • Autoscaling

The above, in combination with the knowledge we accumulated so far using Compose, will be used for a deployment on AWS and Azure, the most popular cloud providers. This will also be done with the extra help of Infrastructure as Code, by using Terraform.

Lastly, since many production deployments nowadays reside on Kubernetes, we shall build a bridge between Compose and Kubernetes by migrating the existing Compose deployment using Kompose.

You can find the book on Amazon as well as on the Packt portal.
Happy Learning!

Debezium Server with PostgreSQL and Redis Stream

Debezium is a great tool for capturing the row level changes that happen on a Database and stream those changes to a broker of our choice.

Since this functionality stays within the boundaries of the database, it helps keep our applications simple. For example, there is no need for an application to emit events on its database interactions; Debezium monitors the row changes and emits the events. Depending on the broker solution used with Debezium, a consumer can subscribe to the broker and receive the changes.

PostgreSQL, being a popular SQL database, is supported by Debezium.

Our goal is to listen for PostgreSQL changes and stream them to a Redis stream through a Debezium Server. It is common to use Debezium with Kafka; in cases where Kafka is not present in a team's tech stack, we can use other brokers.

In our case we would keep things lightweight by using Redis Streams.

Redis will be set up without any extra configuration.

In order to use PostgreSQL with Debezium it is essential to alter the configuration on PostgreSQL.

The configuration we shall use on PostgreSQL is the following:

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3

As we can see, we enable logical decoding in PostgreSQL by setting wal_level = logical.
From the documentation:

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

We will also create a schema (a namespace) and a table in PostgreSQL. These will be the ones we listen to for changes.

#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );
EOSQL

This is the table we used in a previous PostgreSQL example.

Debezium will have to be able to interact with the PostgreSQL server as well as the Redis server.
The configuration should be the following:

debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput

By examining the configuration we can see that we have the necessary information for Debezium to communicate with the PostgreSQL database. We also specify the schema that we created previously, therefore only changes from that schema will be streamed. We can make things even more restrictive, for example by whitelisting specific tables.

Since this demo involves three different software components, Docker Compose will come in handy.

version: '3.1'

services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis

By using Compose we were able to spin up three different software components on the same network. This helps the components interact with each other by using the DNS names of the services as specified in Compose. Also, the configuration files we created previously are mounted into the Docker containers. Docker Compose V2 is out there with many good features; you can find more about it in the book I authored, A Developer's Essential Guide to Docker Compose.

In order to get the stack running we shall execute the following command

$ docker compose up

Now that it is up and running, we can start listening for events.

We shall log in to Redis and start listening for any possible database updates.

$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $

This will block until we receive an event from the stream.
If we examine the stream name we can see the pattern {server-name}.{schema}.{table}. This allows consumers to subscribe only to the changes they are interested in.
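
The same subscription can also be done from a JVM application instead of redis-cli. A minimal sketch using the Lettuce client (the io.lettuce:lettuce-core dependency and the blocking timeout are assumptions):

package org.example.cdc;

import java.util.List;

import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

public class EmployeeChangeListener {

    public static void main(String[] args) {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379");
        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            RedisCommands<String, String> commands = connection.sync();
            // Block for up to 60 seconds waiting for new entries on the Debezium stream
            List<StreamMessage<String, String>> messages = commands.xread(
                    XReadArgs.Builder.block(60_000),
                    XReadArgs.StreamOffset.latest("tutorial.test_schema.employee"));
            for (StreamMessage<String, String> message : messages) {
                System.out.println(message.getId() + " -> " + message.getBody());
            }
        } finally {
            redisClient.shutdown();
        }
    }

}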

Onwards we will make an entry.

$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q

If we check the Redis session we should see that we received an event from the Redis stream:

127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1) "1663796657336-0"
         2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379> 

How cool is that? We can now stream our database changes to the broker of our choice.

You can find the source code on GitHub.

Add mTLS to Nginx

Previously we added SSL to an Nginx server. In this example we shall enhance our security by adding mTLS to Nginx.

Apart from encrypting the traffic between client and server, SSL is also a way for the client to make sure that the server it connects to is a trusted source.

On the other hand, mTLS is a way for the server to ensure that the client is a trusted one. The client accepts the SSL connection to the server; however, it also has to present a certificate signed by an authority that the server accepts. This way the server, by validating the certificate the client presents, can allow the connection.

More or less we shall build upon the previous example. The SSL certificates shall be the same; however, we shall add the configuration for mTLS.

The server SSL certificate creation:

mkdir certs

cd certs

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

The above is sufficient to secure our Nginx with SSL. So let's create the mTLS certificates for the clients.
In order to create a certificate for mTLS we need a certificate authority. For convenience, the certificate authority will be the same as the one we generated in the previous example.

printf test > client_passphrase.txt
openssl genrsa -des3 -passout file:client_passphrase.txt -out client.key 2048
openssl rsa -passin file:client_passphrase.txt -in client.key -out client.key
openssl req -new -key client.key -subj "/CN=*.client.hostname" -out client.csr

##Sign the certificate with the certificate authority
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt

Take note that the client common name needs to be different from the server certificate's common name, or else your request will be rejected.

So we have our client certificate generated.
The next step is to configure Nginx to enforce mTLS connections signed by a specific authority:

server {
error_log /var/log/nginx/error.log debug;
    listen 443 ssl;
    server_name  test.your.hostname;
    ssl_password_file /etc/nginx/certs/password;
    ssl_certificate /etc/nginx/certs/tls.crt;
    ssl_certificate_key /etc/nginx/certs/tls.key;

    ssl_client_certificate /etc/nginx/mtls/ca.crt;
    ssl_verify_client on;
    ssl_verify_depth  3;

    ssl_protocols             TLSv1 TLSv1.1 TLSv1.2;

    location / {
    }

}

By using ssl_client_certificate we point to the certificate authority that the client certificates should be signed by.
By setting ssl_verify_client to on, we enforce mTLS connections.

Since we have all the certificates generated, let's spin up the Nginx server using Docker.

docker run --rm --name mtls-nginx -p 443:443 -v $(pwd)/certs/ca.crt:/etc/nginx/mtls/ca.crt -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.mtls.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

Our server is up and running. Let's try a request using curl without any client certificate.

curl https://localhost/ --insecure

The result shall be

<html>
<head><title>400 No required SSL certificate was sent</title></head>
<body>
<center><h1>400 Bad Request</h1></center>
<center>No required SSL certificate was sent</center>
<hr><center>nginx/1.21.3</center>
</body>
</html>

As expected, our request is rejected.
Let's use the client certificate we generated, signed by the expected certificate authority.

curl --key certs/client.key --cert certs/client.crt https://127.0.0.1 --insecure
<html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx/1.21.3</center>
</body>
</html>

The connection was established and the client could connect to the Nginx instance.
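
The same check can be done from a JVM client. Java's TLS machinery expects the client key and certificate in a keystore, so this sketch assumes client.key and client.crt have been bundled into a PKCS#12 file (for example certs/client.p12 with the password changeit), and that test.your.hostname resolves to 127.0.0.1; the paths, password and hostname mapping are assumptions:

package org.example.mtls;

import java.io.FileInputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class MutualTlsClient {

    public static void main(String[] args) throws Exception {
        // Key material presented to Nginx: client.key + client.crt exported as PKCS#12
        char[] password = "changeit".toCharArray();
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream clientBundle = new FileInputStream("certs/client.p12")) {
            keyStore.load(clientBundle, password);
        }
        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        keyManagerFactory.init(keyStore, password);

        // Trust the certificate authority that signed the server certificate
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        trustStore.load(null, null);
        try (FileInputStream caFile = new FileInputStream("certs/ca.crt")) {
            trustStore.setCertificateEntry("local-ca",
                    CertificateFactory.getInstance("X.509").generateCertificate(caFile));
        }
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(trustStore);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);

        // Assumes test.your.hostname points to 127.0.0.1, e.g. through an /etc/hosts entry
        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create("https://test.your.hostname/")).build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }

}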

Let’s put them all together

mkdir certs

cd certs

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

printf test > client_passphrase.txt
openssl genrsa -des3 -passout file:client_passphrase.txt -out client.key 2048
openssl rsa -passin file:client_passphrase.txt -in client.key -out client.key
openssl req -new -key client.key -subj "/CN=*.client.hostname" -out client.csr

##Sign the certificate with the certificate authority
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt

cd ../

docker run --rm --name mtls-nginx -p 443:443 -v $(pwd)/certs/ca.crt:/etc/nginx/mtls/ca.crt -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.mtls.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

You can find the code on GitHub.

Add SSL to Nginx

Nginx is a versatile tool with many uses; it can act as a reverse proxy, load balancer, etc.

A common use is to handle the SSL traffic in front of applications. Thus, instead of handling SSL in your application layer, you can have Nginx in front.

In our example we shall generate the certificates and make Nginx do the TLS termination. I will use self-signed certificates for our example; they will be signed by our own certificate authority, which will also help us in another example. In a real-world setup the certificate authority is something external like Let's Encrypt or GlobalSign. By creating our own certificate authority we are able to simulate them.

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

Now that we have a certificate authority, let's create the server key and certificate. The first step is to create the key.

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

The result is a private key and a certificate signing request (csr). The csr needs to be signed by a certificate authority; in our case that is the one we created previously. Take note that we did not remove the passphrase from server.key. This was done on purpose, to show how to load it in Nginx; if you don't want to tackle it, remove it as shown in the certificate authority example.

So let’s sign the csr.

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

Now we are ready to install them on Nginx. We shall use Docker for this.
This is how the Nginx configuration should look. What we shall do is mount the files we generated previously into our Docker container.

server {

    listen 443 ssl;
    server_name  test.your.hostname;
    ssl_password_file /etc/nginx/certs/password;
    ssl_certificate /etc/nginx/certs/tls.crt;
    ssl_certificate_key /etc/nginx/certs/tls.key;


    location / {

        error_log /var/log/front_end_errors.log;
    }

    location = /swagger.json {
        proxy_pass https://petstore.swagger.io/v2/swagger.json;
    }

}

Our docker command mounting the files.

docker run --rm --name some-nginx -p 443:443 -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

Since this is a self-signed certificate it cannot be accessed by a browser without tweaks, but we can use curl --insecure to inspect the results. With a trusted certificate authority this would not be the case.

curl https://localhost/ -v --insecure
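
Instead of skipping verification, a JVM client can explicitly trust the certificate authority we generated. A minimal sketch, assuming test.your.hostname resolves to 127.0.0.1 (for example through an /etc/hosts entry); the certs/ca.crt path matches the consolidated script below:

package org.example.tls;

import java.io.FileInputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class TrustOwnCaExample {

    public static void main(String[] args) throws Exception {
        // Load the certificate authority we generated (ca.crt) into an in-memory trust store
        CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        trustStore.load(null, null);
        try (FileInputStream caFile = new FileInputStream("certs/ca.crt")) {
            trustStore.setCertificateEntry("local-ca", certificateFactory.generateCertificate(caFile));
        }

        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(trustStore);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);

        // Hostname must match the certificate's CN (*.your.hostname)
        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create("https://test.your.hostname/")).build();
        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode());
    }

}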

Let’s put them all in a script

mkdir certs

cd certs

openssl genrsa -des3 -out ca.key 4096
#Remove passphrase for example purposes
openssl rsa -in ca.key -out ca.key
openssl req -new -x509 -days 3650 -key ca.key -subj "/CN=*.your.hostname" -out ca.crt

printf test > passphrase.txt
openssl genrsa -des3 -passout file:passphrase.txt -out server.key 2048
openssl req -new -passin file:passphrase.txt -key server.key -subj "/CN=*.your.hostname" -out server.csr

openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

cd ../

docker run --rm --name some-nginx -p 443:443 -v $(pwd)/certs/server.key:/etc/nginx/certs/tls.key -v $(pwd)/certs/server.crt:/etc/nginx/certs/tls.crt -v $(pwd)/nginx.conf:/etc/nginx/conf.d/nginx.conf -v $(pwd)/certs/passphrase.txt:/etc/nginx/certs/password nginx

You can find the code on GitHub.

In the next blog we shall configure Nginx to support mTLS.

Kubernetes pod as a Bastion Host

In cloud-native applications, private networks, databases and services are a reality.

An infrastructure can be fully private and only a limited number of entry points can be available.

Obviously the more restricted the better.

Still, there are cases where no infrastructure has been set up to provide access to the private services. However, if there is access through Kubernetes, HAProxy can help.

HAProxy can accept a configuration file. We can upload that file as a ConfigMap and then mount the ConfigMap to a Kubernetes pod. The HAProxy pod will then spin up using that configuration and establish the proxy connection.

Let's start with the HAProxy configuration. The target is a MySQL database with a private IP.

 
apiVersion: v1
data:
  haproxy.cfg: |-
    global
    defaults
        timeout client          30s
        timeout server          30s
        timeout connect         30s

    frontend frontend
        bind    0.0.0.0:3306
        default_backend backend

    backend backend
        mode                    tcp
        server upstream 10.0.1.7:3306
kind: ConfigMap
metadata:
  creationTimestamp: null
  name: mysql-haproxy-port-forward

On the upstream server entry we just add the IP and the port of the database; on the frontend we specify the local address and port we shall use.

By doing the above we have a way to mount the config file to our Kubernetes pod.

Now let’s create the pod

 
apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: null
  labels:
    run: mysql-forward-pod
  name: mysql-forward-pod
spec:
  containers:
    - command:
      - haproxy
      - -f
      - /usr/local/etc/haproxy/haproxy.cfg
      - -V
      image: haproxy:1.7-alpine
      name: mysql-forward-pod
      resources: {}
      volumeMounts:
        - mountPath: /usr/local/etc/haproxy/
          name: mysql-haproxy-port-forward
  dnsPolicy: ClusterFirst
  restartPolicy: Always
  volumes:
    - name: mysql-haproxy-port-forward
      configMap:
        name: mysql-haproxy-port-forward
status: {}

In the volumes section we set the ConfigMap as a volume. In the container section we mount the ConfigMap to a path, thus having access to the file.
We use an HAProxy image, and we provide the command to start HAProxy using the file we mounted before.

To test that it works, use a kubectl session that has port-forward permissions and do

 
kubectl port-forward  mysql-forward-pod 3306:3306

You shall be able to access MySQL from your localhost.
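
To verify the tunnel from code rather than from a MySQL client, a minimal JDBC sketch can be used (the MySQL driver dependency, the database name and the credentials are placeholders):

package org.example.bastion;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ForwardedMySqlCheck {

    public static void main(String[] args) throws Exception {
        // Connects through the kubectl port-forward opened above; credentials are placeholders
        String url = "jdbc:mysql://127.0.0.1:3306/mysql";
        try (Connection connection = DriverManager.getConnection(url, "a-user", "a-password");
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT 1")) {
            resultSet.next();
            System.out.println("Reached the private MySQL instance: " + resultSet.getInt(1));
        }
    }

}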

Testing using TestContainers

Part of our everyday CI/CD tasks involves using containers in order for tests to take place.
So what if you could control the containers you use through your tests and serve your scenarios better?
And what if you could do this in a more managed way?

Testcontainers is a Java library that supports JUnit tests, providing lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container.

You can pretty much guess what it is all about. Our tests can spin up containers with the parameters needed. We will get started by using it in our tests with JUnit.

It all starts with the right dependencies. Suppose we use Maven for this tutorial.

	<properties>
		<junit-jupiter.version>5.4.2</junit-jupiter.version>
		<testcontainers.version>1.15.0</testcontainers.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>testcontainers</artifactId>
			<version>${testcontainers.version}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>junit-jupiter</artifactId>
			<version>${testcontainers.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

I shall use an example we already have with Hoverfly.
We can use Hoverfly in our tests either by running it through Java or by having a Hoverfly container with the test cases preloaded.
In the previous blog Hoverfly was integrated in our tests through the Java binary.
For this blog we shall use the Hoverfly container.

Our end result will look like this.

package com.gkatzioura.hoverfly.docker;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public class ContainerBasedSimulation {

	// Used by the withFileSystemBind alternative mentioned further down
	private static final String SIMULATION_HOST_PATH = ContainerBasedSimulation.class.getClassLoader().getResource("simulation.json").getPath();

	@Container
	public static GenericContainer gcs = new GenericContainer("spectolabs/hoverfly")
			// expose the admin (8888) and simulation (8500) ports in a single call; withExposedPorts sets the whole list
			.withExposedPorts(8888, 8500)
			.withCommand("-webserver","-import","/var/hoverfly/simulation.json")
			.withClasspathResourceMapping("simulation.json","/var/hoverfly/simulation.json" ,BindMode.READ_ONLY);


	@Test
	void testHttpGet() {
		var hoverFlyHost = gcs.getHost();
		var hoverFlyPort = gcs.getMappedPort(8500);
		var client = HttpClient.newHttpClient();
		var request = HttpRequest.newBuilder()
				.uri(URI.create("http://"+hoverFlyHost+":"+ hoverFlyPort +"/user"))
				.build();
		var res = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();
		Assertions.assertEquals("{\"username\":\"test-user\"}",res);
	}

}

Let’s break it down.

The @Testcontainers annotation is needed for the Jupiter integration.

@Testcontainers
public class ContainerBasedSimulation {
}

We shall use a container image that does not come with a preconfigured Testcontainers module (such as the ones available for Elasticsearch and various databases), thus we shall use the GenericContainer class.

@Container
public static GenericContainer gcs = new GenericContainer("spectolabs/hoverfly")

Since we want to load a simulation into the container, we need to make the simulation file available to it. By using withClasspathResourceMapping we directly map files from our classpath, for example the test resources, into the container.

			.withClasspathResourceMapping("simulation.json","/var/hoverfly/simulation.json",BindMode.READ_ONLY);

Hoverfly needs the simulation port and the admin port to be exposed, so we shall instruct Testcontainers to expose those ports and map them to the host.

new GenericContainer("spectolabs/hoverfly")
			.withExposedPorts(8888, 8500)

Alternatively, if the simulation file is not a classpath resource, we can use withFileSystemBind and specify the local path and the path on the container.

...
.withFileSystemBind(SIMULATION_HOST_PATH,"/var/hoverfly/simulation.json" ,BindMode.READ_ONLY)
...

Also, Docker images might need some extra command arguments, therefore we shall use .withCommand to pass the commands needed.

...
.withCommand("-webserver","-import","/var/hoverfly/simulation.json")
...

Technically we can say we are ready to connect to the container; however, when running Testcontainers, the container is not accessible through the port specified in the binding. After all, if tests run in parallel there is going to be a collision. So what Testcontainers does is map the exposed port of the container to a random local port.
This way port collisions are avoided.

	@Test
	void testHttpGet() {
		var hoverFlyHost = gcs.getHost();
		var hoverFlyPort = gcs.getMappedPort(8500);
		var client = HttpClient.newHttpClient();
		var request = HttpRequest.newBuilder()
				.uri(URI.create("http://"+hoverFlyHost+":"+ hoverFlyPort +"/user"))
				.build();
		var res = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
				.thenApply(HttpResponse::body)
				.join();
		Assertions.assertEquals("{\"username\":\"test-user\"}",res);
	}

Using GenericContainer.getMappedPort(8500) we can get the port we have to use to interact with the container. getHost() is essential too, since it won't always point to localhost.

Last but not least, if you are curious enough while the tests run, do a docker ps.

docker ps 
>04a322447226        testcontainers/ryuk:0.3.0   "/app"                   3 seconds ago       Up 2 seconds        0.0.0.0:32814->8080/tcp    testcontainers-ryuk-fb60c3c6-5f31-4f4e-9ab7-ce25a00eeccc

You shall see a container running which is not the one we instructed through our unit test. The Ryuk container is responsible for removing containers/networks/volumes/images by a given filter after a specified delay.

That's it! We achieved running the container we needed through a test, and we successfully migrated a previous test to one using Testcontainers.

Kafka & Zookeeper for Development: Connecting Clients to the Cluster

Previously we managed to have our Kafka brokers connect to a ZooKeeper ensemble. We also brought down some brokers, checked the leader election and produced/consumed some messages.

For now we want to make sure that we will be able to connect to those nodes from outside. The problem with connecting to the ensemble we created previously is that it is located inside the container network. When a client interacts with one of the brokers and receives the full list of brokers, it will receive a list of IPs not accessible to it.

So the initial handshake of a client will be successful, but then the client will try to interact with some unreachable hosts.

In order to tackle this we will use a combination of workarounds.

The first one is to bind the port of each Kafka broker to a different local IP.

kafka-1 will be mapped to 127.0.0.1:9092
kafka-2 will be mapped to 127.0.0.2:9092
kafka-3 will be mapped to 127.0.0.3:9092

So let’s create the aliases of those addresses

sudo ifconfig lo0 alias 127.0.0.2
sudo ifconfig lo0 alias 127.0.0.3

Now it's possible to do the IP binding. Let's also put those entries in our /etc/hosts. By doing this, our local network and our Docker network agree on which broker they should access.

127.0.0.1	kafka-1
127.0.0.2	kafka-2
127.0.0.3	kafka-3

The next step is to change KAFKA_ADVERTISED_LISTENERS on each broker. We will adapt this to the DNS entry of each broker. By setting KAFKA_ADVERTISED_LISTENERS, clients from the outside can correctly connect to the brokers, through an address reachable to them and not an address of the internal network. Further explanations can be found on this blog.

  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "127.0.0.1:9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092"
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "127.0.0.2:9092:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-2:9092"
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "127.0.0.3:9092:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-3:9092"

We see the port binding change as well as KAFKA_ADVERTISED_LISTENERS. Now let's wrap everything together in our docker-compose file.

version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
    - "127.0.0.1:9092:9092"
    volumes:
    - type: bind
      source: ./server1.properties
      target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092"
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "127.0.0.2:9092:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-2:9092"
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "127.0.0.3:9092:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-3:9092"

You can find more on Compose in A Developer's Essential Guide to Docker Compose.

Last but not least, you can find the code on GitHub.