I recently got a chance to dive into the implementation details of concurrent (thread-safe) data structures and the techniques for implementing them lock-free. In this post, I want to specifically discuss the FIFO (First In, First Out) Queue.

Concurrent FIFO Queue with locking

An implementation with locking is as follows:

  • Use two locks: enqueueLock and dequeueLock.
  • Any Enqueue operation/thread needs to acquire the enqueueLock and similarly any Dequeue operation needs to acquire the dequeueLock.
  • If a Dequeue operation is invoked and the Queue is empty, it will block other Dequeue operations until an Enqueue operation is performed.
  • We can set a max size for the Queue; when the Queue is full, an Enqueue operation will block other Enqueue operations until the size drops below the max size.

Note 1: The above is how Java's LinkedBlockingQueue (an implementation of the BlockingQueue interface) works.
Note 2: BlockingCollection is the C# equivalent for Java BlockingQueue. It’s implemented as a wrapper over the lock-free ConcurrentQueue and uses a semaphore for controlling access when the collection is full or empty.

As a concrete example, below is a snippet of the implementation of LinkedBlockingQueue in Java (source):

(put/take are the same as Enqueue/Dequeue operations described above)

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private transient Node<E> head;  // head of linked list
    private transient Node<E> last; // tail of linked list
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final Condition notFull = putLock.newCondition();

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            last = last.next = node;
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    // ... (take() and other methods omitted)
}
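
To see the blocking behavior in action, here is a minimal producer-consumer sketch using LinkedBlockingQueue (the capacity of 2 and the item values are arbitrary choices for illustration):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue: put() blocks once 2 items are waiting.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);           // blocks while the queue is full
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        for (int i = 1; i <= 5; i++) {
            int item = queue.take();        // blocks while the queue is empty
            System.out.println("consumed " + item);
        }
        producer.join();
    }
}
```

Because the queue is bounded, the producer can never run more than two items ahead of the consumer, which is exactly the backpressure mentioned above.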

The upsides of the locking approach are:

  • It is thread-safe.
  • It works well in producer-consumer scenarios. Blocking in the empty queue scenario creates backpressure from consumers to producers.

The downsides:

  • Acquiring and releasing of locks has a performance overhead, which introduces additional latency to operations.
  • Locking and blocking prevent multiple threads from performing operations concurrently, resulting in reduced throughput.

Lock-free

Why lock-free?

While we saw above that a lock-based implementation is possible, lock-free (and non-blocking) implementations exist to avoid the downsides of locking and blocking: increased latency and reduced throughput. ConcurrentQueue in C# and ConcurrentLinkedQueue in Java are lock-free implementations.

Implementation

The lock-free implementations in both Java and C# are inspired by a 1996 paper by Michael and Scott titled “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms”:
https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf

A very important component of the lock-free implementation is the atomic CAS (compare-and-swap) operation. Java provides AtomicReference.compareAndSet for this and C# provides us Interlocked.CompareExchange.
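
As a quick illustration, here is how CAS behaves with Java's AtomicReference (the String values are just placeholders):

```java
import java.util.concurrent.atomic.AtomicReference;

public class CasDemo {
    public static void main(String[] args) {
        AtomicReference<String> ref = new AtomicReference<>("old");

        // CAS succeeds only if the current value matches the expected one.
        boolean swapped = ref.compareAndSet("old", "new");
        System.out.println(swapped + " -> " + ref.get());   // true -> new

        // A second attempt with a stale expected value fails atomically.
        swapped = ref.compareAndSet("old", "newer");
        System.out.println(swapped + " -> " + ref.get());   // false -> new
    }
}
```

A failed CAS is how a lock-free algorithm detects that another thread got there first; the usual response is to re-read the current state and retry.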

A brief description of the algorithm:

  • The underlying data structure we use is a linked list where each node contains an item and a pointer to the next node.
  • We use compare-and-swap (CAS) operations to update the head and tail pointers, ensuring that concurrent operations on the queue are thread-safe without locking.
    • Enqueue: a new node is created and linked as the new tail using CAS.
    • Dequeue: the head pointer is updated to the next node using CAS.
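
The steps above can be sketched in Java roughly as follows. This is a simplified, single-file take on the Michael-Scott algorithm, in the spirit of ConcurrentLinkedQueue but not its actual implementation (the real one adds many optimizations):

```java
import java.util.concurrent.atomic.AtomicReference;

public class LockFreeQueue<E> {
    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E item) { this.item = item; }
    }

    // Both head and tail start at a dummy (sentinel) node.
    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    public LockFreeQueue() {
        Node<E> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    public void enqueue(E item) {
        Node<E> node = new Node<>(item);
        while (true) {
            Node<E> last = tail.get();
            Node<E> next = last.next.get();
            if (next == null) {
                // Try to link the new node after the current tail.
                if (last.next.compareAndSet(null, node)) {
                    // Swing tail forward; if this CAS fails, another
                    // thread already helped advance it, which is fine.
                    tail.compareAndSet(last, node);
                    return;
                }
            } else {
                // Tail is lagging behind; help advance it and retry.
                tail.compareAndSet(last, next);
            }
        }
    }

    public E dequeue() {
        while (true) {
            Node<E> first = head.get();
            Node<E> last = tail.get();
            Node<E> next = first.next.get();
            if (first == last) {
                if (next == null) return null;   // queue is empty
                tail.compareAndSet(last, next);  // help a lagging tail
            } else if (head.compareAndSet(first, next)) {
                return next.item;                // next becomes the new dummy
            }
        }
    }
}
```

Note the “helping” pattern: if a thread observes a half-finished enqueue (tail not yet advanced), it completes that step itself instead of waiting, which is what makes the algorithm non-blocking.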

If you’d like to see concrete implementations, the sources of Java’s ConcurrentLinkedQueue and C#’s ConcurrentQueue are good references.

Async implementation

Another interesting way to implement a Concurrent Queue is to expose asynchronous operations to users (async/await in C#).
C# provides the Channels library for this. It is an async (non-blocking) alternative, for producer-consumer scenarios, to the BlockingCollection we discussed above.

Without going into the implementation details of Channels, let’s look at an example of the producer/consumer pattern from the official documentation.

Producer example

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Consumer example

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Closing thoughts

Both Java and C# provide a variety of Concurrent FIFO Queue implementations for different use cases: producer-consumer, high throughput, asynchronous workflows, and so on.
Building a good understanding of these is helpful when navigating codebases that use these data structures and concurrency primitives, and when choosing the right implementation for the task at hand while building new software or refactoring existing code for scalability.