Archive

Archive for the ‘Multi-threading’ Category

Request dispatcher using C++ threads

November 30, 2016 Leave a comment

A couple of weeks ago I found myself looking at implementing a request dispatcher for an existing embedded C++ project. The basic concept was to have a static dispatcher class which could have requests added to it from various sources. Worker threads would then get requests when available and process them.

Looking around for inspiration I didn’t find anything which really appealed to me. The main issue with many of them was that they didn’t so much use a dispatcher as more of a request queue, with each worker thread responsible for polling the queue in order to wait for new requests. This seemed rather messy and error-prone to me.

The solution I ultimately settled on was a dispatcher which would spawn a specified number of worker threads, each of which would launch and add themselves to the dispatcher, where they would wait on a request using their own condition variable. An incoming request would be assigned to a waiting worker, after which its condition variable would be signalled. The worker thread would then process the request, finish it and add itself back to the dispatcher.

While I originally implemented the Dispatcher in C++ 2003 using the POCO libraries for its threading features as C++11 support was absent for the target platform, I later reimplemented it in pure C++11 using its native threading API. Basic usage of this dispatcher looks like the following:

#include "dispatcher.h"
#include "request.h"

#include <iostream>
#include <string>
#include <csignal>
#include <thread>
#include <chrono>

using namespace std;


// Globals
sig_atomic_t signal_caught = 0;
mutex logMutex;


void sigint_handler(int sig) {
	signal_caught = 1;
}


void logFnc(string text) {
	logMutex.lock();
	cout << text << "\n";
	logMutex.unlock();
}


int main() {
	signal(SIGINT, &sigint_handler);
	Dispatcher::init(10);
	
	cout << "Initialised.\n";
	
	int cycles = 0;
	Request* rq = 0;
	while (!signal_caught && cycles < 50) {
		rq = new Request();
		rq->setValue(cycles);
		rq->setOutput(&logFnc);
		Dispatcher::addRequest(rq);
		cycles++;
	}
	
	this_thread::sleep_for(chrono::seconds(5));
	
	Dispatcher::stop();
	cout << "Clean-up done.\n";
	
	return 0;
}

Of note here are the sigint_handler() and logFnc() methods. The first is used in combination with signal() from the header to allow one to interrupt the application. This is useful for for example a server application. In this particular implementation it’s unnecessary, but serves as inspiration for a more complex application.

The logFnc() method is the method being passed to each request, to allow it to pass any output to from the processing of the request. Here it’s just used to safely output to cout.

The main() method is rather uneventful: after installing the SIGINT signal handler, the Dispatcher is initialised with a total of ten worker threads. After this a loop is entered during which a total of fifty Request instances are added to the Dispatcher. Next we sleep for five seconds to allow all requests to be processed. After this we stop the Dispatcher: this will terminate all worker threads.

Moving on, the Dispatcher class definition looks as follows:

#pragma once
#ifndef DISPATCHER_H
#define DISPATCHER_H

#include "abstract_request.h"
#include "worker.h"

#include <queue>
#include <mutex>
#include <thread>
#include <vector>

using namespace std;


class Dispatcher {
	static queue<AbstractRequest*> requests;
	static queue<Worker*> workers;
	static mutex requestsMutex;
	static mutex workersMutex;
	static vector<Worker*> allWorkers;
	static vector<thread*> threads;
	
public:
	static bool init(int workers);
	static bool stop();
	static void addRequest(AbstractRequest* request);
	static bool addWorker(Worker* worker);
};

#endif

As one can see, this is a fully static class. It offers the init(), stop(), addRequest() and addWorker() methods, which should be quite self-explanatory at this point. Moving on with the implementation:

#include "dispatcher.h"

#include <iostream>
using namespace std;


// Static initialisations.
queue<AbstractRequest*> Dispatcher::requests;
queue<Worker*> Dispatcher::workers;
mutex Dispatcher::requestsMutex;
mutex Dispatcher::workersMutex;
vector<Worker*> Dispatcher::allWorkers;
vector<thread*> Dispatcher::threads;


// --- INIT ---
// Start the number of requested worker threads.
bool Dispatcher::init(int workers) {
	thread* t = 0;
	Worker* w = 0;
	for (int i = 0; i < workers; ++i) {
		w = new Worker;
		allWorkers.push_back(w);
		t = new thread(&Worker::run, w);
		threads.push_back(t);
	}
}

After the static initialisations the init() method creates as many threads with Worker instances as specified. Having at least one worker instance here would be beneficial, of course. Each thread and Worker pointer is pushed into its own vector for later use:

// --- STOP ---
// Terminate the worker threads and clean up.
bool Dispatcher::stop() {
	for (int i = 0; i < allWorkers.size(); ++i) {
		allWorkers[i]->stop();
	}
	
	cout << "Stopped workers.\n";
	
	for (int j = 0; j < threads.size(); ++j) {
		threads[j]->join();
		
		cout << "Joined threads.\n";
	}
}

In the stop() method both vectors are used, first to tell each Worker instance to terminate, then to wait for each thread instance to join.

The real magic starts in the remaining two methods, however:

// --- ADD REQUEST ---
void Dispatcher::addRequest(AbstractRequest* request) {
	// Check whether there's a worker available in the workers queue, else add
	// the request to the requests queue.
	workersMutex.lock();
	if (!workers.empty()) {
		Worker* worker = workers.front();
		worker->setRequest(request);
		condition_variable* cv;
		worker->getCondition(cv);
		cv->notify_one();
		workers.pop();
		workersMutex.unlock();
	}
	else {
		workersMutex.unlock();
		requestsMutex.lock();
		requests.push(request);
		requestsMutex.unlock();
	}
}


// --- ADD WORKER ---
bool Dispatcher::addWorker(Worker* worker) {
	// If a request is waiting in the requests queue, assign it to the worker.
	// Else add the worker to the workers queue.
	// Returns true if the worker was added to the queue and has to wait for
	// its condition variable.
	bool wait = true;
	requestsMutex.lock();
	if (!requests.empty()) {
		AbstractRequest* request = requests.front();
		worker->setRequest(request);
		requests.pop();
		wait = false;
		requestsMutex.unlock();
	}
	else {
		requestsMutex.unlock();
		workersMutex.lock();
		workers.push(worker);
		workersMutex.unlock();
	}
	
	return wait;
}

The inline comments explain the basic code flow in each method. A request is added to an available worker thread if available, else it’s added to the queue. An incoming Worker has a request assigned to it if available, else it’s added to the queue. A waiting Worker has its condition variable signalled.

The next important class to look at is the abstract AbstractRequest class:

#pragma once
#ifndef ABSTRACT_REQUEST_H
#define ABSTRACT_REQUEST_H


class AbstractRequest {
	//
	
public:
	virtual void setValue(int value) = 0;
	virtual void process() = 0;
	virtual void finish() = 0;
};

#endif

This class defines the interface for each request type. Deriving from it, one can implement specific requests to fit whichever source they originate from and whatever response format they require. One can also add specific extra methods to the request implementation when needed:

#pragma once
#ifndef REQUEST_H
#define REQUEST_H

#include "abstract_request.h"


#include <string>

using namespace std;


typedef void (*logFunction)(string text);


class Request : public AbstractRequest {
	int value;
	logFunction outFnc;
	
public:
	void setValue(int value) { this->value = value; }
	void setOutput(logFunction fnc) { outFnc = fnc; }
	void process();
	void finish();
};

#endif

As one can see in the Request implementation which we employ in the usage example, we added an additional setOutput() method to the API, which allows one to assign a function pointer which is used for logging:

#include "request.h"


// --- PROCESS ---
void Request::process() {
	outFnc("Starting processing request " + std::to_string(value) + "...");
}


// --- FINISH ---
void Request::finish() {
	outFnc("Finished request " + std::to_string(value));
}

This request implementation does not do anything in particular, merely outputting on the function pointer when it’s been told to process or finish the request. In an actual implementation one would here do the processing, or alternatively one can use a different AbstractRequest API and assign the request instance to another (static) class which handles the processing. This is heavily implementation-dependent, however.

That said, let’s finish by looking at the Worker class:

#pragma once
#ifndef WORKER_H
#define WORKER_H

#include "abstract_request.h"

#include <condition_variable>
#include <mutex>

using namespace std;


class Worker {
	condition_variable cv;
	mutex mtx;
	unique_lock<mutex> ulock;
	AbstractRequest* request;
	bool running;
	bool ready;
	
public:
	Worker() { running = true; ready = false; ulock = unique_lock<mutex>(mtx); }
	void run();
	void stop() { running = false; }
	void setRequest(AbstractRequest* request) { this->request = request; ready = true; }
	void getCondition(condition_variable* &cv);
};

#endif

The constructor is very basic, merely initialising some variables, including the mutex lock used by the condition variable. The run() method is the entry point for the thread running the Worker instance. The stop() method serves to terminate the loop in the run() method as we’ll see in a moment.

Finally the setRequest() method is used by the Dispatcher to set a new request, with getCondition() used to obtain a reference to the Worker’s condition variable so that it can be signalled.

#include "worker.h"
#include "dispatcher.h"

#include <chrono>

using namespace std;


// --- GET CONDITION ---
void Worker::getCondition(condition_variable* &cv) {
	cv = &(this)->cv;
}


// --- RUN ---
// Runs the worker instance.
void Worker::run() {
	while (running) {
		if (ready) {
			// Execute the request.
			ready = false;
			request->process();
			request->finish();
		}
		
		// Add self to Dispatcher queue and execute next request or wait.
		if (Dispatcher::addWorker(this)) {
			// Use the ready loop to deal with spurious wake-ups.
			while (!ready && running) {
				if (cv.wait_for(ulock, chrono::seconds(1)) == cv_status::timeout) {
					// We timed out, but we keep waiting unless the worker is
					// stopped by the dispatcher.
				}
			}
		}
	}
}

The run() method immediately enters a while loop which checks the ready variable. This boolean variable is not changed unless the stop() method is called. Since it’s a boolean, access should be atomic, but for a production implementation one may wish to verify this for the relevant processor architecture.

The other boolean variable (ready) indicates whether a request is waiting to be executed. If it’s not, the Worker adds itself to the Dispatcher. At this point two things can happen: a request is assigned (changing ready to true), or the Worker is assigned to the queue and has to wait. In the latter case the condition variable is waited for, until signalled, or the timer expires.

Hereby the ready variable serves to deal with spurious wake-ups: unless a request has been set, the condition variable will simply be waited upon again. Only if ready is changed to true, or the running variable to false will the waiting loop terminate. In the case of the latter variable being changed, this will also terminate the main loop.

Upon executing the demonstration application, we see the following output:

$ ./dispatcher_demo.exe
Initialised.
Starting processing request 0...
Starting processing request 2...
Finished request 2
Starting processing request 3...
Starting processing request 7...
Starting processing request 9...
Finished request 7
Starting processing request 10...
Finished request 10
Starting processing request 11...
Finished request 11
Starting processing request 12...
Finished request 12
Starting processing request 13...
Finished request 13
Starting processing request 14...
Finished request 14
Starting processing request 15...
Finished request 15
Starting processing request 16...
Finished request 16
Starting processing request 17...
Finished request 17
Finished request 3
Starting processing request 18...
Finished request 9
Starting processing request 19...
Finished request 18
Starting processing request 20...
Starting processing request 21...
Finished request 20
Starting processing request 4...
Finished request 21
Starting processing request 22...
Starting processing request 8...
Finished request 22
Finished request 4
Starting processing request 23...
Finished request 8
Finished request 23
Starting processing request 24...
Starting processing request 25...
Starting processing request 27...
Finished request 24
Starting processing request 26...
Finished request 27
Starting processing request 28...
Finished request 25
Finished request 28
Finished request 26
Starting processing request 29...
Starting processing request 31...
Starting processing request 30...
Starting processing request 32...
Finished request 32
Finished request 31
Starting processing request 33...
Starting processing request 1...
Finished request 33
Starting processing request 34...
Starting processing request 5...
Finished request 1
Finished request 0
Finished request 5
Starting processing request 36...
Starting processing request 37...
Finished request 36
Starting processing request 6...
Finished request 37
Finished request 6
Starting processing request 40...
Starting processing request 41...
Starting processing request 38...
Finished request 19
Starting processing request 39...
Finished request 38
Finished request 41
Starting processing request 43...
Starting processing request 35...
Starting processing request 44...
Finished request 29
Finished request 44
Finished request 35
Starting processing request 46...
Starting processing request 45...
Finished request 30
Finished request 45
Finished request 46
Starting processing request 48...
Starting processing request 49...
Finished request 48
Finished request 49
Finished request 34
Starting processing request 47...
Finished request 47
Finished request 39
Finished request 43
Finished request 40
Starting processing request 42...
Finished request 42
Stopped workers.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Clean-up done.

One can see that even though the requests are passed in in sequential order, the processing is performed in a definitely asynchronous manner. This particular demonstration was compiled using GCC 5.3 (64-bit) in MSYS2 Bash on Windows 7, and run on an Intel i7 6700K system, with 4 hardware cores and 8 effective due to hyper-threading enabled.

As a bonus, here’s the Makefile used to compile the above code:

GCC := g++

OUTPUT := dispatcher_demo
SOURCES := $(wildcard *.cpp)
CCFLAGS := -std=c++11 -g3

all: $(OUTPUT)
	
$(OUTPUT):
	$(GCC) -o $(OUTPUT) $(CCFLAGS) $(SOURCES)
	
clean:
	rm $(OUTPUT)
	
.PHONY: all

I hope that this article was at least somewhat useful or informative to any who read it. Feel free to leave any questions and/or constructive feedback 🙂

Maya

How To Really, Truly Use QThreads; The Full Explanation

November 1, 2011 88 comments

Threads in an operating system are a very simple thing. Write a function, maybe bundle it with some data and push it onto a newly created thread. Use a mutex or other method to safely communicate with the thread if necessary. Whether it are Win32, POSIX or other threads, they all basically work the same and are quite fool-proof. I’d venture to say that they’re at least a lot easier to use and handle than sockets 🙂

Those who have discovered the joys of the Qt framework may assume that threads in Qt (QThread) are just like this, but they would be both wrong and right. Wrong because years of wrong documentation from Trolltech/Nokia on QThread has caused countless people to use QThreads in a convoluted and highly inappropriate manner. Right because QThreads are in fact quite easy to use, as long as you ignore the incorrect official Qt documentation on QThread [1] and the myriad of wrongful methods being employed.

The main thing to keep in mind when using a QThread is that it’s not a thread. It’s a wrapper around a thread object. This wrapper provides the signals, slots and methods to easily use the thread object within a Qt project. This should immediately show why the recommended way of using QThreads in the documentation, namely to sub-class it and implement your own run() function, is very wrong. A QThread should be used much like a regular thread instance: prepare an object (QObject) class with all your desired functionality in it. Then create a new QThread instance, push the QObject onto it using moveToThread(QThread*) of the QObject instance and call start() on the QThread instance. That’s all. You set up the proper signal/slot connections to make it quit properly and such, and that’s all.

For a basic example, check this class declaration for the Worker class:

class Worker : public QObject {
    Q_OBJECT

public:
    Worker();
    ~Worker();

public slots:
    void process();

signals:
    void finished();
    void error(QString err);

private:
    // add your variables here
};

We add at least one public slot which will be used to trigger the instance and make it start processing data once the thread has started. Now, let’s see what the implementation for this basic class looks like.

// --- CONSTRUCTOR ---
Worker::Worker() {
    // you could copy data from constructor arguments to internal variables here.
}

// --- DECONSTRUCTOR ---
Worker::~Worker() {
    // free resources
}

// --- PROCESS ---
// Start processing data.
void Worker::process() {
    // allocate resources using new here
    qDebug("Hello World!");
    emit finished();
}

While this Worker class doesn’t do anything special, it nevertheless contains all the required elements. It starts processing when its main function, in this case process(), is called and when it is done it emits the signal finished() which will then be used to trigger the shutdown of the QThread instance it is contained in.

By the way, one extremely important thing to note here is that you should NEVER allocate heap objects (using new) in the constructor of the QObject class as this allocation is then performed on the main thread and not on the new QThread instance, meaning that the newly created object is then owned by the main thread and not the QThread instance. This will make your code fail to work. Instead, allocate such resources in the main function slot such as process() in this case as when that is called the object will be on the new thread instance and thus it will own the resource.

Now, let’s see how to use this new construction by creating a new Worker instance and putting it on a QThread instance:

QThread* thread = new QThread;
Worker* worker = new Worker();
worker->moveToThread(thread);
connect(worker, SIGNAL(error(QString)), this, SLOT(errorString(QString)));
connect(thread, SIGNAL(started()), worker, SLOT(process()));
connect(worker, SIGNAL(finished()), thread, SLOT(quit()));
connect(worker, SIGNAL(finished()), worker, SLOT(deleteLater()));
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
thread->start();

The connect() series here is the most crucial part. The first connect() line hooks up the error message signal from the worker to an error processing function in the main thread. The second connects the thread’s started() signal to the processing() slot in the worker, causing it to start.

Then the clean-up: when the worker instance emits finished(), as we did in the example, it will signal the thread to quit, i.e. shut down. We then mark the worker instance using the same finished() signal for deletion. Finally, to prevent nasty crashes because the thread hasn’t fully shut down yet when it is deleted, we connect the finished() of the thread (not the worker!) to its own deleteLater() slot. This will cause the thread to be deleted only after it has fully shut down.

I hope this small tutorial is of some use to people out there. It has taken me months to put together this method of doing things involving countless frustrating hours of debugging and Google searches which kept repeating the same wrong mantra of ‘sub-class QThread’. Constructive feedback on this tutorial is much appreciated. Emails to Nokia on the wrong QThreads documentation might be a good idea too as I would love for the myths surrounding QThread to finally be erased from this world 🙂

Maya

[1] http://doc.trolltech.com/4.7/qthread.html

On The Topic Of Multi-Threading

August 20, 2011 2 comments

Multi-threading is one of those things which initially has this magical sound to it, and as long as you don’t pay too close attention to it, it does keep its fairy-dust glimmer. You may even end up using it once or twice without trying to write a high-performance application. The real point of this article is to discuss some of the finer points of multi-threading, henceforth we’ll be skipping the high-level languages such as Java and C#, as well as fancy high-level, cross-platform libraries such as Qt, and instead dive straight into the messy, low-level stuff. Do mind your step 🙂

The primary language used for writing high-performance, multi-threaded applications is C++. Since C++ lacks native multi-threading support (until C++ 2011 becomes official and fully implemented that is), one has to pick from a variety of threading API options, the choice of which will depend on the platform of choice. If you only ever run the application on a single operating system, and never plan to port it, you can just go with whatever native API your OS provides, be it Windows, Linux or OS X. This is not a bad choice, as any other libraries you can pick will just build upon this native threading API.

If you’re like me, however, and would like to make porting one’s applications to other platforms as easy as a simple recompile, then one has to pick a library which offers this functionality and portability. While there are various ones out there, including Intel Threading Building Blocks (TBB) [1] and Boost Thread, they each come with their own advantages and disadvantages. Intel TBB is the more high-level one of the two, using abstractions to take away many of the gritty details of multi-threading, whereas Boost Thread (BT from hereon) is quite low-level, making you do the resource management yourself.

The crucial detail when designing a multi-threaded application is the need to balance between efficiency of execution and maintainability. While I haven’t used Intel TBB myself yet, it seems like it could make things a lot easier to build and as long as they keep working, it’s fine. It has to be hell to debug if something goes wrong, though. With a low-level library such as BT there are far fewer layers between you and the system, making maintenance and debugging easier. In theory, of course.

For BT, launching a thread is as easy as:

MyClass worker(1); // our freshly initialized class
boost::thread myThread(worker); // insert into new thread instance

That’s it. The thing where TBB might be easier is when you have to manage a large number of threads, but that’s something you have to look at for yourself.

As I pointed out earlier, in the end it are always the platform’s native threading APIs which are being used for the actual threading. These approaches aren’t too dissimilar. A new thread is created and is allocated a task to run. So-called mutexes, spinlocks and other structures are then used to ensure that a single piece of data that is shared by multiple threads isn’t accessed simultaneously, as this could lead to undesirable behaviour, data corruption and crashes. At this level things are still quite easy to understand.

The part where it gets messy is when you move on to the actual implementation in the hardware which makes this possible. Before the arrival of multi-core processors, multi-threading truly was an illusion, as there’d never been two tasks simultaneously active. Instead the OS’s task scheduler would swap out tasks, giving each a time slice to do its things before its state being saved to the stack again and another task’s state being restored. With multi-core processors two or more tasks can be active simultaneously, yet if you look at for example the statistics provided by the task manager of your operating system and particularly the number of active threads, you’ll see that it’s far higher than the number of cores in your system. For me it’s above 1,000 active threads as I write this.

Task-switching is thus still a very common practice, and we run into the first hurdle when it comes to reliable multi-threading: the OS’s task scheduler. As described earlier, it’s the piece of code which determines which task gets to run and in which order. Countless approaches to task scheduling exist, each being more beneficial to particular scenarios. In an embedded, real-time OS such as QNX the emphasis would be on exact time slices and timing, so that any scheduled task would run on time and exactly for as long it has to be. For a desktop OS such as Windows there’s no such need and scheduling is far more loose. It’s a pretty chaotic environment anyway, so if a task doesn’t run for exactly 100 ms, few will notice.

So in essence your threads will be competing with all the other threads which are active at that time. Don’t count on exact timing, and expect some of your threads to be waiting on results from other threads, depending on the design you’re using. On a hardware level, threading is more smoke and mirrors than the clean and pristine world of software makes us believe in. The task scheduler can mess up allocating your threads, reducing performance significantly. Moving threads between cores will cause all the data it had previously gathered in that core’s L1 and L2 cache to be invalidated and have to start all over again on the new core it’s assigned to. Similarly, task switching on a single core will ‘pollute’ the caches with data your thread doesn’t need, also leading to performance reductions.

One can lock threads to a single core to prevent such problems, but whether it’s the right choice depends again on the situation. It’s a good idea to try multiple approaches and see what works best. Use accurate timing methods and perform multiple runs (at least 5 or so) to rule out any glitches and to ensure you get useful data to base your decision on. Repeat this for every platform and in the case of Linux and similar OSs which allow you to swap out the task scheduler, for each type of task scheduler you will be deploying the application on.

Of course, we are talking about high-performance multi-threading here. If you are just running a processing task in a thread to not disrupt the UI thread, then by all means just use Qt’s threading functionality or so. Even if its abstractions and sometimes poor documentation can make it more of a headache than doing it the low-level approach.

Next article should be on the Android game project again. Until then,

Maya

[1] http://en.wikipedia.org/wiki/Intel_Threading_Building_Blocks
[2] http://www.boost.org/doc/libs/1_47_0/doc/html/thread.html