
Request dispatcher using C++ threads

A couple of weeks ago I found myself looking at implementing a request dispatcher for an existing embedded C++ project. The basic concept was to have a static dispatcher class which could have requests added to it from various sources. Worker threads would then get requests when available and process them.

Looking around for inspiration, I didn’t find anything which really appealed to me. The main issue with most of them was that they didn’t implement a dispatcher so much as a request queue, with each worker thread responsible for polling the queue for new requests. This seemed rather messy and error-prone to me.

The solution I ultimately settled on was a dispatcher which would spawn a specified number of worker threads, each of which would launch and add themselves to the dispatcher, where they would wait on a request using their own condition variable. An incoming request would be assigned to a waiting worker, after which its condition variable would be signalled. The worker thread would then process the request, finish it and add itself back to the dispatcher.

While I originally implemented the Dispatcher in C++03, using the POCO libraries for its threading features since C++11 support was absent on the target platform, I later reimplemented it in pure C++11 using the native threading API. Basic usage of this dispatcher looks like the following:

#include "dispatcher.h"
#include "request.h"

#include <iostream>
#include <string>
#include <csignal>
#include <thread>
#include <chrono>

using namespace std;


// Globals
volatile sig_atomic_t signal_caught = 0;
mutex logMutex;


void sigint_handler(int sig) {
	signal_caught = 1;
}


void logFnc(string text) {
	logMutex.lock();
	cout << text << "\n";
	logMutex.unlock();
}


int main() {
	signal(SIGINT, &sigint_handler);
	Dispatcher::init(10);
	
	cout << "Initialised.\n";
	
	int cycles = 0;
	Request* rq = 0;
	while (!signal_caught && cycles < 50) {
		rq = new Request();
		rq->setValue(cycles);
		rq->setOutput(&logFnc);
		Dispatcher::addRequest(rq);
		cycles++;
	}
	
	this_thread::sleep_for(chrono::seconds(5));
	
	Dispatcher::stop();
	cout << "Clean-up done.\n";
	
	return 0;
}

Of note here are the sigint_handler() and logFnc() functions. The first is used in combination with signal() from the <csignal> header to allow one to interrupt the application. This is useful in, for example, a server application. In this particular implementation it’s unnecessary, but it serves as inspiration for a more complex application.

The logFnc() function is passed to each request, allowing the request to report any output from its processing. Here it’s just used to write to cout in a thread-safe manner.

The main() function is rather uneventful: after installing the SIGINT signal handler, the Dispatcher is initialised with a total of ten worker threads. After this, a loop adds a total of fifty Request instances to the Dispatcher. Next we sleep for five seconds to allow all requests to be processed, after which we stop the Dispatcher, terminating all worker threads.

Moving on, the Dispatcher class definition looks as follows:

#pragma once
#ifndef DISPATCHER_H
#define DISPATCHER_H

#include "abstract_request.h"
#include "worker.h"

#include <queue>
#include <mutex>
#include <thread>
#include <vector>

using namespace std;


class Dispatcher {
	static queue<AbstractRequest*> requests;
	static queue<Worker*> workers;
	static mutex requestsMutex;
	static mutex workersMutex;
	static vector<Worker*> allWorkers;
	static vector<thread*> threads;
	
public:
	static bool init(int workers);
	static bool stop();
	static void addRequest(AbstractRequest* request);
	static bool addWorker(Worker* worker);
};

#endif

As one can see, this is a fully static class. It offers the init(), stop(), addRequest() and addWorker() methods, which should be quite self-explanatory at this point. Moving on with the implementation:

#include "dispatcher.h"

#include <iostream>
using namespace std;


// Static initialisations.
queue<AbstractRequest*> Dispatcher::requests;
queue<Worker*> Dispatcher::workers;
mutex Dispatcher::requestsMutex;
mutex Dispatcher::workersMutex;
vector<Worker*> Dispatcher::allWorkers;
vector<thread*> Dispatcher::threads;


// --- INIT ---
// Start the number of requested worker threads.
bool Dispatcher::init(int workers) {
	thread* t = 0;
	Worker* w = 0;
	for (int i = 0; i < workers; ++i) {
		w = new Worker;
		allWorkers.push_back(w);
		t = new thread(&Worker::run, w);
		threads.push_back(t);
	}
	
	return true;
}

After the static initialisations, the init() method creates as many threads with Worker instances as specified; naturally, at least one worker should be requested for the dispatcher to do anything. Each thread and Worker pointer is pushed into its own vector for later use:

// --- STOP ---
// Terminate the worker threads and clean up.
bool Dispatcher::stop() {
	for (int i = 0; i < allWorkers.size(); ++i) {
		allWorkers[i]->stop();
	}
	
	cout << "Stopped workers.\n";
	
	for (int j = 0; j < threads.size(); ++j) {
		threads[j]->join();
		
		cout << "Joined threads.\n";
	}
	
	return true;
}

In the stop() method both vectors are used, first to tell each Worker instance to terminate, then to wait for each thread instance to join.

The real magic starts in the remaining two methods, however:

// --- ADD REQUEST ---
void Dispatcher::addRequest(AbstractRequest* request) {
	// Check whether there's a worker available in the workers queue, else add
	// the request to the requests queue.
	workersMutex.lock();
	if (!workers.empty()) {
		Worker* worker = workers.front();
		worker->setRequest(request);
		condition_variable* cv;
		worker->getCondition(cv);
		cv->notify_one();
		workers.pop();
		workersMutex.unlock();
	}
	else {
		workersMutex.unlock();
		requestsMutex.lock();
		requests.push(request);
		requestsMutex.unlock();
	}
}


// --- ADD WORKER ---
bool Dispatcher::addWorker(Worker* worker) {
	// If a request is waiting in the requests queue, assign it to the worker.
	// Else add the worker to the workers queue.
	// Returns true if the worker was added to the queue and has to wait for
	// its condition variable.
	bool wait = true;
	requestsMutex.lock();
	if (!requests.empty()) {
		AbstractRequest* request = requests.front();
		worker->setRequest(request);
		requests.pop();
		wait = false;
		requestsMutex.unlock();
	}
	else {
		requestsMutex.unlock();
		workersMutex.lock();
		workers.push(worker);
		workersMutex.unlock();
	}
	
	return wait;
}

The inline comments explain the basic code flow in each method. A request is assigned to a waiting worker if one is available; otherwise it’s added to the request queue. An incoming Worker is assigned a pending request if one exists; otherwise it’s added to the worker queue. A waiting Worker has its condition variable signalled.

The next important class to look at is the abstract AbstractRequest class:

#pragma once
#ifndef ABSTRACT_REQUEST_H
#define ABSTRACT_REQUEST_H


class AbstractRequest {
public:
	virtual ~AbstractRequest() { }
	virtual void setValue(int value) = 0;
	virtual void process() = 0;
	virtual void finish() = 0;
};

#endif

This class defines the interface for each request type. Deriving from it, one can implement specific requests to fit whichever source they originate from and whatever response format they require. One can also add specific extra methods to the request implementation when needed:

#pragma once
#ifndef REQUEST_H
#define REQUEST_H

#include "abstract_request.h"


#include <string>

using namespace std;


typedef void (*logFunction)(string text);


class Request : public AbstractRequest {
	int value;
	logFunction outFnc;
	
public:
	void setValue(int value) { this->value = value; }
	void setOutput(logFunction fnc) { outFnc = fnc; }
	void process();
	void finish();
};

#endif

As one can see in the Request implementation which we employ in the usage example, we added an additional setOutput() method to the API, which allows one to assign a function pointer which is used for logging:

#include "request.h"


// --- PROCESS ---
void Request::process() {
	outFnc("Starting processing request " + std::to_string(value) + "...");
}


// --- FINISH ---
void Request::finish() {
	outFnc("Finished request " + std::to_string(value));
}

This request implementation does not do anything in particular, merely outputting on the function pointer when it’s been told to process or finish the request. In an actual implementation one would here do the processing, or alternatively one can use a different AbstractRequest API and assign the request instance to another (static) class which handles the processing. This is heavily implementation-dependent, however.

That said, let’s finish by looking at the Worker class:

#pragma once
#ifndef WORKER_H
#define WORKER_H

#include "abstract_request.h"

#include <condition_variable>
#include <mutex>

using namespace std;


class Worker {
	condition_variable cv;
	mutex mtx;
	unique_lock<mutex> ulock;
	AbstractRequest* request;
	bool running;
	bool ready;
	
public:
	Worker() { running = true; ready = false; ulock = unique_lock<mutex>(mtx); }
	void run();
	void stop() { running = false; }
	void setRequest(AbstractRequest* request) { this->request = request; ready = true; }
	void getCondition(condition_variable* &cv);
};

#endif

The constructor is very basic, merely initialising some variables, including the unique_lock used by the condition variable. Note that acquiring this lock in the constructor means it is held by the thread which creates the Worker, not the thread which later waits on the condition variable; since a mutex must be locked by the thread that waits on it, a production implementation would construct the lock inside run() instead. The run() method is the entry point for the thread running the Worker instance. The stop() method serves to terminate the loop in the run() method, as we’ll see in a moment.

Finally the setRequest() method is used by the Dispatcher to set a new request, with getCondition() used to obtain a reference to the Worker’s condition variable so that it can be signalled.

#include "worker.h"
#include "dispatcher.h"

#include <chrono>

using namespace std;


// --- GET CONDITION ---
void Worker::getCondition(condition_variable* &cv) {
	cv = &this->cv;
}


// --- RUN ---
// Runs the worker instance.
void Worker::run() {
	while (running) {
		if (ready) {
			// Execute the request.
			ready = false;
			request->process();
			request->finish();
		}
		
		// Add self to Dispatcher queue and execute next request or wait.
		if (Dispatcher::addWorker(this)) {
			// Use the ready loop to deal with spurious wake-ups.
			while (!ready && running) {
				if (cv.wait_for(ulock, chrono::seconds(1)) == cv_status::timeout) {
					// We timed out, but we keep waiting unless the worker is
					// stopped by the dispatcher.
				}
			}
		}
	}
}

The run() method immediately enters a while loop which checks the running variable. This boolean is not changed unless the stop() method is called. On common architectures access to a bool is atomic in practice, but strictly speaking unsynchronised access from multiple threads is a data race; for a production implementation one would use std::atomic<bool> instead.

The other boolean variable (ready) indicates whether a request is waiting to be executed. If no request is pending, the Worker adds itself to the Dispatcher. At this point two things can happen: a request is assigned (changing ready to true), or the Worker is added to the queue and has to wait. In the latter case the condition variable is waited upon until it is signalled or the timer expires.

The ready variable thus serves to deal with spurious wake-ups: unless a request has been set, the condition variable is simply waited upon again. The waiting loop terminates only when ready is changed to true or running to false; the latter also terminates the main loop.

Upon executing the demonstration application, we see the following output:

$ ./dispatcher_demo.exe
Initialised.
Starting processing request 0...
Starting processing request 2...
Finished request 2
Starting processing request 3...
Starting processing request 7...
Starting processing request 9...
Finished request 7
Starting processing request 10...
Finished request 10
Starting processing request 11...
Finished request 11
Starting processing request 12...
Finished request 12
Starting processing request 13...
Finished request 13
Starting processing request 14...
Finished request 14
Starting processing request 15...
Finished request 15
Starting processing request 16...
Finished request 16
Starting processing request 17...
Finished request 17
Finished request 3
Starting processing request 18...
Finished request 9
Starting processing request 19...
Finished request 18
Starting processing request 20...
Starting processing request 21...
Finished request 20
Starting processing request 4...
Finished request 21
Starting processing request 22...
Starting processing request 8...
Finished request 22
Finished request 4
Starting processing request 23...
Finished request 8
Finished request 23
Starting processing request 24...
Starting processing request 25...
Starting processing request 27...
Finished request 24
Starting processing request 26...
Finished request 27
Starting processing request 28...
Finished request 25
Finished request 28
Finished request 26
Starting processing request 29...
Starting processing request 31...
Starting processing request 30...
Starting processing request 32...
Finished request 32
Finished request 31
Starting processing request 33...
Starting processing request 1...
Finished request 33
Starting processing request 34...
Starting processing request 5...
Finished request 1
Finished request 0
Finished request 5
Starting processing request 36...
Starting processing request 37...
Finished request 36
Starting processing request 6...
Finished request 37
Finished request 6
Starting processing request 40...
Starting processing request 41...
Starting processing request 38...
Finished request 19
Starting processing request 39...
Finished request 38
Finished request 41
Starting processing request 43...
Starting processing request 35...
Starting processing request 44...
Finished request 29
Finished request 44
Finished request 35
Starting processing request 46...
Starting processing request 45...
Finished request 30
Finished request 45
Finished request 46
Starting processing request 48...
Starting processing request 49...
Finished request 48
Finished request 49
Finished request 34
Starting processing request 47...
Finished request 47
Finished request 39
Finished request 43
Finished request 40
Starting processing request 42...
Finished request 42
Stopped workers.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Clean-up done.

One can see that even though the requests are added in sequential order, they are processed in a distinctly asynchronous manner. This particular demonstration was compiled with GCC 5.3 (64-bit) in MSYS2 Bash on Windows 7, and run on an Intel i7 6700K system with 4 physical cores (8 logical cores with Hyper-Threading enabled).

As a bonus, here’s the Makefile used to compile the above code:

GCC := g++

OUTPUT := dispatcher_demo
SOURCES := $(wildcard *.cpp)
# -pthread is needed when linking std::thread code with GCC on POSIX systems.
CCFLAGS := -std=c++11 -g3 -pthread

all: $(OUTPUT)
	
$(OUTPUT):
	$(GCC) -o $(OUTPUT) $(CCFLAGS) $(SOURCES)
	
clean:
	rm $(OUTPUT)
	
.PHONY: all

I hope that this article was at least somewhat useful or informative to any who read it. Feel free to leave any questions and/or constructive feedback 🙂

Maya
