Java Concurrency: Condition

Previously we looked at ReentrantLock and its fairness. One of the things we come across there is the creation of a Condition. Using a Condition, we can build mechanisms that make threads wait until a specific condition is met before they proceed with their execution.

 

The Condition interface provides the following methods:


public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

The closest we have come to this so far is the wait method of the Object monitor.
A Condition is bound to a Lock, and a thread cannot call the condition's methods unless it holds that lock.
A Condition also relies on the underlying lock machinery. For example, with ReentrantLock, signal and signalAll move waiting threads from the condition's wait queue to the queue of threads maintained by the lock, so that they can wake up and re-acquire it.
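
The lock ownership rule is easy to check. The following is a minimal sketch, assuming a Condition created from a ReentrantLock; the class and method names (ConditionOwnershipDemo and its helpers) are made up for illustration. Calling signal without holding the lock is rejected with an IllegalMonitorStateException, while the same call goes through once the lock is held.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class ConditionOwnershipDemo {

    // Returns true if signalling without holding the lock is rejected
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the current thread does not hold the lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true if signalling while holding the lock is accepted
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no thread is waiting, so nothing is woken up
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("signal without lock rejected: " + signalWithoutLockFails());
        System.out.println("signal with lock accepted: " + signalWithLockSucceeds());
    }
}
```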

One of the obvious things to implement using Conditions is a BlockingQueue, with worker threads processing data and publisher threads dispatching data. Data is published on a queue, worker threads process data from the queue, and they wait when the queue has no data.

For a worker thread, if the condition is met the flow is the following:

  • Acquire the lock
  • Check the condition
  • Process Data
  • Release the lock

If the condition is not met, the flow would slightly change to this:

  • Acquire the lock
  • Check the condition
  • Wait until the condition is met (await releases the lock while waiting)
  • Re-acquire the lock
  • Process Data
  • Release the lock

Whenever the publisher thread adds a message, it should notify the threads waiting on the condition.

The workflow would be like this:

  • Acquire the lock
  • Publish data
  • Notify the workers
  • Release the lock

 

Obviously this functionality already exists through the BlockingQueue interface and implementations such as LinkedBlockingDeque and ArrayBlockingQueue.
We will proceed with our own implementation for the sake of the example.
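
For comparison, here is a rough sketch of the same publisher and worker interaction on top of the built-in ArrayBlockingQueue. The class name BuiltInQueueDemo, the capacity and the message format are assumptions picked for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original article
class BuiltInQueueDemo {

    // Publishes `count` messages from a separate thread and consumes them on the calling thread
    static int publishAndConsume(int count) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // capacity of 2 is arbitrary

        Thread publisher = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("message " + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        publisher.start();

        int received = 0;
        for (int i = 0; i < count; i++) {
            queue.take(); // blocks while the queue is empty
            received++;
        }
        publisher.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received " + publishAndConsume(5) + " messages");
    }
}
```

The waiting and notifying that we are about to write by hand is hidden behind put and take here.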

Let’s see the Message Queue:

package com.gkatzioura.concurrency.lock.condition;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue<T> {

    private Queue<T> queue = new LinkedList<>();
    private Lock lock = new ReentrantLock();
    private Condition hasMessages = lock.newCondition();

    public void publish(T message) {
        lock.lock();
        try {
            queue.offer(message);
            hasMessages.signal(); 
        } finally {
            lock.unlock();
        }
    }

    public T receive() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                hasMessages.await();
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

}
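
The Condition interface also offers timed waits. As a sketch only, a variant of the queue could give up after a timeout by using awaitNanos. The class name TimedMessageQueue and the choice to return null on timeout are assumptions made for this example, not part of the implementation above.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of MessageQueue with a timed receive
class TimedMessageQueue<T> {

    private final Queue<T> queue = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition hasMessages = lock.newCondition();

    public void publish(T message) {
        lock.lock();
        try {
            queue.offer(message);
            hasMessages.signal();
        } finally {
            lock.unlock();
        }
    }

    // Returns null if no message arrives within the timeout (an assumption of this sketch)
    public T receive(long timeout, TimeUnit unit) throws InterruptedException {
        long nanosLeft = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (nanosLeft <= 0L) {
                    return null; // timed out
                }
                // awaitNanos returns an estimate of the time still left to wait,
                // so waking up early does not extend the total wait
                nanosLeft = hasMessages.awaitNanos(nanosLeft);
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }
}
```

The loop keeps the same shape as in receive: the condition is re-checked after every wake-up, and only the remaining time changes between iterations.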

Now let’s put it into action:

    MessageQueue<String> messageQueue = new MessageQueue<>();

    @Test
    void testPublish() throws InterruptedException {
        Thread publisher = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                String message = "Sending message num: " + i;
                log.info("Sending [{}]", message);
                messageQueue.publish(message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread worker1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String message = messageQueue.receive();
                    log.info("Received: [{}]", message);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread worker2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String message = messageQueue.receive();
                    log.info("Received: [{}]", message);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        publisher.start();
        worker1.start();
        worker2.start();

        publisher.join();
        worker1.join();
        worker2.join();
    }

That’s it! Our workers processed the expected messages and waited when the queue was empty.

Java Concurrency: ReentrantLock Fairness

Previously we saw some of the building blocks of concurrency in Java. In this blog we will focus on ReentrantLock, which is based on the AbstractQueuedSynchronizer.

By using the ReentrantLock we allow a thread to acquire a lock and use it multiple times. As the name suggests, it provides us with reentrancy, just like synchronized blocks.
Because ReentrantLock is implemented on top of the AbstractQueuedSynchronizer, it can also provide fairness. When fairness is enabled, the threads that try to acquire the lock wait in FIFO order and the lock favors the longest-waiting thread.
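
Fairness is a constructor option and it is off unless requested. A small sketch, with the class name FairnessFlagDemo made up for illustration, that reads the setting back through isFair:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class FairnessFlagDemo {

    // The no-argument constructor creates a non-fair lock
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true asks for the fair ordering policy
    static boolean explicitIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default lock fair: " + defaultIsFair());       // prints false
        System.out.println("ReentrantLock(true) fair: " + explicitIsFair()); // prints true
    }
}
```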

If we check our previous blog on locks, we can see that locks offer a rich set of features.

A lock can only be unlocked by the thread that holds it. If another thread tries to unlock it, an IllegalMonitorStateException is thrown:

    @Test
    void alreadyAcquired() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();

        Thread thread = new Thread(() -> {
            log.info("Try first time");
            reentrantLock.lock();
            log.info("Got the lock");
        });

        thread.start();
        thread.join();
        assertThrows(IllegalMonitorStateException.class, reentrantLock::unlock);
    }
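
A consequence of this rule is that a lock whose owner finished without unlocking stays locked for everyone else. The sketch below, using a made-up class name LockOwnershipDemo, observes such a lock from another thread through isLocked, isHeldByCurrentThread and tryLock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class LockOwnershipDemo {

    // Returns {isLocked, isHeldByCurrentThread, tryLock} as seen by the calling thread
    static boolean[] observeFromAnotherThread() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        Thread owner = new Thread(lock::lock); // acquires the lock and never releases it
        owner.start();
        owner.join();

        return new boolean[] {
            lock.isLocked(),              // true: the lock is still held
            lock.isHeldByCurrentThread(), // false: it is held by the finished thread
            lock.tryLock()                // false: the calling thread cannot acquire it
        };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] observed = observeFromAnotherThread();
        System.out.println("locked=" + observed[0]
                + ", heldByCurrentThread=" + observed[1]
                + ", tryLock=" + observed[2]);
    }
}
```

This is why unlock is normally placed in a finally block.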

As the name suggests, the lock is also reentrant. In the following example the same thread acquires a lock it already holds a second time.

    @Test
    void reentrancy() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();

        Thread thread = new Thread(() -> {
            log.info("Try first time");
            reentrantLock.lock();
            log.info("Got the lock");

            log.info("lock one more time");
            reentrantLock.lock();
            log.info("locked again");
            reentrantLock.unlock();
        });

        thread.start();
        thread.join();
    }
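
Reentrancy is tracked with a hold count: each lock call by the owning thread increments it and each unlock decrements it, and the lock is free only when the count is back to zero. A minimal sketch using getHoldCount, with the class name HoldCountDemo chosen for illustration:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class HoldCountDemo {

    // Records the hold count of the calling thread after each lock/unlock call
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];

        lock.lock();
        counts[0] = lock.getHoldCount(); // 1 after the first lock
        lock.lock();
        counts[1] = lock.getHoldCount(); // 2 after re-entering
        lock.unlock();
        counts[2] = lock.getHoldCount(); // 1, the lock is still held
        lock.unlock();
        counts[3] = lock.getHoldCount(); // 0, the lock is free again

        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```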

In the next example we will construct a ReentrantLock with fairness enabled and run a small test of its fairness.
Five threads will request the lock one shortly after the other. Then we will look at the order in which they acquired it.

    @AllArgsConstructor
    private class CheckFairnessRunnable implements Runnable {

        private final int sleepTime;
        private final Lock lock;

        @Override
        public void run() {
            try {
                Thread.sleep(sleepTime);
                log.info("acquiring lock");
                lock.lock();
                try {
                    log.info("lock acquired");
                    Thread.sleep(5000);
                } finally {
                    // Release the lock even if the sleep is interrupted
                    lock.unlock();
                }
                log.info("lock unlocked");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    void fairness() throws Exception {
        int numberOfThreads = 5;
        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

        final ReentrantLock reentrantLock = new ReentrantLock(true);

        for (int i = 0; i < numberOfThreads; i++) {
            Runnable runnable = new CheckFairnessRunnable(i*100, reentrantLock);
            executor.execute(runnable);
        }

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);
    }

If we examine CheckFairnessRunnable, each thread initially sleeps for a different amount of time. This is done on purpose, so that the threads arrive at the lock in a known order rather than all at once. The time a thread holds the lock is also long on purpose, so that all the other threads end up waiting for the lock.

Let’s check the output:

09:07:43.782 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:43.788 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:43.882 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:43.985 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:44.085 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:44.185 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:48.797 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:48.796 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:07:53.802 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:53.802 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:07:58.807 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:58.806 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:08:03.813 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:08:03.813 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:08:08.819 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked

We can see that the threads acquired the lock in the same order in which they requested it, so the order was preserved.

That’s it! In the next blog we will check the Condition interface.