Debezium in Embedded mode

In a previous blog post we set up a Debezium server reading events from a PostgreSQL database. Then we streamed those changes to a Redis instance through a Redis stream.

We might get the impression that in order to run Debezium we need to have two extra components running in our infrastructure:

  • A standalone Debezium server instance
  • A software component with streaming capabilities and various integrations, such as Redis or Kafka

This is not always the case, since Debezium can run in embedded mode. In embedded mode you use Debezium to read directly from a database’s transaction log, and it is up to you how you handle the entries retrieved. The process reading the entries from the transaction log can reside in any Java application, so there is no need for a standalone deployment.

Apart from reducing the number of components, the other benefit is that we can alter the entries as we read them from the database and take action in our application. Sometimes we might just need a subset of the capabilities offered.

Let’s use the same PostgreSQL configuration we used previously:

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3

We shall also create an initialization script for the table we want to focus on:

#!/bin/bash
set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    NOT NULL,
          age         INT     NOT NULL,
          salary      REAL
  );
EOSQL

Our Docker Compose file will look like this

version: '3.1'

services:
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432

The configuration files we created are mounted to the PostgreSQL Docker container. Docker Compose V2 is out there with many good features; you can find more about it in the book I authored:
A Developer’s Essential Guide to Docker Compose

Provided we run docker compose up, a PostgreSQL server with a schema and a table will be up and running. That server will also have logical decoding enabled, so Debezium will be able to track changes on that table through the transaction log.
We have everything needed to proceed on building our application.

First let’s add the dependencies needed:



We also need to create the Debezium embedded properties:


Apart from establishing the connection to the PostgreSQL database, we also decided to store the offset in a file. Debezium uses the offset to keep track of the progress we make in processing the events.
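
As a rough sketch, the same kind of settings can also be assembled in code. The property keys below are standard Debezium 2.x and Kafka Connect keys, but every value is an assumption: the connection details are inferred from the Compose file above, the `embedded-debezium` prefix from the topic name in the output shown further down, and `EmbeddedDebeziumSettings`, the engine name and the flush interval are hypothetical.

```java
import java.io.File;
import java.util.Properties;

// Hypothetical helper: builds the embedded engine settings in code instead of a properties file.
class EmbeddedDebeziumSettings {

    static Properties build(String offsetFile) {
        Properties props = new Properties();
        // Engine name (assumed value)
        props.setProperty("name", "embedded-debezium-engine");
        // The PostgreSQL connector shipped with Debezium
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // Keep offsets in a local file so progress survives restarts
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", new File(offsetFile).getAbsolutePath());
        props.setProperty("offset.flush.interval.ms", "60000");
        // Connection details (assumed to match the Compose file)
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "postgres");
        props.setProperty("database.password", "postgres");
        props.setProperty("database.dbname", "postgres");
        // Logical decoding plugin built into recent PostgreSQL versions
        props.setProperty("plugin.name", "pgoutput");
        // Prefix used for topic names, e.g. embedded-debezium.test_schema.employee
        props.setProperty("topic.prefix", "embedded-debezium");
        // Only track the table we care about
        props.setProperty("table.include.list", "test_schema.employee");
        return props;
    }
}
```

A `Properties` object built this way could be handed to the engine builder in the same way as one loaded from a file.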

On each change that happens on the table test_schema.employee we shall receive an event. Once we receive that event our codebase should handle it.
To handle the events we need to create a DebeziumEngine.ChangeConsumer. The ChangeConsumer will consume the events emitted.

package com.egkatzioura;

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

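    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        for(RecordChangeEvent<SourceRecord> record: records) {
            // Print the underlying SourceRecord and acknowledge it
            System.out.println(record.record().toString());
            committer.markProcessed(record);
        }
        // Signal that the whole batch has been handled
        committer.markBatchFinished();
    }
}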

Every incoming event will be printed on the console.

Now we can add our main class where we set up the engine.

package com.egkatzioura;

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;

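import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Application {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();

        // Load the Debezium embedded properties from the classpath
        try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("")) {
            properties.load(stream);
        }
        properties.put("",new File("offset.dat").getAbsolutePath());

        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();

        // Blocks and keeps delivering change events to the consumer
        engine.run();
    }
}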

Provided our application is running as well as the PostgreSQL database we configured previously, we can start inserting data:

docker exec -it debezium-embedded-postgres-1 psql postgres postgres
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.

postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','',18,1234.23);

Also we can see the change on the console

SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true}} ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

We did it. We managed to run Debezium through a Java application without the need for a standalone Debezium server or a streaming component. You can find the code on GitHub.

Static Factories are Great!

Every now and then I stumble upon classes with multiple constructors or classes that are cumbersome to work with, not to mention being unable to mock some of their components and ending up forced to use reflection for testing (Mockito based, old school, you choose).

Imagine a Producer class that you use for Kafka. A class that provides you with some abstraction on sending messages.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private Producer<String,String> producer;
	private String topic = "test-topic";

	StrMessageProducer() {
		// Kafka client settings (bootstrap servers, serializers) are expected to be set here from the system properties
		var kafkaProperties = new Properties();
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		this.producer = kafkaProducer;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		} catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}
Apart from being an ugly class it is also very hard to change some of its components.

For example

  • I cannot use this class to post to another topic
  • I cannot use this class with a server configuration other than the one in the properties
  • It’s difficult to test the functionality of the class since crucial components are created through the constructor

It’s obvious that the constructor in this case serves the purpose of creating a Kafka producer based on the system properties. But the responsibility of the class is to use that producer in order to send messages in a specific way. Thus I will move the creation of the Producer out of the constructor. Also, because we might want to swap the topic used, I will inject the topic instead of having it hardcoded.
By doing so we encourage dependency injection. We make it easy to swap the ingredients of the class while the execution stays the same.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		} catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}

But we still need the producer to be created somehow. This is where the factory pattern kicks in.

We shall add static factories in order to have instances of the StrMessageProducer class with different configurations.
Let’s add two factory methods: the first is based on system properties and the second on environment variables.

package com.gkatzioura.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		} catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}

	// Topic taken from a JVM system property
	public static StrMessageProducer createFromSystemPros() {
		var kafkaProperties = new Properties();
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getProperty("main.topic"));
	}

	// Topic taken from an environment variable
	public static StrMessageProducer createFromEnv() {
		var kafkaProperties = new Properties();
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getenv("MAIN_TOPIC"));
	}
}

You already see the benefits. You have a clean class ready to use as it is, and you have some factory methods for convenience. Eventually you can add more static factories, some of which might also take arguments, for example the topic.

We can also go one step further when we want to have multiple kinds of message producers and we want to utilise an interface. So we are going to introduce the MessageProducer interface, which our StrMessageProducer class will implement. We are also going to move the static factories to the interface.

So this will be our interface with the static factories.

package com.gkatzioura.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public interface MessageProducer {
	void send(String message);

	// Topic taken from a JVM system property
	static MessageProducer createFromSystemPros() {
		var kafkaProperties = new Properties();
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getProperty("main.topic"));
	}

	// Topic taken from an environment variable
	static MessageProducer createFromEnv() {
		var kafkaProperties = new Properties();
		var kafkaProducer = new KafkaProducer<String,String>(kafkaProperties);
		return new StrMessageProducer(kafkaProducer, System.getenv("MAIN_TOPIC"));
	}
}

And this would be our new StrMessageProducer class.

package com.gkatzioura.kafka.producer;

import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Slf4j
public class StrMessageProducer implements MessageProducer {

	private final Producer<String,String> producer;
	private final String topic;

	StrMessageProducer(Producer<String,String> producer, String topic) {
		this.producer = producer;
		this.topic = topic;
	}

	@Override
	public void send(String message) {
		var producerRecord = new ProducerRecord<String,String>(topic,null, message);
		try {
			var metadata = producer.send(producerRecord).get();
			log.info("Submitted {}",metadata.offset());
		} catch (InterruptedException |ExecutionException e) {
			log.error("Could not send record",e);
		}
	}
}

Let’s check the benefits

  • We can have various implementations of the MessageProducer interface
  • We can add as many factories as we want that serve our purpose
  • We can easily test the MessageProducer implementation by passing mocks to the constructors
  • We keep our codebase cleaner
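
To make the testing benefit concrete, here is a minimal sketch that runs with the JDK alone. It is a simplification, not the classes above: `Transport` is a hypothetical stand-in for Kafka’s `Producer<String,String>`, `createForTopic` is an assumed factory that takes the topic as an argument, and `RecordingTransport` is a hand-written fake. With the real Kafka client, the `MockProducer` class from kafka-clients could probably play the same role as the fake.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's Producer<String,String>, so the sketch needs no Kafka dependency.
interface Transport {
    void deliver(String topic, String payload);
}

interface MessageProducer {
    void send(String message);

    // Static factory that takes its collaborators as arguments (assumed name).
    static MessageProducer createForTopic(Transport transport, String topic) {
        return new StrMessageProducer(transport, topic);
    }
}

class StrMessageProducer implements MessageProducer {
    private final Transport transport;
    private final String topic;

    // Dependencies are injected, so a test can pass a fake.
    StrMessageProducer(Transport transport, String topic) {
        this.transport = transport;
        this.topic = topic;
    }

    @Override
    public void send(String message) {
        transport.deliver(topic, message);
    }
}

// Hand-written fake that records every delivery for later inspection.
class RecordingTransport implements Transport {
    final List<String> delivered = new ArrayList<>();

    @Override
    public void deliver(String topic, String payload) {
        delivered.add(topic + ":" + payload);
    }
}
```

A test creates a `RecordingTransport`, builds the producer through the factory, calls `send`, and then checks the `delivered` list. No reflection is needed because nothing is constructed behind the scenes.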


Locking for multiple nodes the easy way: GCS

It happens to all of us. We develop stateless applications that can scale horizontally without any effort.
However sometimes cases arise where you need to achieve some type of coordination.

You can go really advanced on this one. For example you can use a framework like Akka and its cluster capabilities. Or you can go really simple, like rolling a mechanism of your own, as long as it gives you the results needed. On another note, you can just have different node groups based on the work you need them to do. The options and the solutions can change based on the problem.

If your problem can go with a simple option, one way to do so, provided you use Google Cloud Storage, is to use its conditional writes as a lock.
Imagine for example a scenario of four nodes that scale dynamically, where each time a new node registers you want it to acquire a unique configuration that does not collide with a configuration another node might have received.

The strategy can be to use a file on Google Cloud Storage for locking and a file that acts as a centralised configuration registry.

The lock file is nothing more than a file on cloud storage which shall be created and deleted. What gives us lock abilities is the option on GCS to create a file only if it does not exist.
Thus a process from one node will try to create the `lock` file; this action is equivalent to obtaining the lock.
Once the process is done it will delete the file; this action is equivalent to releasing the lock.
Other processes in the meantime will try to create the file (acquire the lock) and fail (file already exists) because another process has already created the file.
Meanwhile the process that has successfully created the file (acquired the lock) will change the centralised configuration registry and once done will delete the file (release the lock).
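
The protocol can be tried out without any cloud access. The sketch below is an in-memory analogy, not GCS code: `InMemoryBucket` is a hypothetical stand-in whose `createIfAbsent` mimics the create-only-if-missing write, and `LockDemo` checks that no two workers ever hold the lock at the same time.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a storage bucket.
class InMemoryBucket {
    private final Map<String, String> objects = new ConcurrentHashMap<>();

    // Mimics "create only if the object does not exist": true only for the caller that created it.
    boolean createIfAbsent(String key, String content) {
        return objects.putIfAbsent(key, content) == null;
    }

    void delete(String key) {
        objects.remove(key);
    }
}

class CreateIfAbsentLock {
    private final InMemoryBucket bucket;
    private final String key;

    CreateIfAbsentLock(InMemoryBucket bucket, String key) {
        this.bucket = bucket;
        this.key = key;
    }

    // Creating the lock object is acquiring the lock.
    boolean acquire() {
        return bucket.createIfAbsent(key, "_lock");
    }

    // Deleting the lock object is releasing the lock.
    void release() {
        bucket.delete(key);
    }
}

class LockDemo {
    // Returns the highest number of workers that ever held the lock simultaneously.
    static int maxConcurrentHolders(int threads) {
        var bucket = new InMemoryBucket();
        var holders = new AtomicInteger();
        var maxHolders = new AtomicInteger();
        var workers = new ArrayList<Thread>();

        for (int i = 0; i < threads; i++) {
            var worker = new Thread(() -> {
                var lock = new CreateIfAbsentLock(bucket, "lockFileName");
                while (!lock.acquire()) {
                    Thread.onSpinWait(); // creation failed, so another worker holds the lock
                }
                try {
                    int now = holders.incrementAndGet();
                    maxHolders.accumulateAndGet(now, Math::max);
                    holders.decrementAndGet();
                } finally {
                    lock.release(); // always release, even if the work fails
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (var worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxHolders.get();
    }
}
```

Releasing in a `finally` block is a design choice worth carrying over to the real thing: a lock file that is never deleted blocks every other node.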

So let’s start with the lock object.

package com.gkatzioura.gcs.lock;

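import java.util.Optional;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

public class GCSLock {

	public static final String LOCK_STRING = "_lock";
	private final Storage storage;
	private final String bucket;
	private final String keyName;

	private Optional<Blob> acquired = Optional.empty();

	GCSLock(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public boolean acquire() {
		try {
			var blobInfo = BlobInfo.newBuilder(bucket, keyName).build();
			// doesNotExist() makes the create fail if the object is already there
			var blob = storage.create(blobInfo, LOCK_STRING.getBytes(), Storage.BlobTargetOption.doesNotExist());
			acquired = Optional.of(blob);
			return true;
		} catch (StorageException storageException) {
			return false;
		}
	}

	public void release() {
		if(!acquired.isPresent()) {
			throw new IllegalStateException("Lock was never acquired");
		}
		// Deleting the lock file releases the lock
		storage.delete(acquired.get().getBlobId());
		acquired = Optional.empty();
	}
}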

As you can see, the write creates the object only if it does not already exist. Behind the scenes this operation uses the x-goog-if-generation-match header, which GCS provides for concurrency control.
Thus only one node will be able to acquire the lock and change the configuration files.
Afterwards it can delete the lock. If an exception is raised, the operation has failed, most likely because the lock is already held by another process.

To make the example more complete let’s create the configuration file. The configuration file will be a simple JSON file holding key-value pairs.

package com.gkatzioura.gcs.lock;

import java.util.HashMap;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.json.JSONObject;

public class GCSConfiguration {

	private final Storage storage;
	private final String bucket;
	private final String keyName;

	GCSConfiguration(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public void addProperty(String key, String value) {
		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		final JSONObject configJson;

		// Start from an empty document if the configuration file does not exist yet
		if(blob==null) {
			configJson = new JSONObject();
		} else {
			configJson = new JSONObject(new String(blob.getContent()));
		}

		configJson.put(key, value);

		var blobInfo = BlobInfo.newBuilder(blobId).build();
		storage.create(blobInfo, configJson.toString().getBytes());
	}

	public Map<String,String> properties() {

		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		var map = new HashMap<String,String>();

		if(blob!=null) {
			var jsonObject = new JSONObject(new String(blob.getContent()));
			for(var key: jsonObject.keySet()) {
				map.put(key, jsonObject.getString(key));
			}
		}

		return map;
	}
}

It is a simple config util backed by GCS. Eventually it can be changed to perform the locking inside the addProperty operation; it’s up to the user and the code. For the purpose of this blog we shall just acquire the lock, change the configuration and release the lock.
Our main class will look like this.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class Application {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();

		final String bucketName = "bucketName";
		final String lockFileName = "lockFileName";
		final String configFileName = "configFileName";

		var lock = new GCSLock(storage, bucketName, lockFileName);
		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

		var lockAcquired = lock.acquire();
		if(lockAcquired) {
			gcsConfig.addProperty("testProperty", "testValue");
			lock.release();
		}

		var config = gcsConfig.properties();

		for(var key: config.keySet()) {
			System.out.println("Key "+key+" value "+config.get(key));
		}
	}
}


Now let’s go for some multithreading. Ten threads will try to put values, and it is expected that some of their attempts to acquire the lock will fail.

package com.gkatzioura.gcs.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ApplicationConcurrent {

	private static final String bucketName = "bucketName";
	private static final String lockFileName = "lockFileName";
	private static final String configFileName = "configFileName";

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		var storage = StorageOptions.getDefaultInstance().getService();

		final int threads = 10;
		var service = Executors.newFixedThreadPool(threads);
		var futures = new ArrayList<Future<?>>(threads);

		for (var i = 0; i < threads; i++) {
			futures.add(service.submit(update(storage, "property-"+i, "value-"+i)));
		}

		// Wait for every update to finish
		for (var f : futures) {
			f.get();
		}
		service.shutdown();

		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);
		var properties = gcsConfig.properties();

		for(var i=0; i < threads; i++) {
			System.out.println(properties.get("property-"+i));
		}
	}

	private static Runnable update(final Storage storage, String property, String value) {
		return () -> {
			var lock = new GCSLock(storage, bucketName, lockFileName);
			var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

			boolean lockAcquired = false;

			// Keep retrying until this thread manages to create the lock file
			while (!lockAcquired) {
				lockAcquired = lock.acquire();
				if (!lockAcquired) {
					System.out.println("Could not acquire lock");
				}
			}

			gcsConfig.addProperty(property, value);
			lock.release();
		};
	}
}
Ten threads are enough to display the capabilities. During thread initialization and execution some threads will try to acquire the lock simultaneously and all but one will fail, while threads that arrive later will also fail and wait until the lock is available.

In the end all of them are expected to have their values added to the configuration.
That’s it. If your problems have a simple nature this approach might do the trick. Obviously you can use the HTTP API instead of the SDK. You can find the code on GitHub.

Spring Boot and Micrometer with Prometheus Part 6: Securing metrics

Previously we successfully spun up our Spring Boot application with Prometheus. An endpoint in our Spring application exposes our metric data so that Prometheus is able to retrieve it.
The main question that comes to mind is how to secure this information.

Spring already provides us with its great security framework, so it will be fairly easy to use it for our application. The goal is to use basic authentication for the actuator/prometheus endpoint and also to configure Prometheus to access that information using basic authentication.

So the first step is to enable security on our app by adding the security jar.


The Spring Boot application will get secured on its own by generating a password for the default user.
However we do want to have control over the username and password, so we are going to use some environment variables.

By running the application with the credentials for the default user we have the Prometheus endpoint secured with minimal configuration.

SPRING_SECURITY_USER_NAME=test-user SPRING_SECURITY_USER_PASSWORD=test-password mvn spring-boot:run

So now that we have security set up on our app, it’s time to update our Prometheus config.

  - job_name: 'prometheus-spring'
    scrape_interval: 1m
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['my.local.machine:8080']
    basic_auth:
      username: "test-user"
      password: "test-password"
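
With basic authentication configured, each scrape carries an Authorization header built from the username and password. As a minimal sketch of how that header value is formed under the HTTP Basic scheme (`BasicAuthHeader` is a hypothetical helper, not part of Spring or Prometheus), which can be handy when checking the secured endpoint by hand with an HTTP client:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that builds an HTTP Basic Authorization header value.
class BasicAuthHeader {

    static String of(String username, String password) {
        // The scheme joins user and password with a colon and Base64-encodes the result
        String credentials = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }
}
```

Calling `BasicAuthHeader.of("test-user", "test-password")` gives the value to send in the Authorization header. Base64 is an encoding and not encryption, so the credentials are only protected in transit if the endpoint is served over TLS.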

So let’s run Prometheus again as described previously.

To sum up, after this change Prometheus will gather metrics data from our application in a secure way.

My language will overtake yours

From time to time I stumble upon articles comparing one programming language to another or articles which claim that ‘one language will rule them all’.
Most of these comparisons rely largely on popularity metrics based on searches and hits (for example the TIOBE index).

The question is not whether the data is accurate but whether comparisons like these are still relevant to the current landscape of software development.

In the past, languages were developed in order to replace other languages and make developers more comfortable. Some languages became obsolete, and some were overtaken by others.
However, as the industry continued to evolve, languages started to be associated with certain fields of our industry. For example when it comes to machine learning and deep learning, Python is the first language that comes to mind. As the fields of the industry evolved, the languages associated with them evolved too, leading to the development of tools and frameworks. As a result we now have programming languages with a huge ecosystem. The tools and the frameworks were not built in a day; it took years and a lot of effort, skill and experience with the pretty specific problems they had to tackle. As the years pass these tools mature and evolve.

Even though there are many good options out there, the major factors in adoption are not based on the language by itself but more on the tools that come with it.
Let’s take for example Java and the Java EE ecosystem. Although there is a great number of articles discussing the death of Java, Java continues to be the first choice, especially when it comes to enterprise development. There are certainly languages with better syntax and more convenient tools, but Java comes along with a huge ecosystem.
I believe that the discussion should shift from language comparison to ecosystem comparison based on the field.

Another fact that has to be considered is the boost that happens to several industries. Certain industries require pretty specific solutions and tools, thus leading to a boost in the adoption of those tools and frameworks. It is not that a language just got more popular among developers; instead the industry just got bigger.

Finally we should take universities into consideration. Computer science departments play a leading role in shaping the language popularity landscape.
For example many of the C language hits are due to university assignments. Furthermore R&D finds its way to the industry, implemented with different tools for environments much different from the original one.

All in all I believe that language comparison is no longer relevant. The industry has evolved a lot and the applications that we develop are very different from the ones we used to develop in the past. In the past we used to develop console applications and it made sense to compare syntax and extra features. Nowadays we develop large scale applications in clustered environments employing various architectures, and we need more than just a language to tackle these problems.