Java Concurrency: ReadWriteLock

Imagine a scenario where we want to distinguish between write threads and read threads.

  • As we execute a write we want isolation
  • As we execute a read we don’t want the data to change
  • Read operations do not block other read operations

From a writer’s perspective we want to achieve isolation and consistency. From a reader’s perspective we want to be able to read results that are consistent, take advantage of the Java memory model, and not prevent other readers from reading too.
The ReadWriteLock can fulfil the above.

A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.
All ReadWriteLock implementations must guarantee that the memory synchronization effects of writeLock operations (as specified in the Lock interface) also hold with respect to the associated readLock. That is, a thread successfully acquiring the read lock will see all updates made upon previous release of the write lock.

Let’s examine the interface

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

In both cases we receive back the Lock interface which we saw previously.

We have two implementations of ReadWriteLock, the ReentrantReadWriteLock and the ReadWriteLockView; we will examine them further.
For now we shall use the ReentrantReadWriteLock implementation.
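The snippets that follow assume a test class with a shared lock field along these lines (a minimal setup; the logger and test framework are not shown):

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();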

Let’s see some usage scenarios using Threads and Runnables.

We have the write and read lock acquisition scenarios:

    private int counter = 0;

    @Test
    void testWrite() {
        Lock lock = readWriteLock.writeLock();

        lock.lock();

        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

    @Test
    void testRead() {
        Lock lock = readWriteLock.readLock();

        lock.lock();

        try {
            log.info("Counter is {}", counter);
        } finally {
            lock.unlock();
        }
    }

The next scenario is to read first and then try to perform a write:

    private int counter = 0;

    @Test
    void testReadThenWrite() throws InterruptedException {
        Thread readThread = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
                sleep();
            } finally {
                lock.unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            Lock lock = readWriteLock.writeLock();
            lock.lock();

            try {
                counter++;
                log.info("wrote successfully [{}]",counter);
            } finally {
                lock.unlock();
            }

        });

        readThread.start();
        Thread.sleep(1000L);
        writeThread.start();
        writeThread.join();
        readThread.join();
    }

    private static void sleep() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

In this case we can see that the write operation waited for the read operation to finish.

08:40:31.686 [Thread-0] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [0]
08:40:41.693 [Thread-1] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - wrote successfully [1]

The other scenario is write first and then try to perform a read:

    private int counter = 0;

    @Test
    void testWriteThenRead() throws InterruptedException {
        Thread readThread = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
            } finally {
                lock.unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            Lock lock = readWriteLock.writeLock();
            lock.lock();

            try {
                counter++;
                log.info("wrote successfully [{}]",counter);
                sleep();
            } finally {
                lock.unlock();
            }

        });

        writeThread.start();
        Thread.sleep(1000L);
        readThread.start();

        writeThread.join();
        readThread.join();
    }

    private static void sleep() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

As expected the read operation waited for the write operation to finish:

08:42:21.646 [Thread-1] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - wrote successfully [1]
08:42:31.652 [Thread-0] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [1]

The last scenario will perform multiple reads. The ReadWriteLock allows the read lock to be held by multiple readers simultaneously.

    @Test
    void testMultipleReads() throws InterruptedException {
        Thread readThread1 = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
                sleep();
            } finally {
                lock.unlock();
            }
        });
        readThread1.setName("r-1");

        Thread readThread2 = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
            } finally {
                lock.unlock();
            }

        });
        readThread2.setName("r-2");

        readThread1.start();
        Thread.sleep(1000L);
        readThread2.start();

        readThread1.join();
        readThread2.join();
    }

In both cases the response was immediate since there was no blocking at all.

08:46:05.018 [r-1] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [0]
08:46:06.022 [r-2] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [0]

From the above scenarios we can get an idea of where the ReentrantReadWriteLock can be useful. The predominant scenario would be when we have primarily read operations and can take advantage of multiple readers getting a hold of the read lock. In another blog we will do some benchmarks and examine its usage further.
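As a sketch of that read-mostly pattern, a simple cache guarded by a ReentrantReadWriteLock could look like the following (the class and method names are illustrative, not part of the examples above):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadMostlyCache<K, V> {

    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public V get(K key) {
        Lock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            // multiple readers can hold the read lock at the same time
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public void put(K key, V value) {
        Lock writeLock = readWriteLock.writeLock();
        writeLock.lock();
        try {
            // exclusive: no readers or other writers while the map is mutated
            map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}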

Java Concurrency: Condition

Previously we checked on ReentrantLock and its fairness. One of the things we can stumble upon is the creation of a Condition. By using a Condition we can create mechanisms that allow threads to wait for specific conditions to be met before proceeding with their execution.


The Condition interface provides the following methods:


public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

The closest we have come to that so far is the wait method of the Object monitor.
A Condition is bound to a Lock, and a thread cannot interact with a condition and its methods if it does not hold that lock.
Also, Condition uses the underlying lock mechanisms; for example signal and signalAll will use the underlying queue of threads maintained by the Lock and will notify them to wake up.
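A minimal sketch of that binding, assuming a plain ReentrantLock and JUnit-style assertions (the field and test names are illustrative): calling a Condition method without holding the associated lock fails with an IllegalMonitorStateException.

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    @Test
    void conditionRequiresLock() {
        // not holding the lock: the condition rejects the call
        assertThrows(IllegalMonitorStateException.class, condition::signal);

        lock.lock();
        try {
            condition.signal(); // allowed, we hold the lock
        } finally {
            lock.unlock();
        }
    }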

One of the obvious things to implement using Conditions is a BlockingQueue: worker threads processing data and publisher threads dispatching data. Data is published on a queue; worker threads will process data from the queue and should wait if there is no data in the queue.

For a worker thread, if the condition is met the flow is the following:

  • Acquire the lock
  • Check the condition
  • Process Data
  • Release the lock

If the condition is not met, the flow would slightly change to this:

  • Acquire the lock
  • Check the condition
  • Wait until the condition is met.
  • Re-acquire the lock
  • Process Data
  • Release the lock

Whenever the publisher thread adds a message, it should notify the threads waiting on the condition.

The workflow would be like this:

  • Acquire the lock
  • Publish data
  • Notify the workers
  • Release the lock


Obviously this functionality already exists through the BlockingQueue interface and the LinkedBlockingDeque and ArrayBlockingQueue implementations.
We will proceed with our own implementation for the sake of the example.
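For comparison, the same blocking hand-off with the JDK’s ArrayBlockingQueue would look roughly like this (a fragment, not a full test; put and take may throw InterruptedException):

    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    queue.put("hello");            // blocks while the queue is full
    String message = queue.take(); // blocks while the queue is empty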

Let’s see the Message Queue:

package com.gkatzioura.concurrency.lock.condition;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue<T> {

    private Queue<T> queue = new LinkedList<>();
    private Lock lock = new ReentrantLock();
    private Condition hasMessages = lock.newCondition();

    public void publish(T message) {
        lock.lock();
        try {
            queue.offer(message);
            hasMessages.signal(); 
        } finally {
            lock.unlock();
        }
    }

    public T receive() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                hasMessages.await();
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

}

Now let’s put it into action:

    private final MessageQueue<String> messageQueue = new MessageQueue<>();

    @Test
    void testPublish() throws InterruptedException {
        Thread publisher = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                String message = "Sending message num: " + i;
                log.info("Sending [{}]", message);
                messageQueue.publish(message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread worker1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String message = messageQueue.receive();
                    log.info("Received: [{}]", message);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread worker2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String message = messageQueue.receive();
                    log.info("Received: [{}]", message);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        publisher.start();
        worker1.start();
        worker2.start();

        publisher.join();
        worker1.join();
        worker2.join();
    }

That’s it! Our workers processed the expected messages and waited when the queue was empty.

Java Concurrency: ReentrantLock Fairness

Previously we saw some of the building blocks of concurrency in Java. In this blog we will focus on ReentrantLock. ReentrantLock is based on the AbstractQueuedSynchronizer.

By using the ReentrantLock we allow a thread to acquire a lock and use it multiple times. As the name suggests it provides us with reentrancy, just like synchronized blocks.
Because ReentrantLock is implemented using the AbstractQueuedSynchronizer we can have fairness: the threads that try to acquire the lock will wait in a FIFO fashion and the algorithm will try to ensure fairness.
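A short sketch of that reentrancy counter, using ReentrantLock’s getHoldCount method (the test name is illustrative):

    @Test
    void holdCount() {
        final ReentrantLock reentrantLock = new ReentrantLock();

        reentrantLock.lock();
        reentrantLock.lock();
        // the same thread acquired the lock twice
        assertEquals(2, reentrantLock.getHoldCount());

        reentrantLock.unlock();
        reentrantLock.unlock();
        assertEquals(0, reentrantLock.getHoldCount());
    }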

If we check our previous blog on locks we can see that locks are featureful.

If a thread holds the lock, another thread cannot unlock it; only the thread that holds it can:

    @Test
    void alreadyAcquired() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();

        Thread thread = new Thread(() -> {
            log.info("Try first time");
            reentrantLock.lock();
            log.info("Got the lock");
        });

        thread.start();
        thread.join();
        assertThrows(IllegalMonitorStateException.class, reentrantLock::unlock);
    }

Also, as the name suggests, let’s see the reentrancy. In the following example the same thread will acquire the lock it already holds multiple times.

    @Test
    void reentrancy() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();

        Thread thread = new Thread(() -> {
            log.info("Try first time");
            reentrantLock.lock();
            log.info("Got the lock");

            log.info("lock one more time");
            reentrantLock.lock();
            log.info("locked again");

            // release once for every acquisition
            reentrantLock.unlock();
            reentrantLock.unlock();
        });

        thread.start();
        thread.join();
    }

In the next example we will construct a ReentrantLock with fairness enabled. Let’s do a small test of ReentrantLock’s fairness.
5 threads will try to acquire the lock simultaneously. Then we will see how they achieved this and the order in which they acquired the lock.

    @AllArgsConstructor
    private class CheckFairnessRunnable implements Runnable {

        private final int sleepTime;
        private final Lock lock;

        @Override
        public void run() {
            try {
                Thread.sleep(sleepTime);
                log.info("acquiring lock");
                lock.lock();
                log.info("lock acquired");
                Thread.sleep(5000);
                lock.unlock();
                log.info("lock unlocked");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    void fairness() throws Exception {
        int numberOfThreads = 5;
        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

        final ReentrantLock reentrantLock = new ReentrantLock(true);

        for (int i = 0; i < numberOfThreads; i++) {
            Runnable runnable = new CheckFairnessRunnable(i*100, reentrantLock);
            executor.execute(runnable);
        }

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);
    }

If we examine the code, in the CheckFairnessRunnable the thread initially sleeps. This is done on purpose so the threads don’t arrive at the lock at the same time. The time a thread spends holding the lock is high on purpose, so we make sure all other threads will wait for the lock.

Let’s check the output:

09:07:43.782 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:43.788 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:43.882 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:43.985 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:44.085 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:44.185 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:48.797 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:48.796 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:07:53.802 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:53.802 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:07:58.807 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:58.806 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:08:03.813 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:08:03.813 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:08:08.819 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked

We can see that the threads that were first to try to acquire the lock were also the ones that got the lock, so essentially the order was preserved.
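As a small aside, fairness is chosen at construction time and can be inspected afterwards with isFair(); the no-argument constructor creates a non-fair lock:

    assertFalse(new ReentrantLock().isFair());
    assertTrue(new ReentrantLock(true).isFair());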

That’s it. In the next blog we will check the Condition interface.

Java Concurrency: The AbstractQueuedSynchronizer

Previously we worked with one of the building blocks of Java concurrency, the LockSupport class. In this blog we will have a look at the AbstractQueuedSynchronizer.
AbstractQueuedSynchronizer is one of the building blocks of Java synchronization. Lock implementations such as ReentrantLock and Semaphore are based on the AbstractQueuedSynchronizer.
An integer represents the state. What the state represents can vary based on the use case; for example it can represent how many permits are available. Operations that change the state are guarded by a FIFO queue, which tries to act in a fair manner and provides access to the lock in the order in which the threads arrived.


The FIFO queue is implemented using a nested Node class. A Node represents a thread waiting in the queue. The operations are atomic and the variables are volatile. It contains references to the previous and the next node.

    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others

...
    }

An instance of the AbstractQueuedSynchronizer, since it provides a FIFO queue, will have a reference to the head and the tail in the class:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
    private transient volatile Node head;
    private transient volatile Node tail;
...
}

As mentioned, we have the state variable:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
    private volatile int state;
...
}

The state variable can be used to represent the resources available or allocated.

When it comes to ReentrantLock, it represents the number of holds on the lock:

public class ReentrantLock implements Lock, java.io.Serializable {
...
    static final class FairSync extends Sync {
...
        final boolean initialTryLock() {
...
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
...
    }
...
}

When it comes to the Semaphore it represents the permits:

public class Semaphore implements java.io.Serializable {
...
    static final class FairSync extends Sync {
...
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
...
   }
...
}

Since the AbstractQueuedSynchronizer supports a shared mode as well as an exclusive mode, there are two Node implementations to represent them.

    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

For example, ReentrantLock operates in exclusive mode while Semaphore operates in shared mode.

The acquire method is where the logic lies.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

The implementation of acquiring the lock can vary between JDK versions but the objectives should be the same.
The first step is for the thread to try to acquire the lock without blocking, if it is available.
This can vary between the shared and the exclusive mode.
If the lock is held, the thread is blocked and added to the waiting queue.

    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
...
        }
    }

As the lock gets released by the thread holding it, the threads in the queue are signaled to proceed and acquire the lock.
The mechanism varies depending on whether the mode is shared or exclusive.
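To tie the pieces together, here is a minimal sketch of a non-reentrant mutex (the class and field names are illustrative, inspired by the example in the AbstractQueuedSynchronizer Javadoc): the synchronizer only defines what the state means through tryAcquire and tryRelease, while queuing, parking and signalling are inherited from the AbstractQueuedSynchronizer.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleMutex {

    // State 0 means unlocked, 1 means locked.
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int ignored) {
            // Atomically flip the state from 0 to 1; only one thread succeeds.
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int ignored) {
            // Reset the state; queued threads will then be signalled.
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }
    public void unlock() { sync.release(1); }
}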

Now that we know more on the AbstractQueuedSynchronizer we can check its implementations and find more about its usage.

Java Concurrency: LockSupport

Previously we had a look at the Lock class and how it works behind the scenes.
In this post we shall look at LockSupport and its native methods, which are one of the building blocks for locks and synchronization classes.

As we can see, the methods of LockSupport are static and behind the scenes they are based on the Unsafe class.
The Unsafe class is not intended to be used by your code directly, thus only trusted code can obtain instances of it.

If we look at the methods available we can identify the park methods, the unpark method and the getBlocker method.

public class LockSupport {
...

    public static void park() 
    public static void park(Object blocker) 
    public static void parkNanos(long nanos) 
    public static void parkNanos(Object blocker, long nanos) 
    public static void parkUntil(long deadline) 
    public static void parkUntil(Object blocker, long deadline) 
    public static Object getBlocker(Thread t) 
    public static void unpark(Thread thread) 
...
}


Each thread that uses the LockSupport class is associated with a permit.
When we call park the thread is disabled for thread scheduling purposes. Provided the permit is available, the permit is consumed and the call returns immediately.

public final class Unsafe {
...
    public native void unpark(Object thread);
    public native void park(boolean isAbsolute, long time);
...
}

We can trace the permit down in the source code and eventually end up at the POSIX implementation:

void Parker::park(bool isAbsolute, jlong time) {


  if (Atomic::xchg(&_counter, 0) > 0) return;
  JavaThread *jt = JavaThread::current();

  if (jt->is_interrupted(false)) {
    return;
  }

  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }


  ThreadBlockInVM tbivm(jt);

  if (pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    OrderAccess::fence();
    return;
  }

  OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                  status, "cond_wait");
  }
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  OrderAccess::fence();
}

In a nutshell, _counter is a permit and if the permit is bigger than 0 it will be consumed and the method will return immediately. If an interrupt is pending the method should return immediately. If nanos to wait have been supplied then the time to wait on the condition is calculated. By using the mutex provided, either the thread will wait forever until it is unparked or interrupted, or the thread will wait for the nanos provided.

Let’s see an example of parking a thread:

    @Test
    void park() throws InterruptedException {
        Thread secondaryThread = new Thread(LockSupport::park);

        secondaryThread.start();

        secondaryThread.join(2000);
        log.info("Could not join thread is parked");

        assertTrue(secondaryThread.isAlive());
        LockSupport.unpark(secondaryThread);
        secondaryThread.join();

        assertFalse(secondaryThread.isAlive());
        log.info("Thread was unparked");
    }

We start the thread, which parks itself. When we try to join from the main thread we use a time limit; if no time limit was set we would wait forever. Eventually the time passes and, before retrying to join, we unpark the thread. As expected the thread is unparked and eventually finishes.

If the permit is not available the thread lies dormant, disabled for thread scheduling. Initially the permit is zero. We can try this with one thread.

    @Test
    void unParkAndPark() {
        final Thread mainThread = Thread.currentThread();
        LockSupport.unpark(mainThread);
        LockSupport.park();
    }

By using unpark initially we made a permit available, thus on park the permit was consumed and we returned immediately.

The other element that we see in the LockSupport class is the blocker.

The blocker maps to the parkBlocker field in Java’s Thread implementation.

public class Thread implements Runnable {
...
    volatile Object parkBlocker;
...
}

The blocker represents the synchronization object responsible for this thread parking.
Thus we can note which object is responsible for the thread being parked, which can help for debugging and monitoring purposes.
The Unsafe methods are called to set the parkBlocker value on the Thread instance.
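A small sketch of observing the blocker from another thread through getBlocker (the test name and sleep duration are illustrative):

    @Test
    void blocker() throws InterruptedException {
        final Object monitor = new Object();
        Thread parked = new Thread(() -> LockSupport.park(monitor));

        parked.start();
        Thread.sleep(500); // give the thread time to park

        // while the thread is parked, the blocker we passed is visible
        assertEquals(monitor, LockSupport.getBlocker(parked));

        LockSupport.unpark(parked);
        parked.join();
    }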

Now that we saw how LockSupport works behind the scenes, we can look at the example provided by the Javadoc, the FIFOMutex.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class FIFOMutex {

    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
            = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }

}

All threads will be added to the queue. Since this is a FIFO queue, the first thread should mark the locked variable as true and should be able to proceed and remove itself from the queue. The rest of the threads will be parked. In case of an interrupt, park will return, thus we should reassert the interrupt status on the thread on exit.
Also pay attention that the blocker is set to the instance of the FIFOMutex class.

By using unlock the threads that have been in a parked state will resume.
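A hypothetical usage of the FIFOMutex above, guarding a critical section the same way we would with any other lock:

    FIFOMutex mutex = new FIFOMutex();

    Runnable task = () -> {
        mutex.lock();
        try {
            // critical section, one thread at a time, in FIFO order
        } finally {
            mutex.unlock();
        }
    };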

We can also check another usage of lock support. Rate limiters can benefit from LockSupport.
We can check the source code of the Refill Rate Limiter:

private boolean waitForPermission(final long nanosToWait) {
        waitingThreads.incrementAndGet();
        long deadline = currentNanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        while (currentNanoTime() < deadline && !wasInterrupted) {
            long sleepBlockDuration = deadline - currentNanoTime();
            parkNanos(sleepBlockDuration);
            wasInterrupted = Thread.interrupted();
        }
        waitingThreads.decrementAndGet();
        if (wasInterrupted) {
            currentThread().interrupt();
        }
        return !wasInterrupted;
    }

In a rate limiter the permits to acquire are limited over time. In case a thread tries to acquire a permit, there is an option to wait until a permit is available. In this case we can park the thread. This way our rate limiter avoids wasting time on busy spinning.

That’s it. Now that we know about LockSupport we can proceed to more interesting concurrency concepts.

Java Concurrency: Object monitor methods

Previously we used monitor locks to solve a problem in a thread safe way. In Java each object has a lock associated with it. Along with this monitor lock the object has some methods that depend on it: wait, notify, notifyAll. Those methods can help in coordinating threads, for example a thread writing to a variable and notifying another thread of that variable change.

By using the wait method the thread will wait until notified or interrupted. In order to call this method on an object, the thread should have acquired the lock associated with the object. Behind the scenes the thread will get into a wait queue among other threads that wait. That queue is associated with the object whose monitor lock we use.

We can see that in the ObjectMonitor source code.

  // create a node to be put into the queue
  // Critically, after we reset() the event but prior to park(), we must check
  // for a pending interrupt.
  ObjectWaiter node(current);
  node.TState = ObjectWaiter::TS_WAIT;
  current->_ParkEvent->reset();
  OrderAccess::fence();          

Once the thread gets into a wait state, it will relinquish the lock it acquired on the object. The thread will be dormant until it is interrupted or it gets notified to wake up.

We shall use an object as a lock and provide a runnable implementation:

    private Object object = new Object();

    public class WaitingRunnable implements Runnable {

        @Override
        public void run() {
            try {
                log.info("Waiting to be notified");
                synchronized (object) {
                    object.wait();
                    log.info("Notified!");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

If we try to call wait on the Object without acquiring the monitor lock a failure will occur.

    @Test
    void illegalMonitorState() {
        assertThrows(IllegalMonitorStateException.class, () -> object.wait());
    }

Instead we will be successful if we acquire the lock.

    @Test
    void testNotifyWait() throws InterruptedException {
        Thread waitingThread = new Thread(new WaitingRunnable());
        waitingThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
            Thread.sleep(1000);
            log.info("Not released yet");
        }

        waitingThread.join();
    }

We can see the following output:

21:40:23.606 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
21:40:24.111 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
21:40:25.117 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Not released yet
21:40:25.121 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

Let’s take this one step further. Once the thread is awakened it will try to acquire the lock again. The lock will not be released until the thread that called notify exits the synchronized block.
We can see that in the following example:

    @Test
    void testNotifyWait() throws InterruptedException {
        Thread waitingThread = new Thread(new WaitingRunnable());
        waitingThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
            log.info("Should wait");
            Thread.sleep(1000);
        }

        waitingThread.join();
    }

The output would be the following:

21:38:38.431 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
21:38:38.936 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
21:38:38.936 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Should wait
21:38:39.942 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

We have the notify method and the notifyAll method. In the case of notify, one thread will be picked from the waiting queue and notified. In the case of notifyAll, all threads in the queue will be notified.

For example:

    @Test
    void testNotifyOnce() throws InterruptedException {
        Thread firstThread = new Thread(new WaitingRunnable());
        Thread secondThread = new Thread(new WaitingRunnable());

        firstThread.start();
        secondThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
        }

        firstThread.join();
        secondThread.join(2000);
    }

Only one thread will be notified, which we can see from the output:

07:52:28.095 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:52:28.095 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:52:28.600 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
07:52:28.601 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

Now if we use notifyAll, all waiting threads will be notified:

    @Test
    void testNotifyAll() throws InterruptedException {
        Thread firstThread = new Thread(new WaitingRunnable());
        Thread secondThread = new Thread(new WaitingRunnable());

        firstThread.start();
        secondThread.start();

        Thread.sleep(500l);
        synchronized (object) {
            log.info("Lock acquired");
            object.notifyAll();
        }

        firstThread.join();
        secondThread.join();
    }

07:54:28.712 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:54:28.712 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:54:29.211 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
07:54:29.211 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!
07:54:29.212 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

That’s it! We went one step further into coordinating threads by using the monitor methods provided by the Object class.

Java Concurrency: The Lock interface

Previously we implemented a thread safe counter using synchronized. We would like to shift from synchronized blocks to something more flexible with more features; this is where locks are of use. In this blog we will focus on Java’s Lock interface.


The Lock interface supports three forms of lock acquisition: interruptible, non-interruptible, and timed.
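For reference, these are the methods declared by the Lock interface:

public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

The tests below assume a shared lock field, for example (the field name is illustrative):

    private final Lock lock = new ReentrantLock();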

We can acquire locks between threads. A thread will wait until the lock is released by the thread holding it:

    @Test
    void lock() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            log.info("Acquired delayed");
            lock.unlock();
        });

        lock.lock();
        withDelayedLock.start();

        Thread.sleep(500);
        log.info("Will release");
        lock.unlock();
        withDelayedLock.join();
    }

Since the thread blocks on the lock, we might want to interrupt it. In this case we can use lockInterruptibly, and apply any logic in case of an interrupt:

    @Test
    void lockInterruptibly() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                log.error("interrupted while waiting",e);
            }
        });

        lock.lock();
        withDelayedLock.start();
        withDelayedLock.interrupt();
        lock.unlock();
        withDelayedLock.join();
    }

A thread can also try to acquire a lock which is already acquired, and exit immediately instead of blocking.

    @Test
    void tryLock() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            boolean locked = lock.tryLock();
            assertFalse(locked);
        });

        lock.lock();
        withDelayedLock.start();
        Thread.sleep(500);
        lock.unlock();
        withDelayedLock.join();
    }

Also, a time period can be specified to wait for the lock to be acquired:

    @Test
    void tryLockTime() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            try {
                boolean locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
                assertFalse(locked);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        lock.lock();
        withDelayedLock.start();
        Thread.sleep(500);
        lock.unlock();
        withDelayedLock.join();
    }

Another thing of importance with locks is memory visibility.

From the documentation:

All Lock implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock, as described in Chapter 17 of The Java Language Specification:

  • A successful lock operation has the same memory synchronization effects as a successful Lock action.
  • A successful unlock operation has the same memory synchronization effects as a successful Unlock action.

In order for the results to be flushed to main memory we need to use lock and unlock.
Take the following example:

    private Lock lock = new ReentrantLock();
    private String threadAcquired = "main";

    @Test
    void wrongMemoryVisibility() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            try {
                threadAcquired = "delayed";
                System.out.println("Acquired on delayed");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        });

        lock.lock();
        try {
            withDelayedLock.start();
            Thread.sleep(500);
        } finally {
            lock.unlock();
        }

        while (true) {
            System.out.println("Currently acquired " + threadAcquired);
            if (threadAcquired.equals("delayed")) {
                break;
            }
        }

        withDelayedLock.join();
        threadAcquired = "main";
    }

We print the variable threadAcquired from the main thread while it is changed by the thread with the delayed lock.
After a few runs a situation will appear where the threadAcquired variable has a stale value on the main thread.

Acquired on delayed
Currently acquired main

We did this on purpose; in a real world problem we should not access variables like threadAcquired without proper synchronisation. For example we should have acquired the lock first; this way the memory would be synchronised and we would have an up to date value.

    @Test
    void correctMemoryVisibility() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            try {
                threadAcquired = "delayed";
                System.out.println("Acquired on delayed");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        });

        lock.lock();
        try {
            withDelayedLock.start();
            Thread.sleep(500);
        } finally {
            lock.unlock();
        }

        while (true) {
            lock.lock();
            try {
                System.out.println("Currently acquired " + threadAcquired);
                if (threadAcquired.equals("delayed")) {
                    break;
                }
            } finally {
                lock.unlock();
            }
        }

        withDelayedLock.join();
    }

That’s all for now. Later we will proceed to different types of locks and the Condition interface.