In a previous blog we set up a Debezium server reading events from a PostgreSQL database. We then streamed those changes to a Redis instance through a Redis stream.
We might get the impression that in order to run Debezium we need to have two extra components running in our infrastructure:
- A standalone Debezium server instance
- A software component with streaming capabilities and various integrations, such as Redis or Kafka
This is not always the case, since Debezium can run in embedded mode. In embedded mode you use Debezium to read directly from a database’s transaction log, and how you handle the retrieved entries is up to you. The process reading the entries from the transaction log can run inside any Java application, so there is no need for a standalone deployment.
Apart from reducing the number of components, the other benefit is that we can transform the entries as we read them from the database and take action inside our application. Sometimes we might only need a subset of the capabilities offered.
Let’s use the same PostgreSQL configuration we used previously:
```
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3
```
We shall also create an initialization script for the table we want to focus on:
```bash
#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
    id SERIAL PRIMARY KEY,
    firstname TEXT NOT NULL,
    lastname TEXT NOT NULL,
    email TEXT NOT NULL,
    age INT NOT NULL,
    salary real,
    unique(email)
  );
EOSQL
```
Our Docker Compose file will look like this:
```yaml
version: '3.1'

services:
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
```
The configuration files we created are mounted into the PostgreSQL Docker container. Docker Compose V2 is out there with many good features; you can find more about it in the book I authored:
A Developer’s Essential Guide to Docker Compose.
Provided we run `docker compose up`, a PostgreSQL server with a schema and a table will be up and running. That server will also have logical decoding enabled, so Debezium will be able to track changes on that table through the transaction log.
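To double-check that logical decoding is actually on before wiring up Debezium, we can query the running container. This is a small verification step, assuming the Compose project produced a container named `debezium-embedded-postgres-1` (as used later in this post); adjust the name to your own setup.

```shell
# Ask the running PostgreSQL instance for its WAL level.
# It should report "logical", matching our postgresql.conf.
docker exec -it debezium-embedded-postgres-1 psql -U postgres -c "SHOW wal_level;"
```

If the command reports `replica` instead, the mounted `postgresql.conf` was not picked up and Debezium will fail to create a replication slot.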
We have everything needed to proceed on building our application.
First let’s add the dependencies needed:
```xml
<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <version.debezium>2.3.1.Final</version.debezium>
    <logback-core.version>1.4.12</logback-core.version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-storage-jdbc</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback-core.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback-core.version}</version>
    </dependency>
</dependencies>
```
We also need to create the Debezium embedded properties:
```properties
name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=127.0.0.1
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
database.server.name=embedded-debezium
plugin.name=pgoutput
database.server.id=1234
topic.prefix=embedded-debezium
schema.include.list=test_schema
table.include.list=test_schema.employee
```
Apart from establishing the connection towards the PostgreSQL database, we also decided to store the offset in a file. Debezium uses the offset to keep track of how far event processing has progressed.
On each change that happens on the table `test_schema.employee` we shall receive an event, and our codebase should handle it. To handle the events we need to create a `DebeziumEngine.ChangeConsumer`; the `ChangeConsumer` will consume the events emitted.
```java
package com.egkatzioura;

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer)
            throws InterruptedException {
        for (RecordChangeEvent<SourceRecord> record : records) {
            System.out.println(record.record().toString());
            // Mark each record as processed so its offset can be flushed.
            committer.markProcessed(record);
        }
        // Signal that the whole batch has been handled.
        committer.markBatchFinished();
    }
}
```
Every incoming event will be printed on the console.
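Printing the raw record is enough for a demo, but usually we want to act on individual fields. Below is a sketch of how one might pull the operation type and the new row image out of an event instead of printing it whole. It assumes the default Debezium change-event envelope, where the record value is a Kafka Connect `Struct` carrying an `op` field and an `after` row image; the class and method names here are illustrative, not part of the original post.

```java
package com.egkatzioura;

import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class EmployeeEventHandler {

    public void handle(RecordChangeEvent<SourceRecord> event) {
        SourceRecord record = event.record();
        // Tombstones and schema-change events may not carry a Struct value.
        if (record.value() instanceof Struct value) {
            // op is c=create, u=update, d=delete, r=snapshot read.
            String operation = value.getString("op");
            // The "after" image is null for delete events.
            Struct after = value.getStruct("after");
            if (after != null) {
                System.out.println(operation + " -> employee " + after.getInt32("id")
                        + " (" + after.getString("email") + ")");
            }
        }
    }
}
```

A handler like this could be called from `handleBatch` for each record, keeping the consumer itself thin.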
Now we can add our main class, where we set up the engine.
```java
package com.egkatzioura;

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Application {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();

        try (final InputStream stream = Application.class.getClassLoader()
                .getResourceAsStream("embedded_debezium.properties")) {
            properties.load(stream);
        }
        properties.put("offset.storage.file.filename", new File("offset.dat").getAbsolutePath());

        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();

        engine.run();
    }
}
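Note that `engine.run()` blocks the calling thread until the engine stops. For anything beyond a demo, a common pattern is to run the engine on an executor and close it on shutdown; `DebeziumEngine` implements both `Runnable` and `Closeable`, so a sketch of that lifecycle, replacing the direct `run()` call above, could look like this:

```java
// Run the engine off the main thread so the application can keep working.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// On JVM shutdown, stop the engine cleanly so in-flight offsets are flushed.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        engine.close();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
}));
```

This keeps the main thread free and gives the connector a chance to release its replication slot gracefully when the process exits.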
Provided our application is running, as well as the PostgreSQL database we configured previously, we can start inserting data:
```
docker exec -it debezium-embedded-postgres-1 psql postgres postgres
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.

postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
```
We can then see the change on the console:
```
SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true}}
ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,email=john1@doe.com,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
```
We did it. We managed to run Debezium through a Java application without needing a standalone Debezium server or a separate streaming component. You can find the code on GitHub.