Java Concurrency: LockSupport

Previously we had a look on the lock class and how it works behind the scenes.
On this post we shall look at LockSupport and its native methods which are one of the building block for locks and synchronization classes.

As we can see the methods on lock support are static and behind the scenes they are based on on the Unsafe class.
The Unsafe class is not intended to be used by your code directly  thus only trusted code can obtain instances of it.

If we see the methods available we can identify the park method, the unpark methods and the get Blocker method.

public class LockSupport {
...

    public static void park() 
    public static void park(Object blocker) 
    public static void parkNanos(long nanos) 
    public static void parkNanos(Object blocker, long nanos) 
    public static void parkUntil(long deadline) 
    public static void parkUntil(Object blocker, long deadline) 
    public static Object getBlocker(Thread t) 
    public static void unpark(Thread thread) 
...
}

 

Each thread that uses the class LockSupport is associated with a permit.
When we call park the thread is disabled for thread scheduling purposes. Provided the permit is available the permit is consumed and the call returns immediately.

public final class Unsafe {
...
    public native void unpark(Object thread);
    public native void park(boolean isAbsolute, long time);
...
}

We can trace down the permit in the source code implementation and eventually end up on the posix implementation:

void Parker::park(bool isAbsolute, jlong time) {


  if (Atomic::xchg(&_counter, 0) > 0) return;
  JavaThread *jt = JavaThread::current();

  if (jt->is_interrupted(false)) {
    return;
  }

  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }


  ThreadBlockInVM tbivm(jt);

  if (pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    OrderAccess::fence();
    return;
  }

  OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                  status, "cond_wait");
  }
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  OrderAccess::fence();
}

In a nutsel _counter is a permit and if the permit is bigger than 0 it will consumed and the method will return immediately. If an interrupt is pending the method should return immediately. If nanos to wait have been supplied then the time to wait on the condition is calculated. By using the mutex provided either the thread will wait forever until is unparked or interuped or the thread will wait for the nanos provided.

Let’s see an example of parking a thread:

    @Test
    void park() throws InterruptedException {
        Thread secondaryThread = new Thread(LockSupport::park);

        secondaryThread.start();

        secondaryThread.join(2000);
        log.info("Could not join thread is parked");

        assertTrue(secondaryThread.isAlive());
        LockSupport.unpark(secondaryThread);
        secondaryThread.join();

        assertFalse(secondaryThread.isAlive());
        log.info("Thread was unparked");
    }

We start the thread and park it. When we try to wait for the main thread we use a time limit, if time limit was not set we would wait forever. Eventually the time passes and before retrying to join the thread we unpark. As expected the thread is unparked and eventually finishes.

If the permit is not available the thread lies dormant disabled for thread scheduling. Initially the permit is zero. We can try with one thread.

    @Test
    void unParkAndPark() {
        final Thread mainThread = Thread.currentThread();
        LockSupport.unpark(mainThread);
        LockSupport.park();
    }

By using unpark initially we made a permit available, thus on park the permit was consumed and we returned immediately.

The other element that we see in the LockSupport class is the blocker.

The Blocker maps to the parkBlocker object in the Thread implementation of java.

public class Thread implements Runnable {
...
    volatile Object parkBlocker;
...
}

The blocker represents the synchronization object responsible for this thread parking.
Thus we can note which object is responsible for the thread being parked, which can help for debugging and monitoring purposes.
The unsafe methods are called to set the parkBlocker value to the Thread instance.

Now that we saw how LockSupport works behinds the scenes we can see the example provided by the Javadoc the FIFOMutex.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class FIFOMutex {

    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
            = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }

}

All threads will be added to the queue. Since this is a FIFO queue, the first thread should mark the locked variable as true and should be able to proceed and remove itself from the queue. The rest of the threads should be parked. In case of an interrupt LockSupport will exit thus we should reassert the interrupt on the thread on exit.
Also pay attention that the blocker is set as the instance of the FIFOMutex class.

By using unlock the threads that have been in a parked state will resume.

We can also check another usage of lock support. Rate limiters can benefit from LockSupport.
We can check the source code of the Refill Rate Limiter:

private boolean waitForPermission(final long nanosToWait) {
        waitingThreads.incrementAndGet();
        long deadline = currentNanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        while (currentNanoTime() < deadline && !wasInterrupted) {
            long sleepBlockDuration = deadline - currentNanoTime();
            parkNanos(sleepBlockDuration);
            wasInterrupted = Thread.interrupted();
        }
        waitingThreads.decrementAndGet();
        if (wasInterrupted) {
            currentThread().interrupt();
        }
        return !wasInterrupted;
    }

In a Rate Limiter the permits to acquire are limited over time. In case of a thread trying to acquire a permit there is an option to wait until a permit is available. In this case we can park the thread. This way our rate limiter will prevent the time busy on spinning.

That’s it. Now that we know about lock support we can proceed on more interesting concurrency concepts.

One thought on “Java Concurrency: LockSupport

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.