Avro Schema: Generate and Use

If you implement streaming pipelines, chances are that you use Apache Avro.
Since Avro is a popular choice for serializing data, it is widely supported by streaming tools and vendors. Schema registries are also available to help with schema evolution.

Let’s run a basic Avro example.

It all starts with creating the schema in an .avsc file. The goal is to send request metrics for an HTTP endpoint.

{
  "namespace": "com.egkatzioura.avro.model",
  "name": "RequestMetric",
  "type" : "record",
  "fields" : [
    {
      "name": "endpoint",
      "type" : ["null","string"],
      "default": null

    },
    {
      "name" : "status",
      "type" : ["null","int"],
      "default": null
    },
    {
      "name" : "error_message",
      "type" : ["null", "string"],
      "default": null
    },
    {
      "name" : "created_at",
      "type": "long",
      "logicalType" : "timestamp-millis"
    }
  ]
}

If a field in a record is nullable we need to specify so in the schema with a union like ["null", "string"]. We also want to send a timestamp, so we use a logicalType. A logical type annotates an underlying complex or primitive type, in our case a long; the logicalType attribute adds semantic meaning to that type.
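
As a side note, the same two building blocks can be expressed with Avro's Java API. The following is a minimal sketch, not part of the original example and with a made-up class name, using SchemaBuilder and LogicalTypes:

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SchemaSketch {

    public static void main(String[] args) {
        // the equivalent of ["null", "string"]: a union of null and string
        Schema nullableString = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();

        // a long annotated with the timestamp-millis logical type
        Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        System.out.println(nullableString);
        System.out.println(timestampMillis);
    }
}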

We will create the directory src/main/avro and place the file under the name request_metric.avsc.

Provided we use Maven to generate the classes, we need to include certain dependencies and plugins.


    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.11.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

As we can see, the sourceDirectory configuration specifies where the schemas are placed within the project, and the outputDirectory configuration specifies where the generated classes will be placed.

By running mvn generate-sources the class RequestMetric will be generated.
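
Besides plain setters, Avro-generated SpecificRecord classes also expose a builder. A small usage sketch of the generated class, assuming the schema above:

RequestMetric metric = RequestMetric.newBuilder()
        .setEndpoint("/a")
        .setStatus(200)
        .setCreatedAt(System.currentTimeMillis())
        .build();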

Let’s create and read an Avro file.

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;


public class Application {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(System.currentTimeMillis());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(System.currentTimeMillis());

        File file = new File("metric.avro");

        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        try(DataFileWriter<RequestMetric> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(a.getSchema(), file);
            dataFileWriter.append(a);
            dataFileWriter.append(b);
        }

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);
        try (DataFileReader<RequestMetric> dataFileReader = new DataFileReader<>(file, datumReader)) {
            RequestMetric requestMetric = null;
            while (dataFileReader.hasNext()) {
                requestMetric = dataFileReader.next(requestMetric);
                System.out.println(requestMetric);
            }
        }

    }
}

We wrote the Avro file and also read from it. We don’t have to serialize our data to a file; we can also do so in memory.

package com.egkatzioura.avro;

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.*;


public class InMemoryExample {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(System.currentTimeMillis());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(System.currentTimeMillis());

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        datumWriter.write(a, encoder);
        datumWriter.write(b, encoder);
        encoder.flush();

        outputStream.close();
        byte[] bytes = outputStream.toByteArray();

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);

        datumReader.setSchema(a.getSchema());


        try(ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
            Decoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);

            while(true){
                try {
                    RequestMetric record = datumReader.read(null, decoder);
                    System.out.println(record);
                } catch (EOFException eof) {
                    break;
                }
            }
        }
    }

}

That’s all for now: we specified an Avro schema, generated the model, and wrote and read Avro records.

BigQuery Storage API: Avro

Previously we had an introduction to the BigQuery Storage API and proceeded to read data using the Arrow format.
In this tutorial we shall read data using the Avro format.

What applied to the previous tutorial applies here too.

We shall create a BigQuery Storage Client, create a ReadSession using the Avro format and iterate the data on each stream.

Let’s get started by importing the dependencies; we also import the Avro library needed.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>20.5.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquerystorage</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

Our next step is to create an Avro data reader for our rows, which have the schema col1:string, col2:int. In our case we shall just print the data through System.out.

package com.gkatzioura.bigquery.storage.api.avro;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.protobuf.ByteString;

public class AvroReader {

    private final GenericDatumReader<GenericRecord> datumReader;

    public AvroReader(AvroSchema avroSchema) {
        Schema schema = new Schema.Parser().parse(avroSchema.getSchema());
        this.datumReader = new GenericDatumReader<>(schema);
    }
    }

    public void processRows(ByteString avroRows) throws IOException {
        try(InputStream inputStream = new ByteArrayInputStream(avroRows.toByteArray())) {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

            while (!decoder.isEnd()) {
                GenericRecord item = datumReader.read(null, decoder);

                System.out.println(item.get("col1")+","+item.get("col2"));
            }
        }
    }

}

Then on to our main class, which contains the BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.avro;

import org.apache.arrow.util.Preconditions;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class AvroMain {

    public static void main(String[] args) throws Exception {

        String projectId = System.getenv("PROJECT_ID");

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            String parent = String.format("projects/%s", projectId);

            String srcTable =
                    String.format(
                            "projects/%s/datasets/%s/tables/%s",
                            projectId, System.getenv("DATASET"), System.getenv("TABLE"));

            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.AVRO);


            CreateReadSessionRequest.Builder builder =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent(parent)
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1);
            ReadSession session = client.createReadSession(builder.build());

            Preconditions.checkState(session.getStreamsCount() > 0);

            String streamName = session.getStreams(0).getName();

            ReadRowsRequest readRowsRequest =
                    ReadRowsRequest.newBuilder().setReadStream(streamName).build();

            AvroReader avroReader = new AvroReader(session.getAvroSchema());

            ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);

            for (ReadRowsResponse response : stream) {
                avroReader.processRows(response.getAvroRows().getSerializedBinaryRows());
            }
        }
    }

}

A BigQuery client is created. Then we create a session request with a maximum number of streams, specifying that the format to be used will be Avro.
Once we get a response, it will contain the initiated session, the Avro schema and the streams that we shall use to retrieve the data.
For each stream there has to be a ReadRowsRequest in order to fetch the data.
Then we pass the data to our Avro decoder. A sketch for consuming more than one stream follows below.
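
If setMaxStreamCount allows more than one stream, each stream in the session gets its own ReadRowsRequest. The following is a sketch, not code from the original example, of how the reading part could look if every stream were consumed sequentially with a single AvroReader; ReadStream comes from the same com.google.cloud.bigquery.storage.v1 package, and reading the streams concurrently would be a further improvement.

            AvroReader avroReader = new AvroReader(session.getAvroSchema());

            for (ReadStream readStream : session.getStreamsList()) {
                ReadRowsRequest request =
                        ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();

                for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
                    avroReader.processRows(response.getAvroRows().getSerializedBinaryRows());
                }
            }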

That’s it! We just read data from the BigQuery Storage API using Avro and Arrow.

BigQuery Storage API: Arrow

Previously we had an introduction to the BigQuery Storage API. As explained, the Storage API of BigQuery supports two formats. For this tutorial we will choose the Arrow format.

First let’s import the dependencies. The BigQuery Storage API binary does not come with a library to parse Arrow; the consumer receives the binaries in the Arrow format, and it’s up to the consumer how to consume them and which libraries to use.


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>20.5.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquerystorage</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>4.0.0</version>
        </dependency>
    </dependencies>

As mentioned before, when we use Arrow we need to import a library for the memory allocation Arrow needs.

We shall first create a plain Arrow reader.
This reader will be BigQuery agnostic; this is one of the benefits of using a platform- and language-independent format.

An Arrow binary will be submitted to the reader along with the schema, and the rows will be printed in TSV format.

package com.gkatzioura.bigquery.storage.api.arrow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;

public class ArrowReader implements AutoCloseable {

    private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public ArrowReader(ArrowSchema arrowSchema) throws IOException {
        Schema schema =
                MessageSerializer.deserializeSchema(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        arrowSchema.getSerializedSchema().toByteArray())));

        Preconditions.checkNotNull(schema);
        List<FieldVector> vectors = new ArrayList<>();
        for (Field field : schema.getFields()) {
            vectors.add(field.createVector(allocator));
        }

        root = new VectorSchemaRoot(vectors);
        loader = new VectorLoader(root);
    }

    public void processRows(ArrowRecordBatch batch) throws IOException {
        org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
                MessageSerializer.deserializeRecordBatch(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        batch.getSerializedRecordBatch().toByteArray())),
                        allocator);

        loader.load(deserializedBatch);
        deserializedBatch.close();
        System.out.println(root.contentToTSVString());
        root.clear();
    }

    @Override
    public void close() throws Exception {
        root.close();
        allocator.close();
    }

}

The constructor has the schema injected, and then the schema root is created.
Pay attention that we receive the schema in binary form; it’s up to us and our library to read it.


        Schema schema =
                MessageSerializer.deserializeSchema(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        arrowSchema.getSerializedSchema().toByteArray())));

You can find more on reading Arrow data in this tutorial.

Then on to our main class, which contains the BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.arrow;

import org.apache.arrow.util.Preconditions;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class ArrowMain {

    public static void main(String[] args) throws Exception {

        String projectId = System.getenv("PROJECT_ID");

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            String parent = String.format("projects/%s", projectId);

            String srcTable =
                    String.format(
                            "projects/%s/datasets/%s/tables/%s",
                            projectId, System.getenv("DATASET"), System.getenv("TABLE"));

            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.ARROW);

            CreateReadSessionRequest.Builder builder =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent(parent)
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1);
            ReadSession session = client.createReadSession(builder.build());

            try (ArrowReader reader = new ArrowReader(session.getArrowSchema())) {
                Preconditions.checkState(session.getStreamsCount() > 0);

                String streamName = session.getStreams(0).getName();

                ReadRowsRequest readRowsRequest =
                        ReadRowsRequest.newBuilder().setReadStream(streamName).build();

                ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
                for (ReadRowsResponse response : stream) {
                    Preconditions.checkState(response.hasArrowRecordBatch());
                    reader.processRows(response.getArrowRecordBatch());
                }
            }
        }
    }

}

A BigQuery client is created. Then we create a session request with a maximum number of streams, specifying that the format to be used will be Arrow.
Once we get a response, it will contain the initiated session, the Arrow schema and the streams that we shall use to retrieve the data.
For each stream there has to be a ReadRowsRequest in order to fetch the data. A sketch of narrowing the session down with read options is shown below.
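
The session can also be narrowed before it is created. As a sketch, not part of the example above, TableReadOptions let us select columns and push down a row filter; the restriction expression here is just an illustrative assumption.

            ReadSession.TableReadOptions options =
                    ReadSession.TableReadOptions.newBuilder()
                                                .addSelectedFields("col1")
                                                .addSelectedFields("col2")
                                                .setRowRestriction("col2 > 0")
                                                .build();

            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.ARROW)
                               .setReadOptions(options);
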
Our next example will focus on reading data in Avro format.

BigQuery Storage API: Get Started and Comparisons

BigQuery provides us with the Storage API for fast access using an RPC-based protocol. With this option you can receive the data in a binary serialized format. The alternative ways to retrieve BigQuery data are through the REST API and a bulk export.

Retrieving data through the REST API is great for small result sets. For example, if the product of an aggregation has a limited number of rows, it makes sense to use the REST API, retrieve the results and use them in an application like Grafana. However, when it comes to big result sets, retrieving the results in JSON and then serializing and storing them adds extra overhead. Exporting in binary formats helps you avoid this overhead.

Bulk data export is a good solution for exporting big result sets; however, you are limited in where the data gets stored (Google Cloud Storage), and there are daily limits on exports.

Thus the Storage API combines the flexibility of an RPC protocol, the efficiency of downloading big result sets in a binary format and the flexibility of choosing where those data shall be stored.

The storage API provides two ways to stream Data, either through Avro or through Arrow.

When using the Storage API, the first step is to create a session, specifying the format (Avro/Arrow). A session can have more than one stream, and a maximum number of streams can be specified.
Streams contain the data in the specified format and can be read in parallel. The session expires on its own with no need for handling.

If a session request is successful, the response contains the schema of the data and the streams to use to download the data; a condensed sketch of the session bootstrap follows below.
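
Condensed from the fuller Avro and Arrow examples in this series, the session bootstrap looks roughly like the sketch below; projectId, dataset and tableName are placeholders for your own identifiers.

// projectId, dataset and tableName are placeholders
try (BigQueryReadClient client = BigQueryReadClient.create()) {
    String parent = String.format("projects/%s", projectId);
    String table = String.format("projects/%s/datasets/%s/tables/%s", projectId, dataset, tableName);

    ReadSession session = client.createReadSession(
            CreateReadSessionRequest.newBuilder()
                                    .setParent(parent)
                                    .setReadSession(ReadSession.newBuilder()
                                                               .setTable(table)
                                                               .setDataFormat(DataFormat.AVRO)) // or DataFormat.ARROW
                                    .setMaxStreamCount(1)
                                    .build());

    // the schema (getAvroSchema()/getArrowSchema()) and getStreamsList() are then used
    // to decode and fetch the rows, as the Avro and Arrow examples show
}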

For the following example we assume that the table we read data from has two columns: col1 is a string and col2 is a number. An Arrow example of this schema can be found here.

In order to test the Storage API you need an account on GCP with the BigQuery Storage API enabled and a dataset created.

Let’s continue to the Arrow example.

Apache Arrow on the JVM: Streaming Reads

Previously we wrote Arrow data to a stream. Now we shall read that data back from a stream.

Just like in the previous blog, we shall implement the Closeable interface. This is needed to close the RootAllocator and free up memory.

We shall pass a ReadableByteChannel and read the stream back into objects.

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class DefaultEntriesReader implements Closeable {

    private final RootAllocator rootAllocator;

    public DefaultEntriesReader() {
        rootAllocator = new RootAllocator(Integer.MAX_VALUE);
    }

    public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException {
        List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>();

        try(ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) {
            var root = arrowStreamReader.getVectorSchemaRoot();

            var childVector1 = (VarCharVector)root.getVector(0);
            var childVector2 = (IntVector)root.getVector(1);

            while (arrowStreamReader.loadNextBatch()) {

                int batchSize = root.getRowCount();

                for (int i = 0; i < batchSize; i++) {
                    var strData = new String(childVector1.get(i), StandardCharsets.UTF_8);
                    var intData = childVector2.get(i);

                    DefaultArrowEntry defaultArrowEntry = DefaultArrowEntry.builder().col1(strData).col2(intData).build();
                    defaultArrowEntries.add(defaultArrowEntry);
                }
            }

            return defaultArrowEntries;
        }
    }

    @Override
    public void close() throws IOException {
        rootAllocator.close();
    }
}

Let’s wrap it up with a write and a read.

package com.gkatzioura.arrow;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ArrowMain {

    public static void main(String[] args) throws IOException {
        var originalEntries = IntStream.rangeClosed(0, 11)
                             .boxed()
                             .map(i -> new DefaultArrowEntry("data-"+i, i)).collect(Collectors.toList());

        var outputStream = new ByteArrayOutputStream();

        try(var arrowWriter = new DefaultEntriesWriter()) {
            arrowWriter.write(originalEntries, 10, Channels.newChannel(outputStream));
        }

        byte[] introBytes = outputStream.toByteArray();

        var inputStream = new ByteArrayInputStream(introBytes);

        try(var arrowReader = new DefaultEntriesReader()) {
            var entries = arrowReader.readBytes(Channels.newChannel(inputStream));
            for (DefaultArrowEntry entry : entries) {
                System.out.println("Read "+entry.getCol1()+" "+entry.getCol2());
            }
        }

    }

}

That’s it. To summarise, we created Arrow schemas, wrote data to a stream and read data back from a stream!

Apache Arrow on the JVM: Streaming Writes

Previously we created some schemas on Arrow. In this blog we will have a look at writing through the streaming API.

Based on the previous post’s schema, we shall create a DTO for our data.

package com.gkatzioura.arrow;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DefaultArrowEntry {

    private String col1;
    private Integer col2;

}

Our goal would be to transform those Java objects into a Stream of Arrow bytes.

The allocator creates DirectByteBuffers.
Those buffers are off-heap. You do need to free up the memory used, but for the library user this is done by executing the close() operation on the allocator. In our case our class implements the Closeable interface, which takes care of closing the allocator.

By using the streaming API, the data will be streamed, in the Arrow format, to the submitted channel.

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;

import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;

public class DefaultEntriesWriter implements Closeable {

    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }

        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
        try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

            boolean exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int batchCounter = 0;

            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

                batchCounter++;

                if(batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }

            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }

            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

}

To demonstrate Arrow’s support for batches, a simple batching algorithm has been implemented within the function. For our example, just take into account that data will be written in batches.

Let’s dive into the function.

The vector allocator discussed previously is created

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

Then, when writing to a stream, an Arrow stream writer is instantiated and started

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out);
writer.start();

We shall use the vectors in order to populate them with the data. We also reset them, but let the pre-allocated buffers remain

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

We use the setSafe operation when writing data. This way, if more buffer space needs to be allocated, it will be. For this example it is done on every write, but it can be avoided when the operations and the buffer size needed are taken into account.

                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

Then we write the batch to the stream.


                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();

Last but not least we close the writer.

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

The next blog will focus on reading Arrow Data from a stream.

Apache Arrow on the JVM: Get Started and Schemas

Arrow is a memory format for flat and hierarchical data. It is a popular format used by various big data tools, among them BigQuery. One of the benefits that Arrow brings is that the data has the same byte representation across the supported languages. So, apart from the benefits of a columnar memory format, there are also the benefits of zero-copy reads without serialization overhead.

Apache Arrow defines a language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware like CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead.

Let’s import the libraries

        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>${arrow.version}</version>
        </dependency>

Before starting, it is essential to understand that Arrow read/write operations use byte buffers; reading and writing is a continuous exchange of bytes. To make this efficient, Arrow comes with a buffer allocator, which can have a certain size or expand automatically.
The libraries backing the allocation management are arrow-memory-netty and arrow-memory-unsafe. We shall use the Netty one.
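
As a small illustration of the allocator, a sketch using the public Arrow API rather than code from the original post: a RootAllocator can be capped at a maximum number of bytes, vectors borrow their off-heap buffers from it, and both must be closed to release the memory.

package com.gkatzioura.arrow;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;

public class AllocatorExample {

    public static void main(String[] args) {
        // allocator capped at 1 MiB; allocations beyond the limit will fail
        try (BufferAllocator allocator = new RootAllocator(1024 * 1024);
             IntVector vector = new IntVector("col2", allocator)) {
            vector.allocateNew(3);
            vector.setSafe(0, 1);
            vector.setSafe(1, 2);
            vector.setSafe(2, 3);
            vector.setValueCount(3);
            System.out.println(vector.get(0) + "," + vector.get(1) + "," + vector.get(2));
        }
    }
}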

Storing data in Arrow requires a schema. Schemas can be defined programmatically

package com.gkatzioura.arrow;

import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

    public static Schema DEFAULT_SCHEMA = createDefault();

    public static Schema createDefault() {
        var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
        var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

        return new Schema(List.of(strField, intField));
    }

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

    public static Schema fromJson(String jsonString) {
        try {
            return Schema.fromJSON(jsonString);
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

}

Schemas also have a parseable JSON representation.

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : {
      "name" : "utf8"
    },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : {
      "name" : "int",
      "bitWidth" : 32,
      "isSigned" : true
    },
    "children" : [ ]
  } ]
}
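
To tie the two representations together, here is a small round-trip sketch (a hypothetical class built on the SchemaFactory above): toJson() produces the JSON document shown, and fromJson() parses it back.

package com.gkatzioura.arrow;

import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaJsonRoundTrip {

    public static void main(String[] args) {
        // serialize the programmatically defined schema to its JSON representation
        String json = SchemaFactory.DEFAULT_SCHEMA.toJson();
        System.out.println(json);

        // parse the JSON back into a Schema and compare it with the original
        Schema parsed = SchemaFactory.fromJson(json);
        System.out.println(parsed.equals(SchemaFactory.DEFAULT_SCHEMA));
    }
}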

Plus, just like Avro, you can have complex schemas and embedded values in a field.

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

In the next blog, we shall use the streaming API for Arrow.

A guide to the InfluxDBMapper and QueryBuilder for Java Part: 1

With the release of the latest influxdb-java driver version came the InfluxDBMapper.

To get started we need to spin up an InfluxDB instance, and Docker is the easiest way to do so. We just follow the steps described here.

Now we have a database with some data and we are ready to execute our queries.

We have the measurement h2o_feet

> SELECT * FROM "h2o_feet"

name: h2o_feet
--------------
time                   level description      location       water_level
2015-08-18T00:00:00Z   below 3 feet           santa_monica   2.064
2015-08-18T00:00:00Z   between 6 and 9 feet   coyote_creek   8.12
[...]
2015-09-18T21:36:00Z   between 3 and 6 feet   santa_monica   5.066
2015-09-18T21:42:00Z   between 3 and 6 feet   santa_monica   4.938

So we shall create a model for that.

package com.gkatzioura.mapper.showcase;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

@Measurement(name = "h2o_feet", database = "NOAA_water_database", timeUnit = TimeUnit.SECONDS)
public class H2OFeetMeasurement {

    @Column(name = "time")
    private Instant time;

    @Column(name = "level description")
    private String levelDescription;

    @Column(name = "location")
    private String location;

    @Column(name = "water_level")
    private Double waterLevel;

    public Instant getTime() {
        return time;
    }

    public void setTime(Instant time) {
        this.time = time;
    }

    public String getLevelDescription() {
        return levelDescription;
    }

    public void setLevelDescription(String levelDescription) {
        this.levelDescription = levelDescription;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public Double getWaterLevel() {
        return waterLevel;
    }

    public void setWaterLevel(Double waterLevel) {
        this.waterLevel = waterLevel;
    }
}

And then we shall fetch all the entries of the h2o_feet measurement.

package com.gkatzioura.mapper.showcase;

import java.util.List;
import java.util.logging.Logger;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.impl.InfluxDBMapper;

public class InfluxDBMapperShowcase {

    private static final Logger LOGGER = Logger.getLogger(InfluxDBMapperShowcase.class.getName());

    public static void main(String[] args) {

        InfluxDB influxDB = InfluxDBFactory.connect("http://localhost:8086", "root", "root");

        InfluxDBMapper influxDBMapper = new InfluxDBMapper(influxDB);
        List<H2OFeetMeasurement> h2OFeetMeasurements = influxDBMapper.query(H2OFeetMeasurement.class);

    }
}

After successfully fetching the data, we will continue with persisting data.


        H2OFeetMeasurement h2OFeetMeasurement = new H2OFeetMeasurement();
        h2OFeetMeasurement.setTime(Instant.now());
        h2OFeetMeasurement.setLevelDescription("Just a test");
        h2OFeetMeasurement.setLocation("London");
        h2OFeetMeasurement.setWaterLevel(1.4d);

        influxDBMapper.save(h2OFeetMeasurement);

        List<H2OFeetMeasurement> measurements = influxDBMapper.query(H2OFeetMeasurement.class);

        H2OFeetMeasurement h2OFeetMeasurement1 = measurements.get(measurements.size()-1);
        assert h2OFeetMeasurement1.getLevelDescription().equals("Just a test");

Apparently, fetching all the measurements to get the last entry is not the most efficient thing to do. In the upcoming tutorials we are going to see how to use the InfluxDBMapper with advanced InfluxDB queries.

Run WordCount with Scala and Spark on HDInsight

Previously we tried to solve the word count problem with a Scala and Spark approach.
The next step is to deploy our solution to HDInsight using Spark, HDFS and Scala.

We shall provision a Spark cluster.

Since we are going to use HDInsight, we can utilize HDFS and therefore use Azure Storage.

Then we choose our instance types.

And we are ready to create the Spark cluster.

Our data shall be uploaded to the HDFS file system.
To do so we will upload our text files to the Azure storage account, which is integrated with HDFS.

For more information on managing a storage account with the Azure CLI, check the official guide. Any text file will work.

azure storage blob upload mytextfile.txt sparkclusterscala  example/data/mytextfile.txt

Since we use HDFS, we shall make some changes to the original script.

val text = sc.textFile("wasb:///example/data/mytextfile.txt")
val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.collect

Then we can upload our Scala script to the head node using ssh.

scp WordCountscala.scala demon@{your cluster}-ssh.azurehdinsight.net:/home/demo/WordCountscala.scala

Again in order to run the script, things are pretty straightforward.

spark-shell -i WordCountscala.scala

And once the task is done, we are presented with the Spark prompt. Plus, we can now save our results to the HDFS file system.

scala> counts.saveAsTextFile("/wordcount_results")

And do a quick check.

hdfs dfs -ls /wordcount_results/
hdfs dfs -text /wordcount_results/part-00000

WordCount with Spark and Scala

Apache Spark has taken over the big data world. Spark is implemented in Scala and is well known for its performance.

In previous blogs we approached the word count problem by using Scala with Hadoop and Scala with Storm.
In this blog we will utilize Spark for the word count problem.

Submitting Spark jobs implemented with Scala is pretty easy and convenient. All we need is to submit our script file as input to the spark-shell command.

First we have to download and set up a Spark version locally.

Then we shall download a text file for testing. In my case the script from MGS2 did the work.

Now on to the WordCount script. For local testing we will use a file from our file system.

val text = sc.textFile("mytextfile.txt")
val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.collect

The next step is to run the script.

spark-shell -i WordCountscala.scala

Once finished, a Spark command prompt will appear and we are free to do some experiments with the word count results.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala> res0.length
res1: Int = 20159

Thus we detected 20159 different words.

Our next step is to run our job on a Spark cluster on HDInsight.