AtomicInteger in Java and Round-Robin

AtomicInteger belongs to the family of atomic variables. Its main benefit is that it is non-blocking: instead of relying on blocking synchronization, threads update the value without being suspended and rescheduled.

AtomicInteger is based on the compare-and-swap (CAS) mechanism and belongs to the scalar group of atomic variables.
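
To make the compare-and-swap idea more concrete, here is a minimal sketch of an increment written as an explicit retry loop. The class name CasIncrementSketch is made up for this illustration, and the loop only shows the idea conceptually; the JDK's own incrementAndGet() is implemented differently internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only, not part of the JDK.
final class CasIncrementSketch {

    private final AtomicInteger value = new AtomicInteger();

    // Read the current value, compute the next one, and publish it only if
    // no other thread changed the value in the meantime. Otherwise retry.
    int increment() {
        int current;
        int next;
        do {
            current = value.get();
            next = current + 1;
        } while (!value.compareAndSet(current, next));
        return next;
    }

    int get() {
        return value.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CasIncrementSketch counter = new CasIncrementSketch();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1_000; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        // 4 threads x 1000 increments, none lost
        System.out.println(counter.get()); // prints 4000
    }
}
```

No thread ever waits on a lock here. A thread that loses the race simply reads the fresh value and tries again.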

Our first use case is a function on a web page that may be invoked many times concurrently, and we want to count how many requests it has served.

package com.gkatzioura.concurrency;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerExample {

    private AtomicInteger atomicInteger = new AtomicInteger();
    public void serveRequest() {
        atomicInteger.incrementAndGet();
        /**
         * logic
         */
    }

    public int requestsServed() {
        return atomicInteger.get();
    }
}

And the test for our use case

package com.gkatzioura.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class AtomicIntegerExampleTest {

    private AtomicIntegerExample atomicIntegerExample;

    @BeforeEach
    void setUp() {
        atomicIntegerExample = new AtomicIntegerExample();
    }

    @Test
    void testConcurrentIncrementAndGet() throws ExecutionException, InterruptedException {
        final int threads = 10;

        ExecutorService executorService = Executors.newFixedThreadPool(threads);

        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < threads; i++) {
            futures.add(executorService.submit(() -> {
                atomicIntegerExample.serveRequest();
                return null;
            }));
        }

        for(Future future: futures) {
            future.get();
        }

        Assertions.assertEquals(10,atomicIntegerExample.requestsServed());
    }

}

Apart from using AtomicInteger as a counter, you can use it in various other cases, for example a thread-safe round-robin algorithm.

package com.gkatzioura.concurrency;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerRoundRobin {

    private final int totalIndexes;
    private final AtomicInteger atomicInteger = new AtomicInteger(-1);

    public AtomicIntegerRoundRobin(int totalIndexes) {
        this.totalIndexes = totalIndexes;
    }

    public int index() {
        int currentIndex;
        int nextIndex;

        do {
            currentIndex = atomicInteger.get();
            nextIndex = currentIndex< Integer.MAX_VALUE ? currentIndex+1: 0;
        } while (!atomicInteger.compareAndSet(currentIndex, nextIndex));

        return nextIndex % totalIndexes;
    }

}

totalIndexes is the total number of indexes. When the next index is requested, the counter is incremented through a compare-and-set operation. If that fails because another thread changed the counter in the meantime, the operation is retried with the counter's latest value.
A modulo operation then gives the current index. When the counter reaches Integer.MAX_VALUE, it is reset to zero. The reset can cause an edge case and change the order of the indexes: with totalIndexes = 10, the last index before the reset is 2147483647 % 10 = 7 and the next one is 0, so 8 and 9 are skipped once. If this is an issue, you can adjust your max value based on your total index size to avoid it.
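
One possible way to sidestep the reset edge case is to keep the counter itself inside the range of valid indexes, so it never gets anywhere near Integer.MAX_VALUE. The sketch below is a variant under that assumption, not the class shown above. BoundedRoundRobin is a made-up name, and it relies on AtomicInteger.getAndUpdate (available since Java 8), which performs the compare-and-set retry loop internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant for illustration: the counter always stays in [0, totalIndexes).
final class BoundedRoundRobin {

    private final int totalIndexes;
    private final AtomicInteger next = new AtomicInteger(0);

    BoundedRoundRobin(int totalIndexes) {
        if (totalIndexes <= 0) {
            throw new IllegalArgumentException("totalIndexes must be positive");
        }
        this.totalIndexes = totalIndexes;
    }

    int index() {
        // Returns the value before the update. The update function must be
        // side-effect free because it may be re-applied when threads contend.
        return next.getAndUpdate(i -> (i + 1) % totalIndexes);
    }

    public static void main(String[] args) {
        BoundedRoundRobin roundRobin = new BoundedRoundRobin(3);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            sb.append(roundRobin.index()).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints 0 1 2 0 1 2 0
    }
}
```

The trade-off is that the counter no longer tells you how many indexes have been handed out in total. If you need that number, keep it in a separate counter.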

And some tests for it.

package com.gkatzioura.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class AtomicIntegerRoundRobinTest {

    private static final int MAX_INDEX = 10;

    private AtomicIntegerRoundRobin atomicIntegerRoundRobin;

    @BeforeEach
    void setUp() {
        atomicIntegerRoundRobin = new AtomicIntegerRoundRobin(MAX_INDEX);
    }

    @Test
    void testIndexesSerially() {
        for(long i=0;i<MAX_INDEX*20;i++) {
            System.out.println(atomicIntegerRoundRobin.index());
        }

        Assertions.assertEquals(0, atomicIntegerRoundRobin.index());
    }

    @Test
    void testIndexesConcurrently() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

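        List<Future<Integer>> futures = new ArrayList<>();

        // Same number of calls as the serial test, so the next index is 0 again
        for (int i = 0; i < MAX_INDEX * 20; i++) {
            futures.add(executorService.submit(() -> atomicIntegerRoundRobin.index()));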
        }

        for(Future future: futures) {
            System.out.println(future.get());
        }

        Assertions.assertEquals(0,atomicIntegerRoundRobin.index());
    }

}
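
As a usage sketch, the same round-robin logic could pick the next element from a list, for example the next backend to send a request to. RoundRobinSelector and the node names are made up for this illustration. The loop inside next() is the same compare-and-set loop as in AtomicIntegerRoundRobin, copied in so the example stands on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper for illustration: hands out list elements in round-robin order.
final class RoundRobinSelector<T> {

    private final List<T> items;
    private final AtomicInteger counter = new AtomicInteger(-1);

    RoundRobinSelector(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        // Defensive copy so later changes to the caller's list do not affect us
        this.items = new ArrayList<>(items);
    }

    T next() {
        int current;
        int next;
        do {
            current = counter.get();
            next = current < Integer.MAX_VALUE ? current + 1 : 0;
        } while (!counter.compareAndSet(current, next));
        return items.get(next % items.size());
    }

    public static void main(String[] args) {
        RoundRobinSelector<String> backends =
                new RoundRobinSelector<>(Arrays.asList("node-a", "node-b", "node-c"));
        for (int i = 0; i < 4; i++) {
            // prints node-a, node-b, node-c, node-a (one per line)
            System.out.println(backends.next());
        }
    }
}
```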
