Avro Schema Generate and Use

If you implement streaming pipelines, chances are that you use Apache Avro.
Since Avro is a popular choice for serializing data, it is widely supported by streaming tools and vendors. Schema registries are also available to help with schema evolution.

Let’s run a basic Avro example.

It all starts with creating the schema in an avsc file. The goal is to send request metrics for an HTTP endpoint.

{
  "namespace": "com.egkatzioura.avro.model",
  "name": "RequestMetric",
  "type" : "record",
  "fields" : [
    {
      "name": "endpoint",
      "type" : ["null","string"],
      "default": null
    },
    {
      "name" : "status",
      "type" : ["null","int"],
      "default": null
    },
    {
      "name" : "error_message",
      "type" : ["null", "string"],
      "default": null
    },
    {
      "name" : "created_at",
      "type": "long",
      "logicalType" : "timestamp-millis"
    }
  ]
}

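If a field in a record is nullable, we need to say so in the schema by declaring its type as a union with null, for example ["null", "string"]; together with "default": null this lets the field be left unset. We also want to send a timestamp, so we use a logicalType. A logical type annotates an underlying primitive or complex type, in our case a long, with additional semantic meaning: timestamp-millis marks the long as a count of milliseconds since the Unix epoch. Note that the Avro specification attaches logicalType to the type itself, as in {"type": "long", "logicalType": "timestamp-millis"}. Placed next to the field's type as above, it is kept as a plain field attribute, and the generated class exposes created_at as a long, which is what the code later in this post relies on.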
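
To make the timestamp-millis semantics concrete: the annotated long counts milliseconds since the Unix epoch, so it maps directly onto java.time.Instant. The following is a minimal sketch that uses only the JDK; TimestampMillisSketch and its two helper methods are hypothetical names for illustration and are not part of Avro or of the generated code.

```java
import java.time.Instant;

public class TimestampMillisSketch {

    // timestamp-millis represents an instant as milliseconds since 1970-01-01T00:00:00Z.
    static long toTimestampMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    // The reverse direction: interpret the stored long as an instant again.
    static Instant fromTimestampMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        long createdAt = toTimestampMillis(Instant.parse("2023-01-01T00:00:00Z"));
        System.out.println(createdAt);                      // prints 1672531200000
        System.out.println(fromTimestampMillis(createdAt)); // prints 2023-01-01T00:00:00Z
    }
}
```

A value produced by System.currentTimeMillis() is already in this representation, so it can be stored in a created_at field as is.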

We will create the directory src/main/avro and place the file under the name request_metric.avsc.

Since we use Maven, generating the class files requires the Avro dependency and the Avro Maven plugin in the pom.xml.


    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.11.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

As we can see, the sourceDirectory configuration specifies where the schemas are placed within the project, and the outputDirectory configuration specifies where the generated classes will be placed.

Running mvn generate-sources will generate the class RequestMetric.

Let’s create and read an Avro file.

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;


public class Application {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(System.currentTimeMillis());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(System.currentTimeMillis());

        File file = new File("metric.avro");

        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        try(DataFileWriter<RequestMetric> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(a.getSchema(), file);
            dataFileWriter.append(a);
            dataFileWriter.append(b);
        }

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);
        // DataFileReader holds an open file handle, so close it when done.
        try (DataFileReader<RequestMetric> dataFileReader = new DataFileReader<>(file, datumReader)) {
            RequestMetric requestMetric = null;
            while (dataFileReader.hasNext()) {
                // Passing the previous instance lets Avro reuse it instead of allocating a new one.
                requestMetric = dataFileReader.next(requestMetric);
                System.out.println(requestMetric);
            }
        }

    }
}
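
An Avro container file stores the writer schema in its header, so the same kind of file can also be read without the generated class. The following is a minimal sketch of that idea using Avro's generic API, assuming the Avro dependency from the pom above is on the classpath. GenericFileSketch, writeAndReadEndpoints and the trimmed-down inline schema are hypothetical and exist only to keep the example self-contained.

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GenericFileSketch {

    // Trimmed-down copy of the RequestMetric schema (assumption: two fields are enough here).
    static final String SCHEMA_JSON = "{"
            + "\"namespace\": \"com.egkatzioura.avro.model\","
            + "\"name\": \"RequestMetric\","
            + "\"type\": \"record\","
            + "\"fields\": ["
            + "  {\"name\": \"endpoint\", \"type\": [\"null\", \"string\"], \"default\": null},"
            + "  {\"name\": \"created_at\", \"type\": \"long\"}"
            + "]}";

    static List<String> writeAndReadEndpoints(File file) throws IOException {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

        GenericRecord metric = new GenericData.Record(schema);
        metric.put("endpoint", "/a");
        metric.put("created_at", System.currentTimeMillis());

        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, file);
            writer.append(metric);
        }

        List<String> endpoints = new ArrayList<>();
        // No schema is handed to the reader: it is taken from the file header.
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                // Avro returns strings as Utf8, so convert before collecting.
                endpoints.add(String.valueOf(record.get("endpoint")));
            }
        }
        return endpoints;
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("metric", ".avro");
        System.out.println(writeAndReadEndpoints(file));
    }
}
```

The generated RequestMetric class remains the more convenient option when the schema is known at compile time; the generic variant is mainly useful for tools that must handle arbitrary Avro files.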

We wrote the Avro file and read from it. We don’t have to serialize our data into a file; we can also do so in memory.

package com.egkatzioura.avro;

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.*;


public class InMemoryExample {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(System.currentTimeMillis());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(System.currentTimeMillis());

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        datumWriter.write(a, encoder);
        datumWriter.write(b, encoder);
        encoder.flush();

        outputStream.close();
        byte[] bytes = outputStream.toByteArray();

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);

        datumReader.setSchema(a.getSchema());


        try(ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
            Decoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);

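            // The raw binary encoding carries no record count or delimiter,
            // so we keep reading until the decoder reaches the end of the stream.
            while (true) {
                try {
                    RequestMetric record = datumReader.read(null, decoder);
                    System.out.println(record);
                } catch (EOFException eof) {
                    break;
                }
            }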
        }
    }

}

That’s all for now. We specified an Avro schema, generated the model, and read and wrote Avro records.
