Apache Arrow on the JVM: Streaming Writes

Previously we created some schemas with Arrow. In this post we will look at writing data through the streaming API.

Based on the previous post's schema, we shall create a DTO that matches its two columns.

package com.gkatzioura.arrow;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DefaultArrowEntry {

    private String col1;
    private Integer col2;

}

Our goal is to transform those Java objects into a stream of Arrow bytes.

The allocator creates DirectByteBuffers.
Those buffers are off-heap, so the memory they use has to be freed explicitly. For the library user this is done by calling close() on the allocator. In our case the class implements the Closeable interface, and its close() method closes the allocator.
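
To make the off-heap point concrete, here is a minimal JDK-only sketch. It does not use Arrow's allocator; it relies on the plain ByteBuffer.allocateDirect call, and the class name OffHeapHolder is hypothetical. It only illustrates the pattern of a Closeable class that owns a direct buffer.

```java
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical holder, standing in for a class that owns off-heap memory.
final class OffHeapHolder implements Closeable {

    private ByteBuffer buffer;

    OffHeapHolder(int capacityBytes) {
        // allocateDirect reserves memory outside the Java heap.
        buffer = ByteBuffer.allocateDirect(capacityBytes);
    }

    boolean isOffHeap() {
        return buffer != null && buffer.isDirect();
    }

    // Appends the UTF-8 bytes of the value and returns the buffer position.
    int write(String value) {
        buffer.put(value.getBytes(StandardCharsets.UTF_8));
        return buffer.position();
    }

    @Override
    public void close() {
        // A plain JDK direct buffer is reclaimed once it becomes unreachable.
        // Here we only drop the reference to mark the holder as closed.
        buffer = null;
    }

    public static void main(String[] args) {
        try (OffHeapHolder holder = new OffHeapHolder(64)) {
            System.out.println("off-heap: " + holder.isOffHeap());
            System.out.println("position after write: " + holder.write("arrow"));
        }
    }
}
```

The try-with-resources block guarantees that close() runs even when a write fails, which is the same reason the writer class in this post implements Closeable.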

By using the stream API, the data is streamed in the Arrow format to the WritableByteChannel that is passed in.

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;

import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;

public class DefaultEntriesWriter implements Closeable {

    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }

        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
        try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

            // Number of rows placed in the batch that is currently being filled.
            int batchCounter = 0;

            for (int i = 0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

                batchCounter++;

                if (batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }

            // Flush the trailing partial batch. Checking the counter instead of
            // size % batchSize avoids a division by zero when the list is empty.
            if (batchCounter > 0) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }

            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

}

To demonstrate Arrow's support for batches, a simple batching algorithm has been implemented within the function. For our example, just take into account that the data will be written in batches.
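
The batching logic can be looked at in isolation. The following is a minimal sketch in plain Java with no Arrow involved. The names BatchingSketch and batchRowCounts are hypothetical, and it detects the trailing partial batch by checking the leftover counter. It returns the row count each batch would carry.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingSketch {

    // Returns the row count of every batch that would be written.
    static List<Integer> batchRowCounts(int totalRows, int batchSize) {
        if (batchSize <= 0) {
            batchSize = totalRows; // non-positive size means one single batch
        }

        List<Integer> counts = new ArrayList<>();
        int batchCounter = 0;

        for (int i = 0; i < totalRows; i++) {
            batchCounter++;

            if (batchCounter == batchSize) {
                counts.add(batchCounter); // a full batch is flushed
                batchCounter = 0;
            }
        }

        if (batchCounter > 0) {
            counts.add(batchCounter); // trailing partial batch
        }

        return counts;
    }

    public static void main(String[] args) {
        System.out.println(batchRowCounts(5, 2)); // prints [2, 2, 1]
    }
}
```

Five rows with a batch size of two end up as two full batches and one batch holding the remaining row.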

Let’s dive into the function.

The allocator discussed previously is created, together with the VectorSchemaRoot that holds the vectors:

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

Then, when writing to a stream, an Arrow stream writer is created and started. The channel is passed in as is; if you start from an OutputStream, wrap it with Channels.newChannel first.

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out);
writer.start();
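
Channels.newChannel is the JDK adapter that turns an OutputStream into the WritableByteChannel the writer expects. Below is a minimal JDK-only sketch of that adapter with no Arrow involved. The names ChannelAdapterSketch and writeThroughChannel are hypothetical, and a ByteArrayOutputStream stands in for whatever stream you actually write to.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class ChannelAdapterSketch {

    // Wraps an in-memory OutputStream in a channel, writes the payload
    // through the channel and returns the bytes that reached the stream.
    static byte[] writeThroughChannel(byte[] payload) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        try (WritableByteChannel channel = Channels.newChannel(sink)) {
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] written = writeThroughChannel("arrow".getBytes(StandardCharsets.UTF_8));
        System.out.println(written.length + " bytes reached the stream");
    }
}
```

In the Arrow case the channel obtained this way would be handed to the ArrowStreamWriter constructor instead of being written to directly.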

We then obtain the vectors in order to populate them with the data. We also reset them, which clears their contents but keeps the pre-allocated buffers.

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

We use the setSafe operation when writing data. This way, if a larger buffer is needed, it gets allocated automatically. In this example setSafe is used on every write, but the extra check can be avoided when the number of operations and the buffer size needed are known up front.

                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

Then we write the batch to the stream.


                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();

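Last but not least, we close the vector schema root and the allocator. The writer itself is already closed by the try-with-resources block.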

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

The next post will focus on reading Arrow data from a stream.

Apache Arrow on the JVM: Get Started and Schemas

Arrow is a memory format for flat and hierarchical data. It is a popular format used by various big data tools, among them BigQuery. One of the benefits that Arrow brings is that the data has the same byte representation in every supported language. So apart from the benefits of a columnar memory format, there are also the benefits of zero-copy reads without serialization overhead.

Apache Arrow defines a language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware like CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead.

Let’s import the libraries

        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>${arrow.version}</version>
        </dependency>

Before starting, it is essential to understand that read and write operations on Arrow use byte buffers. Reading and writing are a continuous exchange of bytes. To make this efficient, Arrow comes with a buffer allocator, which can have a certain size or expand automatically.
The libraries backing the allocation management are arrow-memory-netty and arrow-memory-unsafe. We shall use the Netty one.

Storing data in Arrow requires a schema. Schemas can be defined programmatically:

package com.gkatzioura.arrow;

import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

    public static final Schema DEFAULT_SCHEMA = createDefault();

    public static Schema createDefault() {
        var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
        var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

        return new Schema(List.of(strField, intField));
    }

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        // A field with children needs a nested type such as Struct; Utf8 cannot have children.
        var itemField = new Field("item", FieldType.nullable(ArrowType.Struct.INSTANCE), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

    public static Schema fromJson(String jsonString) {
        try {
            return Schema.fromJSON(jsonString);
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

}

Schemas also have a parseable JSON representation:

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : {
      "name" : "utf8"
    },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : {
      "name" : "int",
      "bitWidth" : 32,
      "isSigned" : true
    },
    "children" : [ ]
  } ]
}

In addition, just like Avro, you can have complex schemas with nested values on a field:

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
        var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
        // A field with children needs a nested type such as Struct; Utf8 cannot have children.
        var itemField = new Field("item", FieldType.nullable(ArrowType.Struct.INSTANCE), List.of(amount,currency));

        return new Schema(List.of(itemField));
    }

In the next post, we shall use the streaming API for Arrow.