Multithreading and job queues

Multithreading is one essential element in modern game programming, but it’s also the nightmare of many developers and the cause of countless bugs. The way i think about it is like the combination of one unsolvable problem and two solvable problems. The first one is a direct consequence of the nature of concurrency: without multithreading our code is deterministic, while after introducing concurrency the determinism is lost. We can design very sophisticated thread managers, but in the end it’s always the operating system that makes the call about the execution ordering, and we have no control over that. The main consequence of this problem is that it becomes very difficult to reason about your code. The second problem is represented by our necessity to decide which thread executes what task. This is of course solvable, but as many other solvable problems there are more efficient solutions than others. And finally, we have the issue of communicating between threads, particularly in order to handle synchronization and shared data. In this post i want to discuss one neat way in particular of implementing multithreading, one that hopefully solves the last two problems in an efficient way: the job queue (also known as work queue, or task system, or scheduler, or whatever, you get the point!). 

 

A Fresh Perspective on Concurrency

Let’s start from the basic way you might want to deal with concurrency in a first place. You have a bunch of threads and a bunch of semi-independent systems in your engine that you want to parallelize. The most straightforward way to deal with the first solvable problem i mentioned before is to assign one thread to each system, and therefore you can create your AI thread, the audio thread, the debug system thread, the asset loading thread, etc.. However, if you care about performance and optimization you will discover that this system is actually very inefficient. Maybe your AI thread is always going to be full of work, but what about the audio thread, or the asset loading one? Probably your application is not going to import assets all the time, or to play hundreds of audio files together, and as a consequence you might have some thread that is starving for some additional work. Another way of thinking about who executes what, is to schedule the threads not according to the different conceptual systems that compose your engine, but according to the concept of priority. All the needed tasks can be sent into a queue with the desired priority level, without making differences about their own nature. The queue itself can be FIFO, meaning that tasks with the same priority start to be processed in order of arrival (but can finish in a complete different order of course), and circular to have wrapping for read and write indices. This system ensures that each thread has always some work to do, provided that the subdivision of priorities is actually balanced. However, we need to solve a new problem now! How can we be sure that the same task is not executed by multiple threads? This was not a possibility before, since the threads were organized by categories and the same task could only be executed by a single thread. The answer comes from special x64 instructions such as InterlockedCompareExchange, designed in order to guarantee the univocity of the execution with respect to multiple competing threads. The way this function works is by atomically comparing a destination value and a comparand value, both representing the index of the next entry to read, and if they are equal the destination is replaced with a third value (usually the incremented index). The function returns the original destination value, and since the first thread that wants to execute the task is going to increment the index, it’s possible to guarantee thread safety by executing the task only if the return value is equal to the original index. The function adds a little bit of overhead to the system, but we can minimize its impact by designing our tasks to be long enough to limit the amount of function calls.

Up to this point the idea of a job queue seems convincing, but let’s stress the system a little bit more. Tasks can be added during the lifetime of our application and the queue can execute them using a busy-waiting technique. In this way, after a thread finishes some work it can be put to sleep for the desired amount of time. This is very important because we want our program to execute also the rest of the code, without being stuck forever into the queue,  but it is also useful in order to limit the power consumption of our application. However, the communication problem we mentioned before is very relevant right now. How can we signal to the operating system when to put a thread to sleep or to resume its activity? A common solution to this issue is to use a semaphore, which is a countable wait primitive capable of controlling a shared resource between threads. A semaphore handles a count between a minimum and a maximum value (in this case equal to the amount of threads). Initially the count is set to zero and this can be interpreted as zero available threads. When the count is incremented, a number of threads proportional to the count itself are awaken from their sleep and are capable of executing the tasks. Each time the game logic sends the necessary data for a task it has to increment the semaphore by one, asking for the attention of at least one thread. In Win32 this increment can be performed by calling the ReleaseSemaphore function. Finally, after a thread wakes up it decrements the count by one, effectively signaling to be unavailable for the time being. 

 

Implementing the job queue

Now it’s time to turn all this fancy reasoning into practice. Using C++ and the Win32 API to begin with, we can spawn a new thread by calling the CreateThread function. As with any Win32 function it’s useful to read the related documentation, which you can find here (https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createthread). After reading the explanation of the function arguments we discover that it’s necessary to include an application-defined function, called ThreadProc, which represents the entry point of the thread. Moreover, we can pass a pointer to a variable or a struct that we want to be used by the thread, and in our case this struct is going to be our work queue itself. Let’s start by defining the queue as a collection of entries, a semaphore and a bunch of volatile variables that are going to be useful in order to control the queue itself. The entries in turn contain a data component and a callback that represents the actual task:

// We pre-declare the platform_work_queue to break the circular dependency
// The callback is defined through a macro because in this way we only need to 
// specify its name, instead of writing every time also the prototype
struct platform_work_queue;
#define PLATFORM_WORK_QUEUE_CALLBACK(name) void name(platform_work_queue* queue, void* data)
typedef PLATFORM_WORK_QUEUE_CALLBACK(platform_work_queue_callback);

struct platform_work_queue_entry
{
	void* data;
	platform_work_queue_callback* callback;
};

struct platform_work_queue
{
	platform_work_queue_entry entries[256];

	HANDLE semaphoreHandle;
	
	uint32 volatile completionGoal;
	uint32 volatile completionCount;
	uint32 volatile nextEntryToRead;
	uint32 volatile nextEntryToWrite;
};

The volatile keyword is a way to warn the compiler that a variable might be changed externally (by another thread in this case), and therefore it’s not allowed to optimize it away. This is another very important concept in multithreading programming, which is also linked to the problem of synchronization and communication: the compiler doesn’t know about the existence of other threads, and therefore it might choose to hide away some variable into a register if the current thread is not going to use it. Specifying variables as volatile is not only useful, but also necessary in these cases. Before the creation of the threads we initialize all volatile variables to zero and we create and assign a semaphore to the queue:

// Right now we only need to pass the queue into the thread, but if some additional
// non queue-related data is needed this is the struct we have to fill
struct win32_thread_startup
{
	platform_work_queue* queue;
};

void createWorkQueue(platform_work_queue* queue, uint32 threadCount, win32_thread_startup* threadStartups)
{
	queue->completionCount = 0;
	queue->completionGoal = 0;
	queue->nextEntryToRead = 0;
	queue->nextEntryToWrite = 0;

	uint32 initialCount = 0;
	queue->semaphoreHandle = CreateSemaphoreEx(0, initialCount, threadCount, 0, 0, SEMAPHORE_ALL_ACCESS);

	for (uint32 threadIndex = 0; threadIndex < threadCount; ++threadIndex)
	{
		win32_thread_startup* threadStartup = threadStartups + threadIndex;
		threadStartup->queue = queue;

		DWORD threadID;
		HANDLE threadHandle = CreateThread(0, 0, ThreadProc, threadStartup, 0, &threadID);
		CloseHandle(threadHandle);
	}
}

// The concept of priorities can be implemented by spawning an array of threads associated
// to a single queue. Different queues may handle a different number of threads, and the
// latter is what ultimately determines their priority level. Here is an example about
// the creation of two queues, one high priority and one low priority

// High priority queue
win32_thread_startup HPThreadStartups[6] = {};
platform_work_queue HPQueue = {};
createWorkQueue(&HPQueue, ArrayCount(HPThreadStartups), HPThreadStartups);

// Low priority queue
win32_thread_startup LPThreadStartups[2] = {};
platform_work_queue LPQueue = {};
createWorkQueue(&LPQueue, ArrayCount(LPThreadStartups), LPThreadStartups);

Now let’s see how to define the ThreadProc function. At first we extract the queue that is passed through the win32_thread_startup struct, and then we execute the busy-waiting loop:

DWORD WINAPI ThreadProc(LPVOID lpParameter)
{
    win32_thread_startup* threadStartup = (win32_thread_startup*)lpParameter;
    platform_work_queue* queue = threadStartup->queue;
        
    for (;;)
    {
    	if (processNextWorkQueueEntry(queue))
    		// decrease semaphore count by 1 (on wakeup, not on entry)
    		WaitForSingleObjectEx(queue->semaphoreHandle, INFINITE, FALSE);		
    }
}

The function processNextWorkQueueEntry implements the actual FIFO logic for the queue. At the beginning we save the index of the next entry to read into a comparand variable. If this index is equal to the one determined by the corresponding addWorkQueueEntry call, then the thread has work to do, otherwise it is going to sleep. If we are working, now it’s the time to execute the InterlockedCompareExchange trick by letting the first available thread to increment the entry index (note the queue circularity through the modulus operator). Finally, only this lucky first worker has the privilege to execute the callback and to atomically increment the completion count:

bool32 processNextWorkQueueEntry(platform_work_queue* queue)
{
	bool32 shouldSleep = false;

	// Implement circular FIFO queue
	uint32 originalNextEntryToRead = queue->nextEntryToRead;
	uint32 newNextEntryToRead = ((queue->nextEntryToRead + 1) % ArrayCount(queue->entries));

	// If there is work to do
	if (originalNextEntryToRead != queue->nextEntryToWrite)
	{
		// Increment index
		uint32 index = InterlockedCompareExchange((LONG volatile*)&queue->nextEntryToRead, newNextEntryToRead,
			                                  originalNextEntryToRead);

		if (index == originalNextEntryToRead)
		{
			platform_work_queue_entry entry = queue->entries[index];
			entry.callback(queue, entry.data);
			InterlockedIncrement((LONG volatile*)&queue->completionCount);
		}
	}
	else
		shouldSleep = true;

	return shouldSleep;
}

// Sometimes we want to execute all the task inside the queue, and
// to start filling it again after some event (such as hot code reloading)
void processAllWorkQueue(platform_work_queue* queue)
{
    while (queue->completionGoal != queue->completionCount)
    	processNextWorkQueueEntry(queue);

    queue->completionGoal = 0;
    queue->completionCount = 0;
}

Now, the queue can start working only after we prepared some task for it to crunch, and the work load is dictated of course by the necessities of the application. Each task is characterized by its data and callback, and these two elements together with the queue itself are going to be the arguments of the addWorkQueueEntry function:

void addWorkQueueEntry(platform_work_queue* queue, platform_work_queue_callback* callback, void* data)
{
	// Implement circular FIFO logic
	uint32 newNextEntryToWrite = ((queue->nextEntryToWrite + 1) % ArrayCount(queue->entries));

	// Make sure the cicular queue hasn't wrapped around, before writing
	Assert(newNextEntryToWrite != queue->nextEntryToRead);

	platform_work_queue_entry* entry = queue->entries + queue->nextEntryToWrite;
	entry->data = data;
	entry->callback = callback;
	++queue->completionGoal;

	// Protect from compiler aggressive code optimization
	_WriteBarrier();

	queue->nextEntryToWrite = newNextEntryToWrite;

	// Increase semaphore count by 1 and return previous one
	ReleaseSemaphore(queue->semaphoreHandle, 1, 0);
}

In this last function, we update the circular index of the work to be executed and we insert data and callback into the queue. Note how the _WriteBarrier() call is used in order to protect against aggressive compiler optimizations that may move the code up and down the boundary  without considering the multithreading context. Finally we increment the semaphore, forcing at least one thread to become interested in our task.

Wow, that was quite a ride but i think the multithreaded job queue was worth it. I am sure the system can still be greatly improved, but this is what i am currently toying with. I hope to be back soon with some update!