BigQuery Storage API: Arrow

Previously we had an introduction to the BigQuery Storage API. As explained there, the Storage API of BigQuery supports two formats, Avro and Arrow. For this tutorial we will choose the Arrow format.

First let’s import the dependencies. The BigQuery Storage API client does not come with a library to parse Arrow. The consumer receives the data as Arrow binaries, and it is up to the consumer to decide how to consume them and which libraries to use.


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>20.5.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquerystorage</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>4.0.0</version>
        </dependency>
    </dependencies>

As mentioned before, when we use Arrow we also need to import a library for the memory allocation Arrow needs, which here is arrow-memory-netty.

First we shall create a plain Arrow reader.
Apart from the two BigQuery message types that carry the serialized bytes, this reader is BigQuery agnostic. This is one of the benefits of using a platform- and language-independent format.

An Arrow binary shall be submitted to the reader along with the schema, and the rows shall be printed in tab-separated (TSV) format.

package com.gkatzioura.bigquery.storage.api.arrow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;

public class ArrowReader implements AutoCloseable {

    private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public ArrowReader(ArrowSchema arrowSchema) throws IOException {
        Schema schema =
                MessageSerializer.deserializeSchema(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        arrowSchema.getSerializedSchema().toByteArray())));

        Preconditions.checkNotNull(schema);
        List<FieldVector> vectors = new ArrayList<>();
        for (Field field : schema.getFields()) {
            vectors.add(field.createVector(allocator));
        }

        root = new VectorSchemaRoot(vectors);
        loader = new VectorLoader(root);
    }

    public void processRows(ArrowRecordBatch batch) throws IOException {
        org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
                MessageSerializer.deserializeRecordBatch(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        batch.getSerializedRecordBatch().toByteArray())),
                        allocator);

        loader.load(deserializedBatch);
        deserializedBatch.close();
        System.out.println(root.contentToTSVString());
        root.clear();
    }

    @Override
    public void close() throws Exception {
        root.close();
        allocator.close();
    }

}

The schema is injected through the constructor, and the schema root is created from it.
Note that we receive the schema in binary form; it is up to us and our library how to read it.


        Schema schema =
                MessageSerializer.deserializeSchema(
                        new ReadChannel(
                                new ByteArrayReadableSeekableByteChannel(
                                        arrowSchema.getSerializedSchema().toByteArray())));

You can find more on reading Arrow data in this tutorial.
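To make the columnar layout more concrete, here is a minimal sketch of what turning columns into tab-separated rows involves. It uses only the JDK: the `Map` of lists is a hypothetical stand-in for a record batch, not Arrow's `VectorSchemaRoot`, and `ColumnarTsvSketch` and `toTsv` are names made up for this illustration. The real `contentToTSVString` may differ in details.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for a columnar batch: column name -> values.
// This is NOT Arrow's VectorSchemaRoot, only an illustration of the layout.
final class ColumnarTsvSketch {

    static String toTsv(Map<String, List<Object>> columns) {
        StringBuilder sb = new StringBuilder();
        // Header row: the column names in schema order.
        sb.append(String.join("\t", columns.keySet())).append('\n');

        // Use the shortest column as the row count so we never read past the end.
        int rowCount = columns.values().stream().mapToInt(List::size).min().orElse(0);
        for (int row = 0; row < rowCount; row++) {
            // A row is assembled by picking the same index from every column.
            List<String> cells = new ArrayList<>();
            for (List<Object> column : columns.values()) {
                cells.add(String.valueOf(column.get(row)));
            }
            sb.append(String.join("\t", cells)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the insertion order of the columns.
        Map<String, List<Object>> batch = new LinkedHashMap<>();
        batch.put("col1", List.of("a", "b"));
        batch.put("col2", List.of(1L, 2L));
        System.out.print(toTsv(batch));
    }
}
```

The point of the sketch is that the data arrives column by column, and rows only appear when the same index is read from each column.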

Next comes our main class, which holds all the BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.arrow;

import org.apache.arrow.util.Preconditions;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;

public class ArrowMain {

    public static void main(String[] args) throws Exception {

        String projectId = System.getenv("PROJECT_ID");

        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            String parent = String.format("projects/%s", projectId);

            String srcTable =
                    String.format(
                            "projects/%s/datasets/%s/tables/%s",
                            projectId, System.getenv("DATASET"), System.getenv("TABLE"));

            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.ARROW);

            CreateReadSessionRequest.Builder builder =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent(parent)
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1);
            ReadSession session = client.createReadSession(builder.build());

            try (ArrowReader reader = new ArrowReader(session.getArrowSchema())) {
                Preconditions.checkState(session.getStreamsCount() > 0);

                String streamName = session.getStreams(0).getName();

                ReadRowsRequest readRowsRequest =
                        ReadRowsRequest.newBuilder().setReadStream(streamName).build();

                ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
                for (ReadRowsResponse response : stream) {
                    Preconditions.checkState(response.hasArrowRecordBatch());
                    reader.processRows(response.getArrowRecordBatch());
                }
            }
        }
    }

}

A BigQuery read client is created. Then we create a session request with a maximum number of streams. We have to specify that the format to be used will be Arrow.
Once we get a response, it will contain the initiated session, the Arrow schema and the streams that we shall use to retrieve the data.
For each stream there has to be a ReadRowsRequest in order to fetch the data.
Our next example will focus on reading data in Avro format.

BigQuery Storage API: Get Started and Comparisons

BigQuery provides us with the Storage API for fast access using an RPC-based protocol. With this option you can receive the data in a binary serialized format. The alternative ways to retrieve BigQuery data are the REST API and a bulk export.

Retrieving data through the REST API is great for small result sets. For example, if the product of an aggregation is going to have a limited number of rows, it makes sense to use the REST API, retrieve the results and use them in an application like Grafana. However, when it comes to big result sets, retrieving the results in JSON, then serializing and storing them, carries extra overhead. Exporting in a binary format helps you avoid this overhead.

Bulk data export is a good solution for exporting big result sets; however, you are limited in where the data gets stored (Google Cloud Storage), and there are daily limits on exports.

Thus the Storage API combines the flexibility of an RPC protocol, the efficiency of downloading big result sets in a binary format and the freedom to choose where the data shall be stored.

The Storage API provides two formats for streaming data: Avro and Arrow.

When using the Storage API, the first step is to create a session. The format (Avro or Arrow) must be specified. A session can have more than one stream, and the maximum number of streams can be specified.
Streams contain the data in the format specified and can be read in parallel. The session expires on its own with no need for handling.
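Reading streams in parallel can be sketched with plain JDK concurrency. This is a minimal sketch under stated assumptions: `ParallelStreamsSketch`, `readAll` and the `readStream` function are hypothetical names, and `readStream` stands in for whatever code issues the read request for one stream and returns the number of rows it consumed. No BigQuery call is made here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToLongFunction;

final class ParallelStreamsSketch {

    // readStream is a hypothetical stand-in: given a stream name it reads the
    // whole stream and returns how many rows it saw.
    static long readAll(List<String> streamNames, ToLongFunction<String> readStream) {
        if (streamNames.isEmpty()) {
            return 0L;
        }
        // One worker per stream, since each stream is read independently.
        ExecutorService pool = Executors.newFixedThreadPool(streamNames.size());
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (String name : streamNames) {
                Callable<Long> task = () -> readStream.applyAsLong(name);
                pending.add(pool.submit(task));
            }
            long total = 0L;
            for (Future<Long> result : pending) {
                total += result.get(); // waits for that stream to finish
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while reading streams", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A stream reader failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real program the stream names would come from the session, and each task would need its own reader state, since sharing one reader between threads is unlikely to be safe.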

If a session request is successful, the response shall contain the schema of the data and the streams to use to download the data.

For the following example we assume that the table we read data from has two columns: col1 is a string and col2 is a number. An Arrow example of this schema can be found here.

In order to test the Storage API you need an account on GCP with the BigQuery Storage API enabled and a dataset created.

Let’s continue to the Arrow example.

Locking for multiple nodes the easy way: GCS

It happens to all of us. We develop stateless applications that can scale horizontally without any effort.
However sometimes cases arise where you need to achieve some type of coordination.

You can go really advanced on this one. For example you can use a framework like Akka and its cluster capabilities. Or you can go really simple and roll a mechanism of your own, as long as it gives you the results needed. On another note, you can just have different node groups based on the work you need them to do. The options and the solutions can change based on the problem.

If your problem can go with a simple option, one way to do so, provided you use Google Cloud Storage, is to use its lock capabilities.
Imagine for example a scenario of four nodes that scale dynamically: each time a new node registers, you want to change its actions by having it acquire a unique configuration which does not collide with a configuration another node might have received.

The strategy can be to use a file on Google Cloud Storage for locking and a file that acts as a centralised configuration registry.

The lock file is nothing more than a file on cloud storage which shall be created and deleted. What gives us lock abilities is the option on GCS to create a file only if it does not exist.
Thus a process on one node will try to create the `lock` file; this action is equivalent to obtaining the lock.
Once the process is done it will delete the file; this action is equivalent to releasing the lock.
In the meantime other processes will try to create the file (acquire the lock) and fail (the file already exists) because another process has created it.
Meanwhile the process that has successfully created the file (acquired the lock) will change the centralised configuration registry and, once done, will delete the file (release the lock).
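Before touching GCS, the protocol above can be tried out in memory. The following is a minimal sketch, assuming a `ConcurrentHashMap` as a stand-in for the bucket: `putIfAbsent` plays the role of "create the file only if it does not exist". The class name `CreateIfAbsentLockSketch` and the owner check on release are additions for illustration and are not part of the GCS example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for a bucket. putIfAbsent plays the role of
// "create the object only if it does not exist"; it is not a GCS call.
final class CreateIfAbsentLockSketch {

    private final ConcurrentMap<String, String> bucket = new ConcurrentHashMap<>();
    private final String lockName;

    CreateIfAbsentLockSketch(String lockName) {
        this.lockName = lockName;
    }

    // Returns true when this caller created the lock entry, i.e. holds the lock.
    boolean acquire(String owner) {
        return bucket.putIfAbsent(lockName, owner) == null;
    }

    // Deletes the lock entry, but only if the given owner created it.
    boolean release(String owner) {
        return bucket.remove(lockName, owner);
    }

    public static void main(String[] args) {
        CreateIfAbsentLockSketch lock = new CreateIfAbsentLockSketch("lockFileName");
        System.out.println("node-1 acquires: " + lock.acquire("node-1"));
        System.out.println("node-2 acquires: " + lock.acquire("node-2"));
        System.out.println("node-1 releases: " + lock.release("node-1"));
        System.out.println("node-2 acquires: " + lock.acquire("node-2"));
    }
}
```

The second attempt fails for the same reason a second create fails on the bucket: the entry is already there.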

So let’s start with the lock object.

package com.gkatzioura.gcs.lock;

import java.util.Optional;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

public class GCSLock {

	public static final String LOCK_STRING = "_lock";
	private final Storage storage;
	private final String bucket;
	private final String keyName;

	private Optional<Blob> acquired = Optional.empty();

	GCSLock(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public boolean acquire() {
		try {
			var blobInfo = BlobInfo.newBuilder(bucket, keyName).build();
			var blob = storage.create(blobInfo, LOCK_STRING.getBytes(), Storage.BlobTargetOption.doesNotExist());
			acquired = Optional.of(blob);
			return true;
		} catch (StorageException storageException) {
			return false;
		}
	}

	public void release() {
		if(!acquired.isPresent()) {
			throw new IllegalStateException("Lock was never acquired");
		}
		storage.delete(acquired.get().getBlobId());
	}

}

As you can see, the write is specified to create the object only if it does not already exist. Behind the scenes this operation uses a generation-match precondition (the x-goog-if-generation-match header, set to 0), which GCS provides for concurrency control.
Thus only one node will be able to acquire the lock and change the configuration files.
Afterwards it can delete the lock. If an exception is raised, the operation has most likely failed because the lock is already held by another process. Note that the code treats every StorageException this way, including unrelated errors.

To make the example more complete let’s make the configuration file. The configuration file will be a simple JSON file acting as a key-value map.

package com.gkatzioura.gcs.lock;

import java.util.HashMap;
import java.util.Map;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.json.JSONObject;

public class GCSConfiguration {

	private final Storage storage;
	private final String bucket;
	private final String keyName;

	GCSConfiguration(Storage storage, String bucket, String keyName) {
		this.storage = storage;
		this.bucket = bucket;
		this.keyName = keyName;
	}

	public void addProperty(String key, String value) {
		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		final JSONObject configJson;

		if(blob==null) {
			configJson = new JSONObject();
		} else {
			configJson = new JSONObject(new String(blob.getContent()));
		}

		configJson.put(key, value);

		var blobInfo = BlobInfo.newBuilder(blobId).build();
		storage.create(blobInfo, configJson.toString().getBytes());
	}

	public Map<String,String> properties() {

		var blobId = BlobId.of(bucket, keyName);
		var blob = storage.get(blobId);

		var map = new HashMap<String,String>();

		if(blob!=null) {
			var jsonObject = new JSONObject(new String(blob.getContent()));
			for(var key: jsonObject.keySet()) {
				map.put(key, jsonObject.getString(key));
			}
		}

		return map;
	}

}

It is a simple config util backed by GCS. Eventually it could be changed so that the locking happens inside the addProperty operation; that is up to the user and the code. For the purpose of this blog we shall just acquire the lock, change the configuration and release the lock.
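Moving the locking inside addProperty could look roughly like the sketch below. It is not the GCS-backed class: an `AtomicBoolean` stands in for the lock file and a map stands in for the JSON object, and `LockedConfigSketch` is a hypothetical name. What it is meant to show is the try/finally shape, so that the lock is released even when the update throws.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

final class LockedConfigSketch {

    // Stand-ins: an AtomicBoolean for the lock file, a map for the JSON config.
    private final AtomicBoolean lockFile = new AtomicBoolean(false);
    private final Map<String, String> config = new ConcurrentHashMap<>();

    // Returns false without touching the config when the lock is already held.
    boolean addProperty(String key, String value) {
        if (!lockFile.compareAndSet(false, true)) {
            return false;
        }
        try {
            config.put(key, value);
            return true;
        } finally {
            // Release even if the update above throws.
            lockFile.set(false);
        }
    }

    // Returns an immutable snapshot of the current properties.
    Map<String, String> properties() {
        return Map.copyOf(config);
    }
}
```

With this shape the caller no longer has to remember to pair acquire and release, at the cost of having to handle a `false` return when the lock is busy.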
Our main class will look like this.

package com.gkatzioura.gcs.lock;

import com.google.cloud.storage.StorageOptions;

public class Application {

	public static void main(String[] args) {
		var storage = StorageOptions.getDefaultInstance().getService();

		final String bucketName = "bucketName";
		final String lockFileName = "lockFileName";
		final String configFileName = "configFileName";

		var lock = new GCSLock(storage, bucketName, lockFileName);
		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

		var lockAcquired = lock.acquire();
		if(lockAcquired) {
			gcsConfig.addProperty("testProperty", "testValue");
			lock.release();
		}

		var config = gcsConfig.properties();

		for(var key: config.keySet()) {
			System.out.println("Key "+key+" value "+config.get(key));
		}

	}

}

Now let’s go for some multithreading. Ten threads will try to put values, and it is expected that some of their attempts to acquire the lock will fail.

package com.gkatzioura.gcs.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ApplicationConcurrent {

	private static final String bucketName = "bucketName";
	private static final String lockFileName = "lockFileName";
	private static final String configFileName = "configFileName";

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		var storage = StorageOptions.getDefaultInstance().getService();

		final int threads = 10;
		var service = Executors.newFixedThreadPool(threads);
		var futures = new ArrayList<Future<?>>(threads);

		for (var i = 0; i < threads; i++) {
			futures.add(service.submit(update(storage, "property-"+i, "value-"+i)));
		}

		for (var f : futures) {
			f.get();
		}

		service.shutdown();

		var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);
		var properties = gcsConfig.properties();

		for(var i=0; i < threads; i++) {
			System.out.println(properties.get("property-"+i));
		}
	}

	private static Runnable update(final Storage storage, String property, String value) {
		return () -> {
			var lock = new GCSLock(storage, bucketName, lockFileName);
			var gcsConfig = new GCSConfiguration(storage, bucketName, configFileName);

			boolean lockAcquired = false;

			while (!lockAcquired) {
				lockAcquired = lock.acquire();
				// Only report a failure when the attempt actually failed.
				if (!lockAcquired) {
					System.out.println("Could not acquire lock");
				}
			}

			gcsConfig.addProperty(property, value);
			lock.release();
		};
	}
}

Obviously ten threads are enough to display the capabilities. During thread initialization and execution some threads will try to acquire the lock simultaneously and all but one will fail, while other threads will arrive late, fail, and keep retrying until the lock is available.
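The loop in the example retries immediately, which means every failed attempt is another request to the bucket. A gentler variant, shown here only as a sketch, waits between attempts and doubles the wait each time. `LockRetrySketch`, `acquireWithBackoff` and the one-second cap are assumptions made for this illustration; `tryAcquire` stands in for a call such as `lock::acquire`.

```java
import java.util.function.BooleanSupplier;

final class LockRetrySketch {

    // Tries up to maxAttempts times, sleeping between failed attempts and
    // doubling the delay each time (capped at one second).
    static boolean acquireWithBackoff(BooleanSupplier tryAcquire, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryAcquire.getAsBoolean()) {
                return true;
            }
            if (attempt == maxAttempts) {
                break; // no point sleeping after the last attempt
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            delay = Math.min(delay * 2, 1_000L);
        }
        return false;
    }
}
```

A call could look like `LockRetrySketch.acquireWithBackoff(lock::acquire, 20, 50)`, after which the caller must still check the returned boolean before changing the configuration.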

In the end what is expected is for all of them to have their values added to the configuration.
That’s it. If your problems have a simple nature this approach might do the trick. Obviously you can use the HTTP API instead of the SDK. You can find the code on GitHub.

Host your maven artifacts on the cloud using CloudStorageMaven

One of the major issues when dealing with large codebases in our teams has to do with artifact sharing and artifact storage.

There are various options out there that provide many features, such as JFrog, Nexus, Archiva etc.

I have used them, set them up and configured them, and they certainly provide you with many features. Having your own repository installation also gives you a lot of flexibility. Furthermore, Docker has made things a lot easier, so setting them up takes almost no time.

Now if you use a cloud provider such as Amazon, Azure or Google, there is a more lightweight option that is pretty easy to set up. These providers give you cheap and easy access to storage, and the storage options they provide can also be used to host your private artifacts or even your public ones.

To do so you need a Maven wagon which is capable of communicating with the storage options of your cloud provider, and this is exactly what the CloudStorageMaven project deals with.

The CloudStorageMaven project provides you with wagons interacting with Amazon S3, Azure Blob Storage and Google Cloud Storage.

If you already use one of these cloud services, hosting your artifacts on them seems like a no-brainer, and these wagons make it a lot easier to do so.

I have compiled some tutorials on how to get started with each one of them.

Happy coding!

Host your maven artifacts using Google Cloud Storage

If you use Google Cloud and you use Java for your projects then Google Cloud Storage is a great place to host your team’s artifacts.

It is easy to set up and pretty cheap. It is also much simpler than setting up one of the existing repository options (JFrog, Nexus, Archiva etc) if you are not particularly interested in their features.

To get started you need to specify a maven wagon which supports google cloud storage.
We will use the Google storage wagon.

Let’s get started by creating a maven project

mvn archetype:generate -DgroupId=com.test.apps -DartifactId=GoogleWagonTest -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

We are going to add a simple service.

package com.test.apps;

public class HelloService {

    public String sayHello() {

        return "Hello";
    }
}

Then we are going to add the Maven wagon which will upload our binaries to Google Cloud Storage and fetch them from it.

    <build>
        <extensions>
            <extension>
                <groupId>com.gkatzioura.maven.cloud</groupId>
                <artifactId>google-storage-wagon</artifactId>
                <version>1.0</version>
            </extension>
        </extensions>
    </build>

Then we shall create the google cloud storage bucket which will host our artifacts.

Our bucket shall be called mavenrepository

Now that we have set up our bucket in google we shall set the distribution management on our maven project.

    <distributionManagement>
        <snapshotRepository>
            <id>my-repo-bucket-snapshot</id>
            <url>gs://mavenrepository/snapshot</url>
        </snapshotRepository>
        <repository>
            <id>my-repo-bucket-release</id>
            <url>gs://mavenrepository/release</url>
        </repository>
    </distributionManagement>

From the maven documentation

Where as the repositories element specifies in the POM the location and manner in which Maven may download remote artifacts for use by the current project, distributionManagement specifies where (and how) this project will get to a remote repository when it is deployed. The repository elements will be used for snapshot distribution if the snapshotRepository is not defined.
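The fallback rule in the quote can be written down as a small function. This is only a sketch of the rule as quoted, assuming the usual convention that snapshot versions end in -SNAPSHOT; `DeployTargetSketch` and `deployUrl` are hypothetical names and this is not Maven's own code.

```java
final class DeployTargetSketch {

    // snapshotRepositoryUrl may be null, meaning no snapshotRepository is defined.
    static String deployUrl(String version, String snapshotRepositoryUrl, String repositoryUrl) {
        // Assumption: a version ending in -SNAPSHOT is a snapshot version.
        boolean snapshot = version.endsWith("-SNAPSHOT");
        if (snapshot && snapshotRepositoryUrl != null) {
            return snapshotRepositoryUrl;
        }
        // Releases always go here; snapshots fall back here when no
        // snapshotRepository is defined.
        return repositoryUrl;
    }

    public static void main(String[] args) {
        System.out.println(deployUrl("1.0-SNAPSHOT",
                "gs://mavenrepository/snapshot", "gs://mavenrepository/release"));
        System.out.println(deployUrl("1.0",
                "gs://mavenrepository/snapshot", "gs://mavenrepository/release"));
    }
}
```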

The next step is the most crucial one and has to do with authenticating to Google Cloud.

You need to have the gcloud command line set up on your system, and you must issue a login
'gcloud auth login --brief' with an account that has access to the bucket we created previously.
The other way is to use the GOOGLE_APPLICATION_CREDENTIALS environment variable, which you set to the path of your Google application credentials file.
The credentials in that file should also be able to access the bucket we created previously.

And now the easiest part which is deploying.

mvn deploy

Now that your artifact has been deployed, you can use it in another project by specifying your repository and your wagon.

    <repositories>
        <repository>
            <id>my-repo-bucket-snapshot</id>
            <url>gs://mavenrepository/snapshot</url>
        </repository>
        <repository>
            <id>my-repo-bucket-release</id>
            <url>gs://mavenrepository/release</url>
        </repository>
    </repositories>

    <build>
        <extensions>
            <extension>
                <groupId>com.gkatzioura.maven.cloud</groupId>
                <artifactId>google-storage-wagon</artifactId>
                <version>1.0</version>
            </extension>
        </extensions>
    </build>

That’s it! Next thing you know, your artifact will be downloaded by maven through google cloud storage and used as a dependency in your new project.

Host your maven artifacts using Azure Blob Storage

If you use Microsoft Azure and you use Java for your projects then Azure Blob Storage is a great place to host your team’s artifacts.

It is easy to set up and pretty cheap. It is also much simpler than setting up one of the existing repository options (JFrog, Nexus, Archiva etc) if you are not particularly interested in their features.

To get started you need to specify a maven wagon which supports azure blob storage.
We will use the Azure storage wagon.

Let’s get started by creating a maven project

mvn archetype:generate -DgroupId=com.test.apps -DartifactId=AzureWagonTest -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

We are going to add a simple service.

package com.test.apps;

public class HelloService {

    public String sayHello() {

        return "Hello";
    }
}

Then we are going to add the Maven wagon which will upload our binaries to Azure Blob Storage and fetch them from it.

    <build>
        <extensions>
            <extension>
                <groupId>com.gkatzioura.maven.cloud</groupId>
                <artifactId>azure-storage-wagon</artifactId>
                <version>1.0</version>
            </extension>
        </extensions>
    </build>

Then we shall create the Azure storage account that will host our artifacts.

Then we shall create a new container called snapshot. This container will hold our snapshot repository.

We can go through the same process in order to create a release repository.
Be aware that there is no need to create a different container for each repository. You can have several repositories under the same container.

Now that we have set up our storage account in Azure we shall set the distribution management on our Maven project.

    <distributionManagement>
        <snapshotRepository>
            <id>my-repo-bucket-snapshot</id>
            <url>bs://mavenrepository/snapshot</url>
        </snapshotRepository>
        <repository>
            <id>my-repo-bucket-release</id>
            <url>bs://mavenrepository/release</url>
        </repository>
    </distributionManagement>

From the maven documentation

Where as the repositories element specifies in the POM the location and manner in which Maven may download remote artifacts for use by the current project, distributionManagement specifies where (and how) this project will get to a remote repository when it is deployed. The repository elements will be used for snapshot distribution if the snapshotRepository is not defined.

The next step is the most crucial one and has to do with authenticating to Azure.

What you need is your storage account name and the key of the storage account.
In order to retrieve both, navigate to Access keys in the Settings section of your storage account.

Then we shall specify our storage account credentials in ~/.m2/settings.xml.

  <servers>
    <server>
      <id>my-repo-bucket-snapshot</id>
      <username>mavenrepository</username>
      <password>eXampLEkeyEMI/K7EXAMP/bPxRfiCYEXAMPLEKEY</password>
    </server>
    <server>
      <id>my-repo-bucket-release</id>
      <username>mavenrepository</username>
      <password>eXampLEkeyEMI/K7EXAMP/bPxRfiCYEXAMPLEKEY</password>
    </server>
  </servers>

Be aware that you have to specify credentials for each repository you declare, and that each server id must match the id of its repository.

And now the easiest part which is deploying.

mvn deploy

Now that your artifact has been deployed, you can use it in another project by specifying your repository and your wagon.

    <repositories>
        <repository>
            <id>my-repo-bucket-snapshot</id>
            <url>bs://mavenrepository/snapshot</url>
        </repository>
        <repository>
            <id>my-repo-bucket-release</id>
            <url>bs://mavenrepository/release</url>
        </repository>
    </repositories>

    <build>
        <extensions>
            <extension>
                <groupId>com.gkatzioura.maven.cloud</groupId>
                <artifactId>azure-storage-wagon</artifactId>
                <version>1.0</version>
            </extension>
        </extensions>
    </build>

That’s it! Next thing you know, your artifact will be downloaded by Maven through Azure Blob Storage and used as a dependency in your new project.