Avro Schema: Generate and Use

If you implement streaming pipelines, chances are that you use Apache Avro.
Since Avro is a popular choice for serializing data, it is widely supported by streaming tools and vendors. Schema registries are also available to help with schema evolution.

Let’s run a basic Avro example.

It all starts with creating the schema in an .avsc file. The goal is to send request metrics for an HTTP endpoint.

{
  "namespace": "com.egkatzioura.avro.model",
  "name": "RequestMetric",
  "type" : "record",
  "fields" : [
    {
      "name": "endpoint",
      "type" : ["null","string"],
      "default": null
    },
    {
      "name" : "status",
      "type" : ["null","int"],
      "default": null
    },
    {
      "name" : "error_message",
      "type" : ["null", "string"],
      "default": null
    },
    {
      "name" : "created_at",
      "type": {
        "type": "long",
        "logicalType" : "timestamp-millis"
      }
    }
  ]
}

If a field in a record is nullable, we need to specify so in the schema with a union like ["null", "string"]. We also want to send a timestamp, so we use a logical type. A logical type is an Avro primitive or complex type with additional attributes, in our case a long. The logicalType attribute gives that type additional semantic meaning; note that the attribute belongs to the type itself, which is why created_at declares a nested type object. With timestamp-millis the generated Java class represents created_at as a java.time.Instant.

We will create the directory src/main/avro and place the file there as request_metric.avsc.

Provided we use Maven to generate the class files, we need to include the Avro dependency and the avro-maven-plugin.


    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.11.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

As we can see, the sourceDirectory configuration specifies where the schemas are placed within the project, and the outputDirectory configuration specifies where the generated classes will be placed.

By running mvn generate-sources the class RequestMetric will be generated.
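For reference, here is a minimal sketch of constructing a record through the builder that Avro generates alongside the plain setters; the class name BuilderExample is just for illustration, and with the plugin defaults the timestamp-millis field is exposed as a java.time.Instant.

import com.egkatzioura.avro.model.RequestMetric;

import java.time.Instant;

public class BuilderExample {

    public static void main(String[] args) {
        // The generated class extends SpecificRecordBase and also offers a builder.
        RequestMetric metric = RequestMetric.newBuilder()
                .setEndpoint("/a")
                .setStatus(200)
                .setCreatedAt(Instant.now())
                .build();

        System.out.println(metric);
    }
}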

Let’s write and read an Avro file.

package com.egkatzioura.avro;

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;
import java.time.Instant;


public class Application {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(Instant.now());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(Instant.now());

        File file = new File("metric.avro");

        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        try(DataFileWriter<RequestMetric> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(a.getSchema(), file);
            dataFileWriter.append(a);
            dataFileWriter.append(b);
        }

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);
        try (DataFileReader<RequestMetric> dataFileReader = new DataFileReader<>(file, datumReader)) {
            RequestMetric requestMetric = null;
            while (dataFileReader.hasNext()) {
                requestMetric = dataFileReader.next(requestMetric);
                System.out.println(requestMetric);
            }
        }

    }
}

We wrote the Avro file and read from it. We don’t have to serialize our data into a file; we can also do so in memory.

package com.egkatzioura.avro;

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.*;
import java.time.Instant;


public class InMemoryExample {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(Instant.now());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(Instant.now());

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        datumWriter.write(a, encoder);
        datumWriter.write(b, encoder);
        encoder.flush();

        outputStream.close();
        byte[] bytes = outputStream.toByteArray();

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);

        datumReader.setSchema(a.getSchema());


        try(ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
            Decoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);

            while(true){
                try {
                    RequestMetric record = datumReader.read(null, decoder);
                    System.out.println(record);
                } catch (EOFException eof) {
                    break;
                }
            }
        }
    }

}

That’s all for now: we specified an Avro schema, generated the model, and wrote and read Avro records.

BigQuery Storage API: Get Started and Comparisons

BigQuery provides us with the Storage API for fast access using an RPC-based protocol. With this option you receive the data in a binary serialized format. The alternative ways to retrieve BigQuery data are the REST API and bulk export.

Retrieving data through the REST API is great for small result sets. For example, if the product of an aggregation has a limited number of rows, it makes sense to use the REST API, retrieve the results, and use them in an application like Grafana. However, when it comes to big result sets, retrieving the results as JSON, serializing them, and storing them adds extra overhead. Exporting in a binary format helps you avoid this overhead.

Bulk data export is a good solution for exporting big result sets; however, you are limited in where the data gets stored (Google Cloud Storage), and there are daily limits on exports.

Thus the Storage API combines the flexibility of an RPC protocol, the efficiency of downloading big result sets in a binary format, and the freedom to choose where that data is stored.

The Storage API provides two ways to stream data: Avro or Arrow.

When using the Storage API, the first step is to create a session. The format (Avro/Arrow) has to be specified. A session can have more than one stream, and the maximum number of streams can be specified.
Streams contain the data in the format specified and can be read in parallel. The session expires on its own with no need for any handling.

If the session request is successful, the response contains the schema of the data and the streams to use to download the data.
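As a rough illustration, not part of the example that follows, here is a minimal sketch of creating a read session and draining one stream using the Java client library (com.google.cloud:google-cloud-bigquerystorage). The project, dataset, table names, and the class name StorageApiSessionSketch are placeholders.

import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class StorageApiSessionSketch {

    public static void main(String[] args) throws Exception {
        // Placeholders: replace with your own project, dataset and table.
        String projectId = "my-gcp-project";
        String table = "projects/my-gcp-project/datasets/my_dataset/tables/my_table";

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            // The format (Avro here, Arrow is the alternative) is part of the session.
            ReadSession sessionToCreate = ReadSession.newBuilder()
                    .setTable(table)
                    .setDataFormat(DataFormat.AVRO)
                    .build();

            CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder()
                    .setParent("projects/" + projectId)
                    .setReadSession(sessionToCreate)
                    .setMaxStreamCount(4)
                    .build();

            ReadSession session = client.createReadSession(request);

            // The response carries the schema and the streams; streams can be read in parallel.
            System.out.println(session.getAvroSchema().getSchema());

            ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder()
                    .setReadStream(session.getStreams(0).getName())
                    .build();

            for (ReadRowsResponse response : client.readRowsCallable().call(readRowsRequest)) {
                // Each response carries a block of rows serialized in the requested format.
                System.out.println(response.getRowCount() + " rows received");
            }
        }
    }
}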

For the following example we assume that the table we read data from has two columns: col1 is a string and col2 is a number. An Arrow example of this schema can be found here.

In order to test the Storage API you need an account on GCP with the BigQuery Storage API enabled and a dataset created.

Let’s continue to the Arrow example.