Java Concurrency: The AbstractQueuedSynchronizer

Previously we worked with one of the building blocks of Java concurrency the LockSupport class. On this blog we will have a look at the AbstractQueuedSynchronizer.
AbstractQueuedSynchronizer is one of the building blocks of Java synchronization. Lock implementations such as ReentrantLock and Semaphore are based on the AbstractQueuedSynchronizer.
An integer represents the state. Based on the use case what a state represent can vary for example it can represent how many permits are available. Operations that change the state are guarded by a FIFO queue and tries to do so in a fair manner, and provided access to the lock in the order which the thread arrived.

 

The FIFO queue is implemented using a nested Node class is. The Node represents the Thread waiting in the queue. The operations are atomic and the variables are volatile. It contains references to the previous and the next node.

    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others

...
    }

An instance of the AbstractQueuedSynchronizer, since it provides a FIFO, will have a reference of the head and tail in the class:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
    private transient volatile Node head;
    private transient volatile Node tail;
...
}

As mentioned we have the state variable¨

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
    private volatile int state;
...
}

Τhe state variable can be used to represent the resources available or allocated.

When it comes to ReentrantLock it represent the number of holds on the lock:

public class ReentrantLock implements Lock, java.io.Serializable {
...
    static final class FairSync extends Sync {
...
        final boolean initialTryLock() {
...
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
...
    }
...
}

When it comes to the Semaphore it represents the permits:

public class Semaphore implements java.io.Serializable {
...
    static final class FairSync extends Sync {
...
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
...
   }
...
}

Since the AbstractQueuedSynchronizer supports a shared mode as well as an exclusive mode, there are two Node implementations to represent it.

    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

For example ReentrantLock is on an Exclusive mode while Semaphore is on a shared mode.

The acquire method is where the logic lies.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

The implementation of acquiring the lock can vary from jdk implementations but the objectives should be the same.
The first step is the thread to try and acquire the lock if it is available without blocking.
This can vary on the Shared on Exclusive mode.
If the lock is held the Thread is blocked and aded to the waiting queue.

    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
...
        }
    }

As the lock gets released by the thread holding it the threads in the queue are signaled to proceed and acquire the lock.
The mechanism varies whether shared or exclusive.

Now that we know more on the AbstractQueuedSynchronizer we can check its implementations and find more about its usage.

Java Concurrency: LockSupport

Previously we had a look on the lock class and how it works behind the scenes.
On this post we shall look at LockSupport and its native methods which are one of the building block for locks and synchronization classes.

As we can see the methods on lock support are static and behind the scenes they are based on on the Unsafe class.
The Unsafe class is not intended to be used by your code directly  thus only trusted code can obtain instances of it.

If we see the methods available we can identify the park method, the unpark methods and the get Blocker method.

public class LockSupport {
...

    public static void park() 
    public static void park(Object blocker) 
    public static void parkNanos(long nanos) 
    public static void parkNanos(Object blocker, long nanos) 
    public static void parkUntil(long deadline) 
    public static void parkUntil(Object blocker, long deadline) 
    public static Object getBlocker(Thread t) 
    public static void unpark(Thread thread) 
...
}

 

Each thread that uses the class LockSupport is associated with a permit.
When we call park the thread is disabled for thread scheduling purposes. Provided the permit is available the permit is consumed and the call returns immediately.

public final class Unsafe {
...
    public native void unpark(Object thread);
    public native void park(boolean isAbsolute, long time);
...
}

We can trace down the permit in the source code implementation and eventually end up on the posix implementation:

void Parker::park(bool isAbsolute, jlong time) {


  if (Atomic::xchg(&_counter, 0) > 0) return;
  JavaThread *jt = JavaThread::current();

  if (jt->is_interrupted(false)) {
    return;
  }

  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }


  ThreadBlockInVM tbivm(jt);

  if (pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    OrderAccess::fence();
    return;
  }

  OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                  status, "cond_wait");
  }
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  OrderAccess::fence();
}

In a nutsel _counter is a permit and if the permit is bigger than 0 it will consumed and the method will return immediately. If an interrupt is pending the method should return immediately. If nanos to wait have been supplied then the time to wait on the condition is calculated. By using the mutex provided either the thread will wait forever until is unparked or interuped or the thread will wait for the nanos provided.

Let’s see an example of parking a thread:

    @Test
    void park() throws InterruptedException {
        Thread secondaryThread = new Thread(LockSupport::park);

        secondaryThread.start();

        secondaryThread.join(2000);
        log.info("Could not join thread is parked");

        assertTrue(secondaryThread.isAlive());
        LockSupport.unpark(secondaryThread);
        secondaryThread.join();

        assertFalse(secondaryThread.isAlive());
        log.info("Thread was unparked");
    }

We start the thread and park it. When we try to wait for the main thread we use a time limit, if time limit was not set we would wait forever. Eventually the time passes and before retrying to join the thread we unpark. As expected the thread is unparked and eventually finishes.

If the permit is not available the thread lies dormant disabled for thread scheduling. Initially the permit is zero. We can try with one thread.

    @Test
    void unParkAndPark() {
        final Thread mainThread = Thread.currentThread();
        LockSupport.unpark(mainThread);
        LockSupport.park();
    }

By using unpark initially we made a permit available, thus on park the permit was consumed and we returned immediately.

The other element that we see in the LockSupport class is the blocker.

The Blocker maps to the parkBlocker object in the Thread implementation of java.

public class Thread implements Runnable {
...
    volatile Object parkBlocker;
...
}

The blocker represents the synchronization object responsible for this thread parking.
Thus we can note which object is responsible for the thread being parked, which can help for debugging and monitoring purposes.
The unsafe methods are called to set the parkBlocker value to the Thread instance.

Now that we saw how LockSupport works behinds the scenes we can see the example provided by the Javadoc the FIFOMutex.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class FIFOMutex {

    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
            = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }

}

All threads will be added to the queue. Since this is a FIFO queue, the first thread should mark the locked variable as true and should be able to proceed and remove itself from the queue. The rest of the threads should be parked. In case of an interrupt LockSupport will exit thus we should reassert the interrupt on the thread on exit.
Also pay attention that the blocker is set as the instance of the FIFOMutex class.

By using unlock the threads that have been in a parked state will resume.

We can also check another usage of lock support. Rate limiters can benefit from LockSupport.
We can check the source code of the Refill Rate Limiter:

private boolean waitForPermission(final long nanosToWait) {
        waitingThreads.incrementAndGet();
        long deadline = currentNanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        while (currentNanoTime() < deadline && !wasInterrupted) {
            long sleepBlockDuration = deadline - currentNanoTime();
            parkNanos(sleepBlockDuration);
            wasInterrupted = Thread.interrupted();
        }
        waitingThreads.decrementAndGet();
        if (wasInterrupted) {
            currentThread().interrupt();
        }
        return !wasInterrupted;
    }

In a Rate Limiter the permits to acquire are limited over time. In case of a thread trying to acquire a permit there is an option to wait until a permit is available. In this case we can park the thread. This way our rate limiter will prevent the time busy on spinning.

That’s it. Now that we know about lock support we can proceed on more interesting concurrency concepts.

Java Concurrency: Object monitor methods

Previously we used monitor locks to solve a problem in a thread safe way. On Java each object has a lock associated with it. Among with this monitor lock the object has some methods that depend on it: wait, notify, notifyAll. Those methods can help on coordinating threads. For example a thread writing to a variable and notifying another thread on that variable change.

By using the wait method the thread will wait until notified or interrupted. In order to call this method from an object the thread should have acquired the lock associated with the object. Behind the scenes the thread will get into a wait queue among other threads that wait. That queue is associated with the object that we use its monitor lock.

We can see that on the objectMonitor sourcecode.

  // create a node to be put into the queue
  // Critically, after we reset() the event but prior to park(), we must check
  // for a pending interrupt.
  ObjectWaiter node(current);
  node.TState = ObjectWaiter::TS_WAIT;
  current->_ParkEvent->reset();
  OrderAccess::fence();          

Once the thread gets into a wait state, it will relinquish the lock it acquired from the object. The thread will be dormant until it is interrupted or it gets notified to wake up.

We shall use an object as a lock and provide a runnable implementation:

    private Object object = new Object();

    public class WaitingRunnable implements Runnable {

        @Override
        public void run() {
            try {
                log.info("Waiting to be notified");
                synchronized (object) {
                    object.wait();
                    log.info("Notified!");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

If we try to call wait on the Object without acquiring the monitor lock a failure will occur.

    @Test
    void illegalMonitorState() {
        assertThrows(IllegalMonitorStateException.class, () -> object.wait());
    }

Instead we will be successful if we acquire the lock.

    @Test
    void testNotifyWait() throws InterruptedException {
        Thread waitingThread = new Thread(new WaitingRunnable());
        waitingThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
            Thread.sleep(1000);
            log.info("Not released yet");
        }

        waitingThread.join();
    }

We can see the following output

21:40:23.606 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
21:40:24.111 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
21:40:25.117 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Not released yet
21:40:25.121 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

Let’s take this into one step further. Once the thread is awaken it will try to acquire the lock again. The lock will not be released until the thread that called notify will exit.
We can see that in the following example:

    @Test
    void testNotifyWait() throws InterruptedException {
        Thread waitingThread = new Thread(new WaitingRunnable());
        waitingThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
            log.info("Should wait");
            Thread.sleep(1000);
        }

        waitingThread.join();
    }

The output would be the following:

21:38:38.431 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
21:38:38.936 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
21:38:38.936 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Should wait
21:38:39.942 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

We have the notify method and the notifyAll method. In case of notify will pick one thread from the waiting queue and notify it. In case of notifyAll all threads from the queue will be notified.

For example:

    @Test
    void testNotifyOnce() throws InterruptedException {
        Thread firstThread = new Thread(new WaitingRunnable());
        Thread secondThread = new Thread(new WaitingRunnable());

        firstThread.start();
        secondThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
        }

        firstThread.join();
        secondThread.join(2000);
    }

Only one thread will be notified which we can see from the output

07:52:28.095 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:52:28.095 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:52:28.600 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
07:52:28.601 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

Now if we use notify all, all waiting threads will be notified :

    @Test
    void testNotifyAll() throws InterruptedException {
        Thread firstThread = new Thread(new WaitingRunnable());
        Thread secondThread = new Thread(new WaitingRunnable());

        firstThread.start();
        secondThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notifyAll();
        }

        firstThread.join();
        secondThread.join();
    }

07:54:28.712 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:54:28.712 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:54:29.211 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
07:54:29.211 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!
07:54:29.212 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

That’s it! We went one step further into coordinating threads by using the monitor method provided by the Object class.

Java Concurrency: The Lock interface

Previously we implemented a thread safe counter using synchronized. We would like to swift from synchronized blocks to something more flexible with more features, this is were locks are of use. On this blog we will focus on Java’s Lock interface.

 

 

The Lock interface supports three forms of lock acquisition interruptible, non-interruptible, and timed.

We can acquire locks between threads. A thread will wait until the lock is released from the thread holding the lock:

    @Test
    void lock() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            log.info("Acquired delayed");
            lock.unlock();
        });

        lock.lock();
        withDelayedLock.start();

        Thread.sleep(500);
        log.info("Will release");
        lock.unlock();
        withDelayedLock.join();
    }

Since the threads blocks on the lock we might as well interrupt the thread. In this case we can lock lockInterruptibly, and apply any logic in case of an Interrupt:

    @Test
    void lockInterruptibly() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                log.error("interrupted while waiting",e);
            }
        });

        lock.lock();
        withDelayedLock.start();
        withDelayedLock.interrupt();
        lock.unlock();
        withDelayedLock.join();
    }

A thread can also try to acquire a lock which is already acquired, and exit immediately instead of blocking.

    @Test
    void tryLock() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            boolean locked = lock.tryLock();
            assertFalse(locked);
        });

        lock.lock();
        withDelayedLock.start();
        Thread.sleep(500);
        lock.unlock();
        withDelayedLock.join();
    }

Also a time period can be specified until the lock is acquired.

    @Test
    void tryLockTime() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            try {
                boolean locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
                assertFalse(locked);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        lock.lock();
        withDelayedLock.start();
        Thread.sleep(500);
        lock.unlock();
        withDelayedLock.join();
    }

Another thing of importance with locks is memory.

From the documentation

All {@code Lock} implementations <em>must</em> enforce the same
* memory synchronization semantics as provided by the built-in monitor
* lock, as described in
* Chapter 17 of
* <cite>The Java Language Specification</cite>:
*
<ul>*
 	<li>A successful {@code lock} operation has the same memory
* synchronization effects as a successful <em>Lock</em> action.
*</li>
 	<li>A successful {@code unlock} operation has the same
* memory synchronization effects as a successful <em>Unlock</em> action.
*</li>
</ul>

In order for the results to be flushed on the main memory we need to use lock and unlock.
Take the following example

    private Lock lock = new ReentrantLock();
    private String threadAcquired = "main";

    @Test
    void wrongMemoryVisibility() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            try {
                threadAcquired = "delayed";
                System.out.println("Acquired on delayed");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        });

        lock.lock();
        try {
            withDelayedLock.start();
            Thread.sleep(500);
        } finally {
            lock.unlock();
        }

        while (true) {
            System.out.println("Currently acquired " + threadAcquired);
            if (threadAcquired.equals("delayed")) {
                break;
            }
        }

        withDelayedLock.join();
        threadAcquired = "main";
    }

We print the variable threadAcquired by the main thread while it is changed by the thread with a Delayed lock.
After a few runs a situation where the threadAcquired variable has a stale value on the main thread will appear.

Acquired on delayed
Currently acquired main

We did this on purpose, on a real world problem we should not access variables like threadAcquired without proper synchronisation, for example we should have acquired the lock first, this way the memory would be synchronised and we would have an up to date value.

    @Test
    void correctMemoryVisibility() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            try {
                threadAcquired = "delayed";
                System.out.println("Acquired on delayed");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        });

        lock.lock();
        try {
            withDelayedLock.start();
            Thread.sleep(500);
        } finally {
            lock.unlock();
        }

        while (true) {
            lock.lock();
            try {
                System.out.println("Currently acquired " + threadAcquired);
                if (threadAcquired.equals("delayed")) {
                    break;
                }
            } finally {
                lock.unlock();
            }
        }

        withDelayedLock.join();
    }

That’s all for now, we will later proceed on different type of locks and the Condition interface.

Java Concurrency: Synchronized

Previously we had an introduction to threads in Java and some of the internals. On this blog we will proceed on developing thread safe code using synchronized.

 

 

Intrinsic lock or monitor lock is achieved using the keyword synchronized

For synchronized we need an object to use for locking. Every object has an intrinsic lock associated with it.

When a thread enters a synchronized block the lock is acquired and once the operation is done the lock will be released.

Let’s start with our application which will be single threaded for start.
It will be a counter application.

We will define an interface

package com.gkatzioura.concurrency.intrinsiclock;

public interface Counter {
    void increment();

    Integer get();

}

and a non thread safe implementation

package com.gkatzioura.concurrency.intrinsiclock;

public class SimpleCounter implements Counter {

    private Integer counter = 0;

    @Override
    public void increment() {
        counter++;
    }

    @Override
    public Integer get() {
        return counter;
    }

}

To achieve thread safety we shall use synchronized functions. On synchronized methods the intrinsic lock acquired is the lock of the method’s object. Essentially the lock will be that class instance.
Our class will transform to this.

package com.gkatzioura.concurrency.intrinsiclock;

public class CounterSynchronizedFunctions implements Counter {

    private Integer counter = 0;

    public synchronized void increment() {
        counter++;
    }

    public synchronized Integer get() {
        return counter;
    }

}

Multiple threads will increase the value. If more than two threads increment the same value at the same time we lose increment operations.
Thus we want all threads to increment the counter and not to overwrite each other’s results.
Another thing to observe is that we have synchronized on the get method. This is because we want to get a consistent value. We would like to get a value which has been changed by another thread, this would not be possible if the counter value fetched, would be from the CPU cache and has not been updated with latest changes. By using synchronized in this case we take advantage of the synchronized guarantees. When entering the synchronized block all the variables visible by the thread are updated from the main memory. Also when we leave the synchronized block the variables changes are written back to memory.

Since we put synchronized in the class methods we use the lock of that class instance.
This approach is handy however it has the pitfall that if the same object is used for other locking usages the counter will not operate properly.

Let’s simulate this one.

package com.gkatzioura.concurrency.intrinsiclock;

import java.util.Date;

import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;


@Slf4j
class CounterSynchronizedFunctionsTest {

    @Test
    void name() throws InterruptedException {
        CounterSynchronizedFunctions counter = new CounterSynchronizedFunctions();

        Thread old = new Thread(() -&amp;gt; {
            synchronized (counter) {
                try {
                    log.info("Acquiring lock first [{}]", new Date());
                    Thread.sleep(10000);
                    log.info("Releasing lock [{}]", new Date());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread newOne = new Thread(() -&amp;gt; {
            log.info("I should be blocked [{}]", new Date());
            counter.increment();
            log.info("I am unblocked [{}]", new Date());
        });

        old.start();
        Thread.sleep(1000);
        newOne.start();

        old.join();
        newOne.join();
    }
}

The first thread uses the class instance of Counter as a monitoring lock. The second thread will try to use the increment function and therefore will block, until the lock is released from the first thread.

We can validate by checking the output

12:50:54.631 [Thread-0] INFO com.gkatzioura.concurrency.intrinsiclock.CounterSynchronizedFunctionsTest - Acquiring lock first [Sun Mar 05 12:50:54 GMT 2023]
12:50:55.639 [Thread-1] INFO com.gkatzioura.concurrency.intrinsiclock.CounterSynchronizedFunctionsTest - I should be blocked [Sun Mar 05 12:50:55 GMT 2023]
12:51:04.656 [Thread-0] INFO com.gkatzioura.concurrency.intrinsiclock.CounterSynchronizedFunctionsTest - Releasing lock [Sun Mar 05 12:51:04 GMT 2023]
12:51:04.658 [Thread-1] INFO com.gkatzioura.concurrency.intrinsiclock.CounterSynchronizedFunctionsTest - I am unblocked [Sun Mar 05 12:51:04 GMT 2023]

This is something that can happen since the object can be passed around through the codebase.
To avoid issues like this we will use a lock visible only inside the class.

package com.gkatzioura.concurrency.intrinsiclock;

public class CounterInternallySynchronized implements Counter {

    private static final Object lock = new Object();

    private Integer counter = 0;

    public void increment() {
        synchronized (lock) {
            counter++;
        }
    }

    public Integer get() {
        synchronized (lock) {
            return counter;
        }
    }

}

We can try now the same test but with a different implementation.

package com.gkatzioura.concurrency.intrinsiclock;

import java.util.Date;

import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CounterInternallySynchronizedTest {

    @Test
    void name() throws InterruptedException {
        Counter counter = new CounterInternallySynchronized();

        Thread old = new Thread(() -&amp;gt; {
            synchronized (counter) {
                try {
                    log.info("Acquiring lock using counter instance [{}]", new Date());
                    Thread.sleep(10000);
                    log.info("Releasing lock using counter instance [{}]", new Date());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread newOne = new Thread(() -&amp;gt; {
            log.info("I should not be blocked [{}]", new Date());
            counter.increment();
            log.info("I finished operation unblocked [{}]", new Date());
        });

        old.start();
        Thread.sleep(1000);
        newOne.start();

        old.join();
        newOne.join();
    }

}
12:52:58.314 [Thread-0] INFO com.gkatzioura.concurrency.intrinsiclock.CounterInternallySynchronizedTest - Acquiring lock using counter instance [Sun Mar 05 12:52:58 GMT 2023]
12:52:59.334 [Thread-1] INFO com.gkatzioura.concurrency.intrinsiclock.CounterInternallySynchronizedTest - I should not be blocked [Sun Mar 05 12:52:59 GMT 2023]
12:52:59.335 [Thread-1] INFO com.gkatzioura.concurrency.intrinsiclock.CounterInternallySynchronizedTest - I finished operation unblocked [Sun Mar 05 12:52:59 GMT 2023]
12:53:08.344 [Thread-0] INFO com.gkatzioura.concurrency.intrinsiclock.CounterInternallySynchronizedTest - Releasing lock using counter instance [Sun Mar 05 12:53:08 GMT 2023]

As expected we keep our code thread safe while we avoid any accidental lock acquisition.

Java Concurrency: Threads and Runnables

Threads are everywhere, they are the basic building block of every server application out there.

Usually in Java using threads is just a combination of Executors and Runnables however let’s have a closer look on a thread and how it works.

 

Supposing I want to start a thread, it can be as simple as this.

        Thread thread = new Thread(() {
            System.out.println("Running somewhere else");
        });
        thread.start();
        thread.join();

What will happen in this case is that we create a thread and we join the thread until the operation finishes.

If we check the internals of the thread class we can see that in our initialization we passed a Runnable.

So the Runnable essentially instructs the thread what to do. It encapsulates the logic that will be executed once we start a thread.
If we check the source code the Runnable we can see that it is an interface with only one function.

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Essentially Runnable is a functional interface.

From the documentation.

This is a functional interface and can therefore be used as the assignment target for a lambda expression or method reference.

The runnable can be passed to lambdas, Executors-Thread pools and use it to create standalone threads.

So let’s have a closer look on what a thread does.

If we check the implementation we can see that a Thread implements the Runnable interface.

public class Thread implements Runnable {
...
    public Thread(Runnable target) {
        this(null, target, "Thread-" + nextThreadNum(), 0);
    }
...
}

We also displayed a specific thread constructor on purpose, the constructor passes the runnable as a target variable.

If we check the run method of the thread it will execute the Runnable we passed previously.


...
    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

Supposing no runnable has been passed in one of the constructors the thread will not execute anything.
So let’s see another approach where we shall run a thread by extending it.

public class CustomThread extends Thread {

    @Override
    public void run() {
        System.out.println("No runnable passed");
    }

}

Which we can run like this

        Thread thread = new CustomThread();
        thread.start();
        thread.join();

So start will essentially execute the run the method that we implemented or the original run method using the target Runnable.
If we check start we shall stumble upon the following code block

...
        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
...

As we see there a method called start0() is invoked.
This method is essentially

    private native void start0();

We can see the native methods on the  Thread.c class.

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    ...
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

This brings us to the actual native code starting the thread which will end up on the actual implementation based on the os used.
For example on linux it would be pthreads.

Java and dynamic Proxies

Dynamic proxies in Java is a simple and very useful feature.

Usually we create an interface implementation and then compilation is involved. With dynamic proxies we can implement a list of interfaces at runtime. A proxy object will be created, when a method is invoked on that proxy instance, the methods invoked will be forwarded to an invocation handler specified.

This can have various usages. A common use case would be for a java interface which we can use a proxy and intercept the calls to the methods invoked.

Supposing we have a JDBC connection pool and we want to have something like a micrometer counter. On getting a connection the counter will increase, thus we can identify the rate of acquiring connections in our application.

We shall use a proxy for that.

Let’s first add a docker container for Postgresql using Docker Compose.

version: '3.1'

services:
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - 5432:5432

We can run with the following command

docker compose up

You can find more on Compose on the Developers Essential Guide to Docker Compose.

Let’s add our dependencies to our Java project

    <dependencies>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.5.1</version>
        </dependency>
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>4.0.3</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

Now will also add a small snippet.

package com.example.gkatzioura.proxy;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class Application {

    public static void main(String[] args) throws SQLException {
        Properties props = new Properties();

        props.setProperty("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
        props.setProperty("dataSource.user", "postgres");
        props.setProperty("dataSource.password", "postgres");
        props.setProperty("dataSource.databaseName", "postgres");
        props.put("dataSource.logWriter", new PrintWriter(System.out));

        HikariConfig config = new HikariConfig(props);
        
        try(HikariDataSource ds = new HikariDataSource(config)) {
            try(Connection connection = ds.getConnection()) {
                System.out.println("Should be printed after the proxy");       
            }
        }
    }

}

We can examine the DataSource interface of Java on the source code. We can see the method of interest, getConnection, present.

public interface DataSource  extends CommonDataSource, Wrapper {

    Connection getConnection() throws SQLException;
    ...
}

Instead of creating an interface implementation and having to implement all those methods and then delegate them to the actual DataSource instance, we shall instead use a proxy and add actions only to the method of interest, in our case getConnection.

We shall implement an InvocationHandler.

package com.example.gkatzioura.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import javax.sql.DataSource;

public class GetConnectionHandler implements InvocationHandler {

    private final DataSource dataSource;

    public GetConnectionHandler(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getName().equals("getConnection")) {
            System.out.println("Called before actual method");

        }

        return method.invoke(dataSource, args);
    }

    public static DataSource proxy(DataSource dataSource) {
        return (DataSource) Proxy.newProxyInstance(DataSource.class.getClassLoader(), new Class[]{DataSource.class}, new GetConnectionHandler(dataSource));
    }

}

Let’s break it down.

The invocation handler will be called for the method of an interface specified. When an interface method is invoked it is our choice how we shall handle it. In our case we shall print a simple message and then we shall execute the corresponding method to our target instance.

Also we have a static factory specified which shall proxy the object implementing the interface of interest. A new proxy instance will be created, it will implement the interfaces provided and the calls towards the proxy instance will be passed to the handler we provided.

Let’s revisit our main method.

package com.example.gkatzioura.proxy;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import static com.example.gkatzioura.proxy.GetConnectionHandler.proxy;

public class Application {

    public static void main(String[] args) throws SQLException {
        Properties props = new Properties();

        props.setProperty("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
        props.setProperty("dataSource.user", "postgres");
        props.setProperty("dataSource.password", "postgres");
        props.setProperty("dataSource.databaseName", "postgres");
        props.put("dataSource.logWriter", new PrintWriter(System.out));

        HikariConfig config = new HikariConfig(props);

        try(HikariDataSource ds = new HikariDataSource(config)) {
            DataSource dataSource = proxy(ds);
            try(Connection connection = dataSource.getConnection()) {
                System.out.println("Should be printed after the proxy");
            }
        }
    }

}

We wrapped the HikariDataSource with a Dynamic Proxy and if we run the program we should see the following output.

Called before actual method
Should be printed after the proxy

If we break it down, we created a proxy with the DataSource interface. By creating the proxy we provided an invocation handler which shall print a message before the getConnection method is invoked. The getConnection will be invoked by the actual implementation of the DataSource interface that we specified on the InvocationHandler.

You can find the source code on GitHub.

Add ZipKin to your Spring application

If your application contains multiple services interacting with each other the need for distributed tracing is increasing. You have a call towards one application that also calls another application, in certain cases the application to be accessed next might be a different one. You need to trace the request end to end and identify what happened to the call.
Zipkin is a Distributed Tracing system. Essentially by using Zipkin on our system we can track how a call spans across various Microservices.

ZipKin comes with Various database options. In our case we shall use Elasticsearch.

We will setup out ZipKin server using Compose

Let’s start with our Compose file:

services:
  redis:
    image: redis
    ports:
      - 6379:6379
  elasticsearch:
    image: elasticsearch:7.17.7
    ports:
      - 9200:9200
      - 9300:9300
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9200/_cat/health"]
      interval: 20s
      timeout: 10s
      retries: 5
      start_period: 5s
    environment:
      - discovery.type=single-node
    restart: always
  zipkin:
    image: openzipkin/zipkin-slim
    ports:
      - 9411:9411
    environment:
      - STORAGE_TYPE=elasticsearch
      - ES_HOSTS=http://elasticsearch:9200
      - JAVA_OPTS=-Xms1G -Xmx1G -XX:+ExitOnOutOfMemoryError
    depends_on:
      - elasticsearch
    restart: always

We can run the above using

docker compose up

You can find more on Compose on the Developers Essential Guide to Docker Compose.

Let’s build our applications, our applications will be servlet based

We shall use a service for locations, this service essentially will persist locations on a Redis database using the GeoHash data structure.
You can find the service implementation on a previous blog.

We would like to add some extra dependencies so that Zipkin integration is possible.

...
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2021.0.5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
...
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin</artifactId>
        </dependency>
...

Spring Sleuth provides distributed tracing to our Spring application. By using spring Sleuth tracing data are generated. In case of a servlet filter or a rest template tracing data will also be generated. Provided the Zipkin binary is included the data generated will be dispatched to the Zipkin collector specified using
spring.zipkin.baseUrl.

Let’s also make our entry point application. This application will execute requests towards the location service we implemented previously.
The dependencies will be the following.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>european-venue</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2021.0.5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin</artifactId>
        </dependency>
    </dependencies>

</project>

We shall create a service interacting with the location service.

The location model shall be the same:

package org.landing;

import lombok.Data;

@Data
public class Location {

    private String name;
    private Double lat;
    private Double lng;

}

And the service:

package org.landing;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;


@Service
public class LocationService {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${location.endpoint}")
    private String locationEndpoint;

    public void checkIn(Location location) {
        restTemplate.postForLocation(locationEndpoint, location);
    }

}

Following we will add the controller:

package org.landing;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import lombok.AllArgsConstructor;

@RestController
@AllArgsConstructor
public class CheckinController {

    private final LocationService locationService;

    @PostMapping("/checkIn")
    public ResponseEntity<String> checkIn(@RequestBody Location location) {
        locationService.checkIn(location);
        return ResponseEntity.ok("Success");
    }

}

We need also RestTemplate to be configured:

package org.landing;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfiguration {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

Last but not least the main method:

package org.location;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LocationApplication {

    public static void main(String[] args) {
        SpringApplication.run(LocationApplication.class);
    }

}

Now that everything is up and running let’s put this into action

curl --location --request POST 'localhost:8081/checkIn/' \
--header 'Content-Type: application/json' \
--data-raw '{
	"name":"Liverpool Street",
	"lat": 51.517336,
	"lng": -0.082966
}'
> Success

Let’s navigate now to the Zipkin Dashboard and search traces for the european-venue.
By expanding the calls we shall end up to a call like this.

Essentially we have an end to end tracing for our applications. We can see the entry point which is the european-venue checkin endpoint.
Also because the location service is called we have data points for the call received. However we also see data points for the call towards the redis database.
Essentially by adding sleuth to our application, beans that are ingress and egress points are being wrapped so that trace data can be reported.

You can find the code on GitHub.

Use Redis GeoHash with Spring boot

One very handy Data Structure when it comes to Redis is the GeoHash Data structure. Essentially it is a sorted set that generates a score based on the longitude and latitude.

We will spin up a Redis database using Compose

services:
  redis:
    image: redis
    ports:
      - 6379:6379

Can be run like this

docker compose up

You can find more on Compose on the Developers Essential Guide to Docker Compose.

Let’s add our dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>location-service</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>
</project>

We shall start with our Configuration. For convenience on injecting we shall create a GeoOperations<String,String> bean.

package org.location;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.GeoOperations;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class RedisConfiguration {

    @Bean
    public GeoOperations<String,String> geoOperations(RedisTemplate<String,String> template) {
        return template.opsForGeo();
    }

}

Our model would be this one

package org.location;

import lombok.Data;

@Data
public class Location {

    private String name;
    private Double lat;
    private Double lng;

}

This simple service will persist venue locations and also fetch venues nearby of a location.

package org.location;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.data.geo.Circle;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.GeoOperations;
import org.springframework.data.redis.domain.geo.GeoLocation;
import org.springframework.stereotype.Service;

@Service
public class GeoService {

    public static final String VENUS_VISITED = "venues_visited";
    private final GeoOperations<String, String> geoOperations;

    public GeoService(GeoOperations<String, String> geoOperations) {
        this.geoOperations = geoOperations;
    }

    public void add(Location location) {
        Point point = new Point(location.getLng(), location.getLat());
        geoOperations.add(VENUS_VISITED, point, location.getName());

    }

    public List<String> nearByVenues(Double lng, Double lat, Double kmDistance) {
        Circle circle = new Circle(new Point(lng, lat), new Distance(kmDistance, Metrics.KILOMETERS));
        GeoResults<RedisGeoCommands.GeoLocation<String>> res = geoOperations.radius(VENUS_VISITED, circle);
        return res.getContent().stream()
                  .map(GeoResult::getContent)
                  .map(GeoLocation::getName)
                  .collect(Collectors.toList());
    }

}

We shall also add a controller

package org.location;

import java.util.List;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LocationController {

    private final GeoService geoService;

    public LocationController(GeoService geoService) {
        this.geoService = geoService;
    }

    @PostMapping("/location")
    public ResponseEntity<String> addLocation(@RequestBody Location location) {
        geoService.add(location);
        return ResponseEntity.ok("Success");
    }

    @GetMapping("/location/nearby")
    public ResponseEntity<List<String>> locations(Double lng, Double lat, Double km) {
        List<String> locations = geoService.nearByVenues(lng, lat, km);
        return ResponseEntity.ok(locations);
    }

}

Then let’s add an element.

curl --location --request POST 'localhost:8080/location' \
--header 'Content-Type: application/json' \
--data-raw '{
	"lng": 51.5187516,
	"lat":-0.0814374,
	"name": "liverpool-street"
}'

Let’s retrieve the element of the api

curl --location --request GET 'localhost:8080/location/nearby?lng=51.4595573&lat=0.24949&km=100'
> [
    "liverpool-street"
]

And also let’s check redis

ZRANGE venues_visited 0 -1 WITHSCORES
1) "liverpool-street"
2) "2770072452773375"

We did it, pretty convenient for our day to day distance use cases.

Use JMH for your Java applications with Gradle

If you want to benchmark you code, the Java Microbenchmark Harness is the tool of choice.
In our example we shall use the refill-rate-limiter project

 

Since refill-rate-limiter uses Gradle we will use the following plugin for gradle

plugins {
...
  id "me.champeau.gradle.jmh" version "0.5.3"
...
}

We shall place the Benchmark at the jmh/java/io/github/resilience4j/ratelimiter folder.

Our Benchmark should look like this.

package io.github.resilience4j.ratelimiter;

import io.github.resilience4j.ratelimiter.internal.RefillRateLimiter;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.All)
public class RateLimiterBenchmark {

    private static final int FORK_COUNT = 2;
    private static final int WARMUP_COUNT = 10;
    private static final int ITERATION_COUNT = 10;
    private static final int THREAD_COUNT = 2;

    private RefillRateLimiter refillRateLimiter;

    private Supplier<String> refillGuardedSupplier;

    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .addProfiler(GCProfiler.class)
                .build();
        new Runner(options).run();
    }

    @Setup
    public void setUp() {

        RefillRateLimiterConfig refillRateLimiterConfig = RefillRateLimiterConfig.custom()
                                                                                 .limitForPeriod(1)
                                                                                 .limitRefreshPeriod(Duration.ofNanos(1))
                                                                                 .timeoutDuration(Duration.ofSeconds(5))
                                                                                 .build();

        refillRateLimiter = new RefillRateLimiter("refillBased", refillRateLimiterConfig);

        Supplier<String> stringSupplier = () -> {
            Blackhole.consumeCPU(1);
            return "Hello Benchmark";
        };

        refillGuardedSupplier = RateLimiter.decorateSupplier(refillRateLimiter, stringSupplier);
    }

    @Benchmark
    @Threads(value = THREAD_COUNT)
    @Warmup(iterations = WARMUP_COUNT)
    @Fork(value = FORK_COUNT)
    @Measurement(iterations = ITERATION_COUNT)
    public String refillPermission() {
        return refillGuardedSupplier.get();
    }

}

Let’s now check the elements one by one.

By using Benchmark scope all the threads used on the benchmark scope will share the same object. We do so because we want to test how refill-rate-limiter performs in a multithreaded scenario.

@State(Scope.Benchmark)

We would like our results to be reported in microseconds, therefore we shall use the OutputTimeUnit.

@OutputTimeUnit(TimeUnit.MICROSECONDS)

On JMH We have various benchmark modes depending on what we want to measure.

Throughput is when we want to measure the number operations per unit of time.
AverageTime when we want to measure the average time per operation.
SampleTime when we want to sample the time for each operation including min, max time, more than just the average.
SingleShotTime: when we want to measure the time for a single operation. This can help when we want to identify how the operation will do on a cold start.

We also have the option to measure all the above.

@BenchmarkMode(Mode.All)

Those options configured on the class level will apply to the benchmark methods we shall add.

Let’s also examine how the benchmark will run

We will specify the number of Threads by using the Threads annotation.

@Threads(value = THREAD_COUNT)

Also we want to warm up before we run the actual benchmarks. This way our code will be initialized, online optimizations will take place, and our runtime will adapt to the conditions before we run the benchmarks.

@Warmup(iterations = WARMUP_COUNT)

Using a Fork we shall instruct how many times the benchmark will run.

@Fork(value = FORK_COUNT)

Then we need to specify the number of iterations we want to measure/

@Measurement(iterations = ITERATION_COUNT)

We can start our test by just using

gradle jmh

The results will be save in a file.

...
2022-10-28T09:08:44.522+0100 [QUIET] [system.out] Benchmark result is saved to /path/refill-rate-limiter/build/reports/jmh/results.txt
..

Let’s examine the results.

Benchmark                                                         Mode       Cnt      Score   Error   Units
RateLimiterBenchmark.refillPermission                            thrpt        20     13.594 ± 0.217  ops/us
RateLimiterBenchmark.refillPermission                             avgt        20      0.147 ± 0.002   us/op
RateLimiterBenchmark.refillPermission                           sample  10754462      0.711 ± 0.025   us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.00    sample                  ≈ 0           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.50    sample                0.084           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.90    sample                0.125           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.95    sample                0.125           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.99    sample                0.209           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.999   sample              139.008           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p0.9999  sample              935.936           us/op
RateLimiterBenchmark.refillPermission:refillPermission·p1.00    sample            20709.376           us/op
RateLimiterBenchmark.refillPermission                               ss        20     14.700 ± 4.003   us/op

As we can see we have the modes listed.
Count is the number of iterations. Apart from throughput where we measure the operations by time, the rest is time per operation.
Throughput,Average and Single shot are straightforward, Sample lists the percentiles. Error is the margin of error.

That’s it! Happy benchmarking