Java Concurrency: LockSupport

Previously we had a look at the Lock class and how it works behind the scenes.
In this post we shall look at LockSupport and its native methods, which are one of the building blocks for locks and synchronization classes.

The methods of LockSupport are static and, behind the scenes, they are based on the Unsafe class.
The Unsafe class is not intended to be used by your code directly, thus only trusted code can obtain instances of it.

If we look at the available methods we can identify the park methods, the unpark method and the getBlocker method.

public class LockSupport {
...

    public static void park() 
    public static void park(Object blocker) 
    public static void parkNanos(long nanos) 
    public static void parkNanos(Object blocker, long nanos) 
    public static void parkUntil(long deadline) 
    public static void parkUntil(Object blocker, long deadline) 
    public static Object getBlocker(Thread t) 
    public static void unpark(Thread thread) 
...
}

 

Each thread that uses the class LockSupport is associated with a permit.
When we call park, if the permit is available it is consumed and the call returns immediately. Otherwise the thread is disabled for thread scheduling purposes and lies dormant until it is unparked, it is interrupted, or the call returns spuriously.

public final class Unsafe {
...
    public native void unpark(Object thread);
    public native void park(boolean isAbsolute, long time);
...
}

We can trace the permit down in the source code and eventually end up in the POSIX implementation:

void Parker::park(bool isAbsolute, jlong time) {


  if (Atomic::xchg(&_counter, 0) > 0) return;
  JavaThread *jt = JavaThread::current();

  if (jt->is_interrupted(false)) {
    return;
  }

  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }


  ThreadBlockInVM tbivm(jt);

  if (pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    OrderAccess::fence();
    return;
  }

  OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                  status, "cond_wait");
  }
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  OrderAccess::fence();
}

In a nutshell, _counter is the permit: if it is greater than 0 it is consumed and the method returns immediately. If an interrupt is pending the method also returns immediately. If a time to wait has been supplied, the absolute time to wait on the condition is calculated. Then, using the mutex and the condition variable, the thread either waits indefinitely until it is unparked or interrupted, or it waits at most for the time provided.
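
To make the flow above more tangible, here is a minimal Java model of the same idea: a binary permit guarded by a mutex and a condition. This is only a sketch and not how the JVM implements park. The class name BinaryPermit and its methods are made up for illustration, timeouts are left out, and unlike the native code it loops on wake-ups instead of returning after a single wait.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the permit logic, for illustration only
class BinaryPermit {

    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition cond = mutex.newCondition();
    private int counter = 0; // 0 = no permit, 1 = permit available

    // Makes the permit available; calling it twice still leaves a single permit
    public void unpark() {
        mutex.lock();
        try {
            counter = 1;
            cond.signal();
        } finally {
            mutex.unlock();
        }
    }

    // Returns true if the permit was consumed, false if the wait was interrupted
    public boolean park() {
        mutex.lock();
        try {
            while (counter == 0) {
                try {
                    cond.await();
                } catch (InterruptedException e) {
                    // Like LockSupport.park, keep the interrupt status set and return
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            counter = 0; // consume the permit
            return true;
        } finally {
            mutex.unlock();
        }
    }
}
```

Calling unpark() before park() makes park() return at once, which mirrors the "no wait needed" branch of the native code.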

Let’s see an example of parking a thread:

    @Test
    void park() throws InterruptedException {
        Thread secondaryThread = new Thread(LockSupport::park);

        secondaryThread.start();

        secondaryThread.join(2000);
        log.info("Could not join thread is parked");

        assertTrue(secondaryThread.isAlive());
        LockSupport.unpark(secondaryThread);
        secondaryThread.join();

        assertFalse(secondaryThread.isAlive());
        log.info("Thread was unparked");
    }

We start the secondary thread, which parks itself. When the main thread tries to join it we use a time limit; if the time limit was not set we would wait forever. Eventually the time passes, and before trying to join the thread again we unpark it. As expected the thread is unparked and eventually finishes.
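
Since park may also return for no reason (a spurious return, as the LockSupport Javadoc notes), code that parks usually re-checks a condition in a loop. A minimal sketch of that pattern follows; the class name ParkLoopDemo and the ready flag are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkLoopDemo {

    // Returns true if the worker observed the flag and finished
    static boolean runDemo() throws InterruptedException {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean observed = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            // Re-check the condition after every return from park
            while (!ready.get()) {
                LockSupport.park(ready);
            }
            observed.set(true);
        });
        worker.start();

        // Publish the condition first, then hand out the permit
        ready.set(true);
        LockSupport.unpark(worker);

        worker.join(5000);
        return observed.get() && !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Worker finished: " + runDemo());
    }
}
```

Setting the flag before calling unpark matters: the worker either sees the flag and never parks, or parks and is released by the permit.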

If the permit is not available the thread lies dormant, disabled for thread scheduling. Initially the permit is not available (the counter is zero). We can try this with one thread.

    @Test
    void unParkAndPark() {
        final Thread mainThread = Thread.currentThread();
        LockSupport.unpark(mainThread);
        LockSupport.park();
    }

By calling unpark first we made a permit available, thus on park the permit was consumed and the call returned immediately.
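
Permits do not accumulate: there is at most one. The sketch below calls unpark twice and then parks twice with a timeout. The first call should return right away, while the second one has no permit left and normally waits for its timeout. The class name PermitDemo and the chosen timeouts are assumptions for the example, and since park may return spuriously the second duration is only printed, not relied upon.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class PermitDemo {

    // Parks the calling thread for at most the given time and returns the elapsed nanoseconds
    static long timedPark(long timeout, TimeUnit unit) {
        long start = System.nanoTime();
        LockSupport.parkNanos(unit.toNanos(timeout));
        return System.nanoTime() - start;
    }

    // Returns {elapsed nanos of the first park, elapsed nanos of the second park}
    static long[] run() {
        Thread current = Thread.currentThread();
        LockSupport.unpark(current);
        LockSupport.unpark(current); // still a single permit

        long first = timedPark(2, TimeUnit.SECONDS);          // permit available
        long second = timedPark(100, TimeUnit.MILLISECONDS);  // no permit left
        return new long[] {first, second};
    }

    public static void main(String[] args) {
        long[] elapsed = run();
        System.out.println("First park took (ms): " + TimeUnit.NANOSECONDS.toMillis(elapsed[0]));
        System.out.println("Second park took (ms): " + TimeUnit.NANOSECONDS.toMillis(elapsed[1]));
    }
}
```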

The other element that we see in the LockSupport class is the blocker.

The blocker maps to the parkBlocker field in Java's Thread implementation.

public class Thread implements Runnable {
...
    volatile Object parkBlocker;
...
}

The blocker represents the synchronization object responsible for this thread parking.
Thus we can note which object is responsible for the thread being parked, which can help with debugging and monitoring.
Behind the scenes, Unsafe methods are called to set the parkBlocker value on the Thread instance.
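
As a small illustration of the blocker, the sketch below parks a thread with a blocker object and reads it back from another thread through LockSupport.getBlocker. The class name BlockerDemo, the polling interval and the five second limit are assumptions for the example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class BlockerDemo {

    // Returns true if the blocker was visible while parked and cleared afterwards
    static boolean blockerIsVisible() throws InterruptedException {
        Object blocker = new Object();
        AtomicBoolean release = new AtomicBoolean(false);

        Thread parked = new Thread(() -> {
            while (!release.get()) {
                LockSupport.park(blocker); // records blocker while the thread is parked
            }
        });
        parked.start();

        // Poll until the blocker shows up or the time limit passes
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Object seen = LockSupport.getBlocker(parked);
        while (seen == null && deadline - System.nanoTime() > 0) {
            Thread.sleep(10);
            seen = LockSupport.getBlocker(parked);
        }

        release.set(true);
        LockSupport.unpark(parked);
        parked.join(5000);

        // Once park has returned the blocker is reset to null
        Object after = LockSupport.getBlocker(parked);
        return seen == blocker && after == null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Blocker observed and cleared: " + blockerIsVisible());
    }
}
```

Thread dumps use the same information to show what a parked thread is waiting on.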

Now that we saw how LockSupport works behind the scenes, we can look at the example provided by the Javadoc, the FIFOMutex.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class FIFOMutex {

    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
            = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }

}

All threads are added to the queue. Since this is a FIFO queue, the first thread should set the locked variable to true, proceed, and remove itself from the queue. The rest of the threads should be parked. In case of an interrupt park will return, thus we record the interrupt and reassert it on the thread on exit.
Also pay attention that the blocker is set to the instance of the FIFOMutex class.

On unlock, the thread at the head of the queue is unparked and can try to acquire the lock.
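
A possible way to exercise the mutex is to let several threads increment a plain counter while holding it. The sketch below repeats a compact copy of the Javadoc FIFOMutex so that it compiles on its own; the class FifoMutexDemo, the method incrementConcurrently and the thread counts are assumptions made for the example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class FifoMutexDemo {

    // Compact copy of the Javadoc example, repeated so the sketch is self-contained
    static class FIFOMutex {
        private final AtomicBoolean locked = new AtomicBoolean(false);
        private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

        void lock() {
            boolean wasInterrupted = false;
            Thread current = Thread.currentThread();
            waiters.add(current);
            while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
                LockSupport.park(this);
                if (Thread.interrupted()) {
                    wasInterrupted = true;
                }
            }
            waiters.remove();
            if (wasInterrupted) {
                current.interrupt();
            }
        }

        void unlock() {
            locked.set(false);
            LockSupport.unpark(waiters.peek());
        }
    }

    // Increments a shared counter from several threads, guarded by the mutex
    static int incrementConcurrently(int threads, int perThread) throws InterruptedException {
        FIFOMutex mutex = new FIFOMutex();
        int[] counter = new int[1]; // only touched while holding the mutex

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final count: " + incrementConcurrently(4, 1000));
    }
}
```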

We can also check another usage of LockSupport. Rate limiters can benefit from it.
We can check the source code of the Refill Rate Limiter:

    private boolean waitForPermission(final long nanosToWait) {
        waitingThreads.incrementAndGet();
        long deadline = currentNanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        while (currentNanoTime() < deadline && !wasInterrupted) {
            long sleepBlockDuration = deadline - currentNanoTime();
            parkNanos(sleepBlockDuration);
            wasInterrupted = Thread.interrupted();
        }
        waitingThreads.decrementAndGet();
        if (wasInterrupted) {
            currentThread().interrupt();
        }
        return !wasInterrupted;
    }

In a rate limiter the permits to acquire are limited over time. When a thread tries to acquire a permit there is an option to wait until one is available. In this case we can park the thread. This way our rate limiter avoids wasting CPU time on busy spinning.
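
The same waiting loop can be tried in isolation. The sketch below is a standalone variant that uses System.nanoTime directly; the class name DeadlineWait and the method waitFor are assumptions for the example and not part of the rate limiter's code. The remaining time is computed by subtraction, which stays correct even if the nanoTime value wraps around.

```java
import java.util.concurrent.locks.LockSupport;

class DeadlineWait {

    // Parks until the given time has elapsed; returns false if interrupted while waiting
    static boolean waitFor(long nanosToWait) {
        final long deadline = System.nanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        long remaining;
        // Loop because parkNanos may return early (unpark, spurious return)
        while ((remaining = deadline - System.nanoTime()) > 0 && !wasInterrupted) {
            LockSupport.parkNanos(remaining);
            wasInterrupted = Thread.interrupted();
        }
        if (wasInterrupted) {
            // Restore the interrupt status for the caller
            Thread.currentThread().interrupt();
        }
        return !wasInterrupted;
    }

    public static void main(String[] args) {
        System.out.println("Waited without interrupt: " + waitFor(50_000_000L));
    }
}
```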

That’s it. Now that we know about LockSupport we can proceed to more interesting concurrency concepts.

Java Concurrency: Object monitor methods

Previously we used monitor locks to solve a problem in a thread safe way. In Java each object has a lock associated with it. Along with this monitor lock the object has some methods that depend on it: wait, notify, notifyAll. Those methods can help with coordinating threads. For example, a thread writes to a variable and notifies another thread of that change.

By using the wait method the thread will wait until notified or interrupted. In order to call this method on an object the thread should have acquired the lock associated with the object. Behind the scenes the thread will get into a wait queue along with the other threads that wait. That queue is associated with the object whose monitor lock we use.

We can see that in the ObjectMonitor source code:

  // create a node to be put into the queue
  // Critically, after we reset() the event but prior to park(), we must check
  // for a pending interrupt.
  ObjectWaiter node(current);
  node.TState = ObjectWaiter::TS_WAIT;
  current->_ParkEvent->reset();
  OrderAccess::fence();          

Once the thread gets into a wait state, it will relinquish the lock it acquired from the object. The thread will be dormant until it is interrupted or it gets notified to wake up.

We shall use an object as a lock and provide a runnable implementation:

    private Object object = new Object();

    public class WaitingRunnable implements Runnable {

        @Override
        public void run() {
            try {
                log.info("Waiting to be notified");
                synchronized (object) {
                    object.wait();
                    log.info("Notified!");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

If we try to call wait on the Object without acquiring the monitor lock a failure will occur.

    @Test
    void illegalMonitorState() {
        assertThrows(IllegalMonitorStateException.class, () -> object.wait());
    }

Instead the call will succeed if we acquire the lock.

    @Test
    void testNotifyWait() throws InterruptedException {
        Thread waitingThread = new Thread(new WaitingRunnable());
        waitingThread.start();

        Thread.sleep(500L);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
            Thread.sleep(1000);
            log.info("Not released yet");
        }

        waitingThread.join();
    }

We can see the following output:

21:40:23.606 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
21:40:24.111 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
21:40:25.117 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Not released yet
21:40:25.121 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

Let’s take this one step further. Once the thread is awakened it will try to acquire the lock again. The lock will not be released until the thread that called notify exits the synchronized block.
We can see that in the following example:

    @Test
    void testNotifyWait() throws InterruptedException {
        Thread waitingThread = new Thread(new WaitingRunnable());
        waitingThread.start();

        Thread.sleep(500L);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
            log.info("Should wait");
            Thread.sleep(1000);
        }

        waitingThread.join();
    }

The output would be the following:

21:38:38.431 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
21:38:38.936 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
21:38:38.936 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Should wait
21:38:39.942 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

We have the notify method and the notifyAll method. notify will pick one thread from the waiting queue and notify it. With notifyAll, all threads from the queue will be notified.

For example:

    @Test
    void testNotifyOnce() throws InterruptedException {
        Thread firstThread = new Thread(new WaitingRunnable());
        Thread secondThread = new Thread(new WaitingRunnable());

        firstThread.start();
        secondThread.start();

        Thread.sleep(500L);
        synchronized (object) {
            log.info("Lock acquired");
            object.notify();
        }

        // Either thread may be the one notified, so both joins are time-limited
        firstThread.join(2000);
        secondThread.join(2000);
    }

Only one thread will be notified, which we can see from the output:

07:52:28.095 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:52:28.095 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:52:28.600 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
07:52:28.601 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!

Now if we use notifyAll, all waiting threads will be notified:

    @Test
    void testNotifyAll() throws InterruptedException {
        Thread firstThread = new Thread(new WaitingRunnable());
        Thread secondThread = new Thread(new WaitingRunnable());

        firstThread.start();
        secondThread.start();

        Thread.sleep(500L);
        synchronized (object) {
            log.info("Lock acquired");
            object.notifyAll();
        }

        firstThread.join();
        secondThread.join();
    }

07:54:28.712 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:54:28.712 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Waiting to be notified
07:54:29.211 [main] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Lock acquired
07:54:29.211 [Thread-1] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!
07:54:29.212 [Thread-0] INFO com.gkatzioura.concurrency.monitor.MonitorTest - Notified!
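
The tests above rely on Thread.sleep so that the waiting threads reach wait before notify is called. A common way to avoid that timing dependency is the guarded wait: keep a condition under the monitor and call wait in a loop that re-checks it. The sketch below shows this with a hypothetical Gate class; the class, its methods and the demo helper are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class Gate {

    private final Object monitor = new Object();
    private boolean open = false; // guarded by monitor

    // Blocks until the gate has been opened
    public void await() throws InterruptedException {
        synchronized (monitor) {
            // The loop covers spurious wake-ups and an open() that happened earlier
            while (!open) {
                monitor.wait();
            }
        }
    }

    // Opens the gate and wakes up every waiting thread
    public void open() {
        synchronized (monitor) {
            open = true;
            monitor.notifyAll();
        }
    }

    // Starts the given number of waiters, opens the gate and returns how many passed
    static int demo(int waiters) throws InterruptedException {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        gate.open(); // no sleep needed: late waiters see open == true
        for (Thread thread : threads) {
            thread.join(5000);
        }
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Threads that passed the gate: " + demo(3));
    }
}
```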

That’s it! We went one step further into coordinating threads by using the monitor methods provided by the Object class.

Java Concurrency: The Lock interface

Previously we implemented a thread safe counter using synchronized. We would like to switch from synchronized blocks to something more flexible with more features; this is where locks are of use. In this post we will focus on Java’s Lock interface.

 

 

The Lock interface supports three forms of lock acquisition: interruptible, non-interruptible, and timed.

Locks coordinate access between threads. A thread will wait until the lock is released by the thread holding it:

    @Test
    void lock() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            log.info("Acquired delayed");
            lock.unlock();
        });

        lock.lock();
        withDelayedLock.start();

        Thread.sleep(500);
        log.info("Will release");
        lock.unlock();
        withDelayedLock.join();
    }
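
Unlike a synchronized block, a Lock is not released automatically, so the usual idiom is to call unlock in a finally block. A minimal sketch of the thread safe counter written with a ReentrantLock follows; the class name LockedCounter and the helper countConcurrently are assumptions for the example.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {

    private final Lock lock = new ReentrantLock();
    private int count = 0; // guarded by lock

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            // Always release, even if the guarded code throws
            lock.unlock();
        }
    }

    public int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Increments the counter from several threads and returns the final value
    static int countConcurrently(int threads, int perThread) throws InterruptedException {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final count: " + countConcurrently(4, 1000));
    }
}
```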

Since the thread blocks on the lock we might want to interrupt it. In this case we can use lockInterruptibly and apply any logic in case of an interrupt:

    @Test
    void lockInterruptibly() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                log.error("interrupted while waiting",e);
            }
        });

        lock.lock();
        withDelayedLock.start();
        withDelayedLock.interrupt();
        lock.unlock();
        withDelayedLock.join();
    }

A thread can also try to acquire a lock which is already acquired, and exit immediately instead of blocking.

    @Test
    void tryLock() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            boolean locked = lock.tryLock();
            assertFalse(locked);
        });

        lock.lock();
        withDelayedLock.start();
        Thread.sleep(500);
        lock.unlock();
        withDelayedLock.join();
    }

Also a maximum time to wait for the lock can be specified.

    @Test
    void tryLockTime() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            try {
                boolean locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
                assertFalse(locked);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        lock.lock();
        withDelayedLock.start();
        Thread.sleep(500);
        lock.unlock();
        withDelayedLock.join();
    }
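
In application code the result of tryLock usually decides whether the guarded work runs at all, and unlock must only be called when the lock was actually acquired. A minimal sketch under these assumptions follows; the class TryLockDemo, the helper runIfAvailable and the 100 ms timeout are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {

    // Runs the task only if the lock can be acquired within the timeout
    static boolean runIfAvailable(Lock lock, long timeoutMillis, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // not acquired, so there is nothing to unlock
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns {result while another thread holds the lock, result after release}
    static boolean[] demo() throws InterruptedException {
        Lock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await(); // keep the lock until told to release
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        held.await();

        boolean whileHeld = runIfAvailable(lock, 100, () -> { });
        release.countDown();
        holder.join();
        boolean afterRelease = runIfAvailable(lock, 100, () -> { });
        return new boolean[] {whileHeld, afterRelease};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] results = demo();
        System.out.println("Acquired while held: " + results[0]);
        System.out.println("Acquired after release: " + results[1]);
    }
}
```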

Another thing of importance with locks is memory visibility.

From the documentation:

All Lock implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock, as described in Chapter 17 of The Java Language Specification:

- A successful lock operation has the same memory synchronization effects as a successful Lock action.
- A successful unlock operation has the same memory synchronization effects as a successful Unlock action.

In order for the changes made by one thread to be guaranteed visible to another, both threads need to use lock and unlock.
Take the following example:

    private Lock lock = new ReentrantLock();
    private String threadAcquired = "main";

    @Test
    void wrongMemoryVisibility() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            try {
                threadAcquired = "delayed";
                System.out.println("Acquired on delayed");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        });

        lock.lock();
        try {
            withDelayedLock.start();
            Thread.sleep(500);
        } finally {
            lock.unlock();
        }

        while (true) {
            System.out.println("Currently acquired " + threadAcquired);
            if (threadAcquired.equals("delayed")) {
                break;
            }
        }

        withDelayedLock.join();
        threadAcquired = "main";
    }

The main thread prints the variable threadAcquired while it is changed by the thread with the delayed lock.
After a few runs a situation will appear where the threadAcquired variable has a stale value on the main thread.

Acquired on delayed
Currently acquired main

We did this on purpose. In a real world problem we should not access variables like threadAcquired without proper synchronisation. For example, we should have acquired the lock first; this way the memory would be synchronised and we would have an up to date value.

    @Test
    void correctMemoryVisibility() throws InterruptedException {
        Thread withDelayedLock = new Thread(() -> {
            lock.lock();
            try {
                threadAcquired = "delayed";
                System.out.println("Acquired on delayed");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        });

        lock.lock();
        try {
            withDelayedLock.start();
            Thread.sleep(500);
        } finally {
            lock.unlock();
        }

        while (true) {
            lock.lock();
            try {
                System.out.println("Currently acquired " + threadAcquired);
                if (threadAcquired.equals("delayed")) {
                    break;
                }
            } finally {
                lock.unlock();
            }
        }

        withDelayedLock.join();
    }

That’s all for now. We will later proceed to different types of locks and the Condition interface.