Lockless Single Producer Single Consumer bounded queue

Why lockless queue

A few days ago, the thought of building a lockless queue was somehow implanted in my mind. Queues like these can be handy for passing messages or work packages between threads. Of course the reason people do these things (by 'these' I mean, beeing foolhardy enough to use threads) is performance. So if we need performance, we need to:

a) Be cache friendly
b) Use an efficient algorithm
c) Avoid memory allocation
d) Avoid using locks

The three first bullets, point to the direction of avoiding any solution that involves a linked list with nodes allocated on the heap. So I choose to go with a pre-allocated ring buffer based on our good friend the std::vector. And trying to keep it lockless (d) means that we will get the chance to use std::atomic.

API

But first lets specify the API that we are building. We are building a queue that connects a producer thread (that pushes items in the queue) and a consumer thread (that pops items from the queue). There must be exactly one producer thread and exactly one consumer thread. For instance the producer cannot pop items from the queue every now and then because this would mean that we would at that moment have 2 consumer threads.

Our queue must be bounded. That means that it will have a fixed maximum size that it cannot grow beyond. This is a good property most of the times. In case something happens to the consumer, or if the consumer is just too slow, the producer will need to know eventually that the queue is full and somehow slow down. If we allowed the queue to grow we could potentially exhaust all available memory, with catastrophic consequences.

Our queue must be non-blocking. This means, that the threads that access it will always be able to keep running while accessing the queue and make some kind of progress. So, if the consumer tries to pop (remove) an item and the queue happens to be empty, he will simply get no item and an error code will indicate that the queue was empty. In the same way, if the producer tries to push (insert) an item and the queue is full, no item will be inserted and he will be notified immediately by an error code. Of course, in the non exceptional (full or empty) case the producer and consumer will also be able to push and pop items respectively concurrently, without getting blocked.

Our queue will be governed by a FIFO policy, as most queues do. We will be able to retrieve items in the same order as they were inserted.

So, our queue API in C++ is the following:

template <typename T>
class queue_spsc
{
public:
    // Maximum capacity must be specified at construction time
    queue_spsc(int max_capacity);
        
    // The producer can push items in the back of the queue.
    // false is returned if the queue is full
    bool push_back(const T &item);

    // The consumer can pop items from the front of the queue.
    // false is returned if the queue is empty
    bool pop_front(T &item);
};

Please don't focus for the time being on my sin of not passing capacity as size_t.

Underlying data structure

All of the requirements above, point to us using a ring buffer. To keep things simple to reason about, I opted to pre-allocate an std::vector and use two integer variables that indicate the indexes of the front (mReadPos) and the back (mWritePos) of the queue.

template <typename T>
class queue_spsc
{
private:
    std::atomic<int> mReadPos;
    std::atomic<int> mWritePos;
    std::vector<T> mBuffer;
};

Data structures are like ...frozen algorithms. The push_back and pop_front algorithms are quite straightforward. But there is just one small caveat in this case. When the queue is empty, mReadPos == mWritePos, but if mBuffer has a capacity equal to the queue capacity, then mReadPos == mWritePos would also hold true when the buffer is full. This would not allow us to disambiguate between the "full" and the "empty" state. To overcome this, we can increase the size of mBuffer by one to make it equal to capacity + 1. This allows us to represent the empty state as mReadPos == mWritePos while the full state is mReadPos == increment(mWritePos). So it is the state where if we increment mWritePos, it would "collide" with mReadPos. Of course increment in the circular buffer context means modulo arithmetic since we want the pointers to "wrap around" if they hit the end of the buffer:

    inline int increment(int pos)
    {
        return (pos + 1) % int(mBuffer.size());
    }

Algorithms

So, if we were operating in a single threaded environment, the push_back and pop_front algorithms would be:

bool push_back(const T &item)
{
    int r = mReadPos;
    int w = mWritePos;
    // Assuming we write, where will we move next?
    const int next_w = increment(w);

    // Detect queue full case
    if (r == next_w)
        return false;

    mBuffer[w] = item;
    mWritePos = next_w;
    return true;
}
bool pop_front(T &item)
{
    int r = mReadPos;
    int w = mWritePos;
    // Queue is empty check
    if (r == w)
        return false;

    item = mBuffer[r];
    mReadPos = increment(r);
    return true;
}

... but we are in a multithreaded environment and this simplistic approach should not fly. Or should it? See, at first glance there is a problem with both of these functions. Even if each load and store to the integers is atomic, in the beginning both functions read the values of mReadPos and mWritePos. These are two individual loads and do not represent a consistent state of the queue. Even worse, mWritePos might be changed by the producer at any time. For instance, while pop_front had read mReadPos and mWritePos, then the producer might increment mWritePos and the view of the world that the consumer will load on r and w will be out of date, compared to what it would have been if the entire queue was locked with a mutex as soon as we got in pop_front.

But this turns out to not be such a big problem. The reason we load both integers in pop_front is to check if the queue is empty. There is no other reason we use w for. If the queue is empty while we are entering the function it is possible that an item might get inserted in the mean time. In this case, we will return false, signaling that the queue is empty when in fact it isn't. This is not a fatal flaw though but something we can tolerate. The consumer will simply retry at another time, and will still retrieve the items. Since we are the only consumer, and the only thread writing to mReadPos we are guaranteed to have a correct value for mReadPos. On the other hand, we might have an outdated value of mWritePos but luckily, since the producer only adds items, this means that we might only underestimate the number of items in the queue. Underestimating the items is not a huge problem be cause if we get tricked and see the queue as empty, the consumer will simply retry the read.

The same logic saves the day for the producer in push_back. push_back might momentarily overestimate the number of items in the queue because it holds an outdated value of mReadPos. This might result in reporting that the queue is full, while in fact some buffer space has been freed in the mean time. But it's not a problem because the producer will simply retry the push_back shortly.

So, surprisingly, this "naive" lockless algorithm works in a SPSC environment too, provided that loads and stores to mReadPos and mWritePos are atomic. It does not work if we have more than one consumers or producers though!

First benchmark

In order to benchmark the lockless queue, I also developed a similar queue protected by a mutex. So let's check out how they compare. To do that, I built a simple test program. We initialize a queue with a max capacity of 4k integers. We then use a producer and a consumer thread, to send a sequence of 100 million integers from the producer to the consumer. Both threads work non-stop but if they find that the queue is empty (or full) and they cannot pull (or push) elements, they call std::this_thread::yield() in order to give time to other threads to run. Yielding dramatically improves performance in a single core environment while not affecting too much multicore performance. You can see the test program here.

So on a quad core intel i5 3570 running linux 4.16 with GCC 8.1 and compiling with -O2 and -march=native we get:

Lockless:5470.86 ms
Locking:16889 ms

So our lockless queue is roughly 3x times faster than the locking queue. Not bad for a quick test.

But there's more to it

The store and load operations of C++ atomic types accept a memory_order argument. This further determines the semantics of the atomic operation that we wish to perform. It is one thing to perform an atomic store and another thing to guarantee that this store will be performed between some other specific actions, or that it will be guaranteed to be visible to another load operation. Until now, we have been using the quite conservative std::memory_order_seq_cst memory ordering. This guarantees that each of the atomic operations that we have performed, is Sequentially consistent, or that there is a single total modification order of all atomic operations that we performed. This is not something that we require in our queue. It is my understanding that it is possible in this use case, to relax these requirements and use the less strict Release-Consume ordering. The subject is quite
complex but it is my understanding that to maintain correctness, there needs to be an atomic store tagged with memory_order_release from one thread and a load tagged with memory_order_consume on the other. In addition, all the operations that must happen after the atomic load, must carry a data dependency from the loaded value.

In our case, my understanding is that mBuffer[w] = item; carries a data dependency from the const int w = mWritePos.load(std::memory_order_consume); and the mBuffer[w] = item; is guaranteed to happen before the mWritePos.store(next_w, std::memory_order_release); because 'no reads or writes in the current thread can be reordered after this store'.

I am not 100% confident that this modification is 100% correct. I have not found any test where this version fails though and it further improves the performance of the queue. So lets move on with some benchmarks:

Second benchmark

intel i5 3570 running linux 4.16 with GCC 8.1 and compiling with -O2 and -march=native

Lockless:1697.31 ms
Locking:16865.7 ms

Wow, here the lockless queue is roughly 10x faster than the one with the mutex.
The same benchmark under windows 10 / VS2015 / x64:

Lockless:1753.56 ms
Locking:8465.56 ms

The surprise here is that the locking queue is 2x faster than the one one in linux.
Running under ChromeOS / Linux 3.18 / gcc 5.4 / Intel 3855U

Lockless:4497.74 ms
Locking:27663.6 ms

A speedup of roughly 6x.

Running under Linux 4.9 / gcc 6.3 / Intel J3455 (Atom)

Lockless:2820.54 ms
Locking:16700.2 ms

Again, roughly 6x.
Finally running under the raspberry pi 3B (Linux 4.14 / gcc 6.3 / ARMv7 rev4)

Lockless:7775.53 ms
Locking:128260 ms

The raspberry is significantly slower than anything else but still the lockless queue is 16x faster than the one with the mutex, which actually makes it faster than any locking queue performs in any of the other CPUs.

Show me the code

All the code is available here

Show Comments