Debezium Server with PostgreSQL and Redis Stream

Debezium is a great tool for capturing the row level changes that happen on a Database and stream those changes to a broker of our choice.

Since this functionality stays in the boundaries of a Database, it helps on keeping our applications simple. For example there in no need for an application to emit events on any database interactions. Debezium will monitor the row changes and will emit the events. Based on the broker solution used with Debezium a consumer can subscribe to the broker thus receive the changes.

PostgreSQL being a popular SQL database, it is supported by Debezium.

Our goal would be to listen to PostgreSQL changes and stream them to a Redis stream through a Debezium Server. It is common to use Debizum with Kafka, in case where Kafka is not present in a team’s Tech stack we can use other brokers.

In our case we would keep things lightweight by using Redis Streams.

Redis will be setup without any extra configurations.

In order to use PostgreSQL with Debezium it is essentials to alter the configuration on postgreSQL.

The configuration we shall use on postgreSQL will be the following

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3

As we can see we use the logical_decoding from PostgreSQL.
From the documentation:

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

We will also create a namespace and a table for PostgreSQL. The namespace and the table will be the ones to listen for changes.

#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );
EOSQL

This is the table we used in a previous PostgreSQL example.

Debezium will have to be able to interact with the PostgreSQL server as well as the the redis server.
The configuration should be the following.

debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput

By examining the configuration we can see that we have the necessary information for Debezium to communicate to the PostgreSQL database, also we specify the schema that we created previously. Therefore only changes from that schema will be streamed. We can also make things more restrictive for example whitelisting tables.

Since this demo will involve three different software Components docker compose will come in handy.

version: '3.1'

services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis

By using Compose we were able to spin up three different software components on the same network. This helps the components to interact with each other by using the dns names of the services as specified on Compose. Also the configuration files we created previously are mounted to the Docker containers. Docker Compose V2 is out there with many good features, you can find more about it on the book I authored
A Developer’s Essential Guide to Docker Compose
.

In order to get the stack running we shall execute the following command

$ docker compose up

Since it is up and running, we can now start listening for events.

We shall login to Redis and start listen for any possible database updates.

$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $

This will make it possible to block until we receive one event from the stream.
If we examine the stream name we should see the pattern of {server-name}.{schema}.{table}. This would allow consumers to subscribe only to the changes of interest.

Onwards we will make an entry.

$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q

If we check the redis session we should see that we received an event from the Redis stream

127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1) "1663796657336-0"
         2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379> 

How cool is that? We can now start streaming our databases changes to the broker of our choice.

You can find the source code on GitHub.