C++ – How to create an efficient multi-threaded task scheduler in C++

c++ linux multithreading pthreads task

I'd like to create a very efficient task scheduler system in C++.

The basic idea is this:

class Task {
    public:
        virtual void run() = 0;
};

class Scheduler {
    public:
        void add(Task &task, double delayToRun);
};

Behind Scheduler, there should be a fixed-size thread pool that runs the tasks (I don't want to create a thread for each task). delayToRun means that the task is not executed immediately, but delayToRun seconds later, measured from the point it was added to the Scheduler.

(delayToRun is an "at-least" value, of course. If the system is loaded, or if we ask the impossible of the Scheduler, it won't be able to meet the requested time, but it should do the best it can.)
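To make the "delayToRun seconds after add" rule concrete, here is a minimal sketch of how the delay could be turned into an absolute deadline and kept in a time-ordered queue. The names Pending, Later, PendingQueue and deadline_after are hypothetical and not part of the interface above; the int id simply stands in for a Task.

```cpp
#include <cassert>
#include <chrono>
#include <queue>
#include <vector>

// A monotonic clock, so deadlines are unaffected by wall-clock adjustments.
using Clock = std::chrono::steady_clock;

// Hypothetical queue entry: an absolute deadline plus an id standing in for the Task.
struct Pending {
    Clock::time_point due;
    int id;
};

// Comparator that keeps the entry with the earliest deadline on top of the heap.
struct Later {
    bool operator()(const Pending& a, const Pending& b) const { return a.due > b.due; }
};

using PendingQueue = std::priority_queue<Pending, std::vector<Pending>, Later>;

// Converts "delayToRun seconds from now" into an absolute point in time.
// Assumption: negative delays are treated as a caller error.
Clock::time_point deadline_after(Clock::time_point now, double delayToRun) {
    assert(delayToRun >= 0.0);
    return now + std::chrono::duration_cast<Clock::duration>(
                     std::chrono::duration<double>(delayToRun));
}
```

Storing absolute deadlines rather than remaining delays means the entries never need updating as time passes; a waiting thread only has to look at the top of the queue.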

And here's my problem: how can I implement the delayToRun functionality efficiently? I'm trying to solve this problem using mutexes and condition variables.

I see two ways:

With manager thread

Scheduler contains two queues: allTasksQueue and tasksReadyToRunQueue. A task gets added to allTasksQueue in Scheduler::add. There is a manager thread, which sleeps until the earliest task in allTasksQueue is due and then moves it to tasksReadyToRunQueue. Worker threads wait for a task to become available in tasksReadyToRunQueue.

If Scheduler::add adds a task at the front of allTasksQueue (that is, a task whose delayToRun puts it before the current soonest-to-run task), then the manager thread needs to be woken up so it can update its wait time.

This method can be considered inefficient, because it needs two queues, and it needs two condition-variable signals to make a task run (one for moving the task from allTasksQueue to tasksReadyToRunQueue, and one for signalling a worker thread to actually run the task).
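For reference, the manager-thread design described above might look roughly like the following. This is only a sketch under some assumptions: the class name ManagerScheduler is hypothetical, std::function<void()> stands in for Task, a single mutex guards both queues for simplicity, and tasks still pending at destruction are dropped.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ManagerScheduler {  // hypothetical name
    using Clock = std::chrono::steady_clock;
    struct Entry {
        Clock::time_point due;
        std::function<void()> fn;
        bool operator>(const Entry& o) const { return due > o.due; }
    };
    std::mutex lock;                     // guards both queues (a simplification)
    std::condition_variable manager_cv;  // signal 1: the earliest deadline changed
    std::condition_variable ready_cv;    // signal 2: a task became ready to run
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> allTasksQueue;
    std::deque<std::function<void()>> tasksReadyToRunQueue;
    bool running = true;
    std::vector<std::thread> threads;

    void manager() {
        std::unique_lock<std::mutex> lk(lock);
        while (running) {
            if (allTasksQueue.empty()) {
                manager_cv.wait(lk);  // nothing scheduled: wait without a timeout
            } else if (allTasksQueue.top().due <= Clock::now()) {
                // The earliest task is due: hand it over and wake one worker.
                tasksReadyToRunQueue.push_back(allTasksQueue.top().fn);
                allTasksQueue.pop();
                ready_cv.notify_one();
            } else {
                auto due = allTasksQueue.top().due;  // copy: the queue may change while waiting
                manager_cv.wait_until(lk, due);
            }
        }
    }
    void worker() {
        std::unique_lock<std::mutex> lk(lock);
        while (running) {
            if (tasksReadyToRunQueue.empty()) { ready_cv.wait(lk); continue; }
            auto fn = std::move(tasksReadyToRunQueue.front());
            tasksReadyToRunQueue.pop_front();
            lk.unlock();  // run the task without holding the lock
            fn();
            lk.lock();
        }
    }

public:
    explicit ManagerScheduler(unsigned workers) {
        threads.emplace_back([this] { manager(); });
        for (unsigned i = 0; i < workers; ++i) threads.emplace_back([this] { worker(); });
    }
    ~ManagerScheduler() {
        { std::lock_guard<std::mutex> g(lock); running = false; }
        manager_cv.notify_all();
        ready_cv.notify_all();
        for (auto& t : threads) t.join();
    }
    void add(std::function<void()> fn, double delayToRun) {
        assert(delayToRun >= 0.0);
        auto due = Clock::now() + std::chrono::duration_cast<Clock::duration>(
                                      std::chrono::duration<double>(delayToRun));
        std::lock_guard<std::mutex> g(lock);
        bool new_head = allTasksQueue.empty() || due < allTasksQueue.top().due;
        allTasksQueue.push(Entry{due, std::move(fn)});
        if (new_head) manager_cv.notify_one();  // the manager must recompute its wait
    }
};
```

The two notify calls on the path from add to execution (manager_cv, then ready_cv) are the double signalling that the paragraph above is concerned about.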

Without manager thread

There is one queue in the scheduler. A task gets added to this queue in Scheduler::add. A worker thread checks the queue. If it is empty, the thread waits without a time constraint. If it is not empty, the thread waits until the soonest task is due.

  1. If there is only one condition variable on which all the worker threads wait: this method can be considered inefficient, because if a task is added near the front of the queue (with N worker threads, "front" means the task's index is < N), then all the worker threads need to be woken up to update the time they are waiting for.

  2. If there is a separate condition variable for each thread, then we can control which thread to wake up, so in this case we don't need to wake up all threads (we only need to wake up the thread which has the largest waiting time, so we need to keep track of this value). I'm currently thinking about implementing this, but working out the exact details is complex. Are there any recommendations, thoughts, or documents on this method?


Is there any better solution for this problem? I'm trying to use standard C++ features, but I'm willing to use platform-dependent tools too (my main platform is Linux), like pthreads, or even Linux-specific tools (like futexes), if they provide a better solution.

Best Answer

You can avoid both having a separate "manager" thread and having to wake up a large number of threads when the next-to-run task changes, by using a design where a single pool thread waits for the "next to run" task (if there is one) on one condition variable, and the remaining pool threads wait indefinitely on a second condition variable.

The pool threads would execute pseudocode along these lines:

pthread_mutex_lock(&queue_lock);

while (running)
{
    if (head task is ready to run)
    {
        dequeue head task;
        if (task_thread == 1)
            pthread_cond_signal(&task_cv);
        else
            pthread_cond_signal(&queue_cv);

        pthread_mutex_unlock(&queue_lock);
        run dequeued task;
        pthread_mutex_lock(&queue_lock);
    }
    else if (!queue_empty && task_thread == 0)
    {
        task_thread = 1;
        pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
        task_thread = 0;
    }
    else
    {
        pthread_cond_wait(&queue_cv, &queue_lock);
    }
}

pthread_mutex_unlock(&queue_lock);

If you change the next task to run, then you execute:

if (task_thread == 1)
    pthread_cond_signal(&task_cv);
else
    pthread_cond_signal(&queue_cv);

with the queue_lock held.

Under this scheme, all wakeups are directed at only a single thread, there's only one priority queue of tasks, and there's no manager thread required.
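If you prefer standard C++ over raw pthreads, the same scheme can be sketched with std::mutex and std::condition_variable. Treat this as one possible translation of the pseudocode rather than a finished implementation: the class name TwoCvScheduler and the helper wake_one are hypothetical, std::function<void()> stands in for the question's Task, and tasks still queued when the scheduler is destroyed are simply dropped.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class TwoCvScheduler {  // hypothetical name
    using Clock = std::chrono::steady_clock;
    struct Entry {
        Clock::time_point due;
        std::function<void()> fn;
        bool operator>(const Entry& o) const { return due > o.due; }
    };
    std::mutex queue_lock;
    std::condition_variable task_cv;   // the single timed waiter sleeps here
    std::condition_variable queue_cv;  // every other idle worker sleeps here
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> queue;
    bool task_thread = false;  // true while some worker is in the timed wait
    bool running = true;
    std::vector<std::thread> workers;

    // Wakes exactly one thread; the caller must hold queue_lock.
    void wake_one() {
        if (task_thread) task_cv.notify_one();
        else queue_cv.notify_one();
    }

    void worker() {
        std::unique_lock<std::mutex> lk(queue_lock);
        while (running) {
            if (!queue.empty() && queue.top().due <= Clock::now()) {
                auto fn = queue.top().fn;  // copy the callable, then dequeue
                queue.pop();
                wake_one();   // let another thread take over watching the new head
                lk.unlock();  // run the task without holding the lock
                fn();
                lk.lock();
            } else if (!queue.empty() && !task_thread) {
                task_thread = true;
                auto due = queue.top().due;  // copy: the queue may change while waiting
                task_cv.wait_until(lk, due);
                task_thread = false;
            } else {
                queue_cv.wait(lk);  // spurious wakeups are handled by re-checking in the loop
            }
        }
    }

public:
    explicit TwoCvScheduler(unsigned n) {
        for (unsigned i = 0; i < n; ++i) workers.emplace_back([this] { worker(); });
    }
    ~TwoCvScheduler() {
        { std::lock_guard<std::mutex> g(queue_lock); running = false; }
        task_cv.notify_all();
        queue_cv.notify_all();
        for (auto& t : workers) t.join();
    }
    void add(std::function<void()> fn, double delayToRun) {
        assert(delayToRun >= 0.0);
        auto due = Clock::now() + std::chrono::duration_cast<Clock::duration>(
                                      std::chrono::duration<double>(delayToRun));
        std::lock_guard<std::mutex> g(queue_lock);
        bool new_head = queue.empty() || due < queue.top().due;
        queue.push(Entry{due, std::move(fn)});
        if (new_head) wake_one();  // the next task to run changed
    }
};
```

Note that add only signals when the new task becomes the head of the queue; a task that lands further back does not change what the timed waiter is waiting for, so nobody needs to be woken.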