Synchronization

Because each task is assigned a fiber for its entire lifetime, that fiber can be scheduled to run on any thread in the thread pool. When a task requests to sleep or block until woken, the fiber is immediately unscheduled and the thread picks a new fiber to run. Later, when the fiber is woken, any idle thread can resume the task. This means that standard synchronization primitives like std::mutex will not function properly. carb.tasking therefore implements its own primitives that are fiber-aware; that is, they notify the scheduler that the thread can switch to a different task.

Caution

Using a standard synchronization primitive can bottleneck the system as the thread will not be aware that it can run another task.

Danger

Causing a fiber to sleep or wait while holding any std mutex lock will cause difficult-to-diagnose errors! The carb.tasking plugin supports a Debug Setting to detect this situation.

Note

Only the synchronization primitives in the carb::tasking namespace are fiber-aware. Other Carbonite synchronization primitives are not fiber-aware, as they are designed to stand alone without the foundation of the Carbonite Framework.

Parallel-For / applyRange

The carb.tasking plugin has means to process a loop in a parallel fashion: carb::tasking::ITasking::applyRange() or carb::tasking::ITasking::parallelFor().

These functions call a lambda or task function repeatedly (and in parallel) for every index within a range of [0..range) or [begin..end) with an optional step value. Neither function returns until the task function has been called (and has returned) for every index in the given range.

std::atomic_int calls{ 0 };
// for (int i = 100; i != 600; i += 5)
tasking->parallelFor(100, 600, 5, [&calls] (int val) {
    assert(val >= 100);
    assert(val < 600);
    assert((val % 5) == 0); // Should be multiple of 5.
    calls++;
});
CHECK_EQ(calls, 100);

Warning

A minimum amount of required work must be done for carb::tasking::ITasking::applyRange() to be an improvement over serial processing. This amount varies per system, but typically a rule of thumb is a minimum of 1,000,000 instructions should be executed for carb::tasking::ITasking::applyRange() to produce a benefit. Profiling is highly recommended.

Warning

carb::tasking::ITasking::applyRange() with non-uniform workloads can produce degenerate task distribution. For instance, say that you have an array with 1000 items, but 10% of the items take three times as long to process as the other 90%. By chance, one task may end up processing the slower items and still be working when all other items are complete, causing diminishing returns. It may be better to partition the work into multiple arrays or buckets of uniform cost, and issue multiple concurrent tasks to process them:

using namespace carb::tasking;
TaskGroup waiter;
tasking->addTask(Priority::eHigh, waiter, [&] {
    tasking->applyRange(slow.size(), [&] (size_t i) {
        slow[i]->process();
    });
});
tasking->addTask(Priority::eHigh, waiter, [&] {
    tasking->applyRange(fast.size(), [&] (size_t i) {
        fast[i]->process();
    });
});
waiter.wait(); // Wait until everything is complete

An alternative to carb::tasking::ITasking::applyRange() that is slightly more complicated to use but can result in better performance is carb::tasking::ITasking::applyRangeBatch(). This is true for two reasons: First, the invokable object is called less frequently but with a contiguous range of items to process, which reduces function call overhead. Second, a batchHint parameter can be provided which allows the caller to specify an ideal batch size. carb::tasking::ITasking::applyRange() is an adapter for carb::tasking::ITasking::applyRangeBatch() internally.
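The relationship between the per-index form and the batch form can be illustrated with a minimal sketch built only on the C++ standard library. The names applyRangeBatchSketch and applyRangeSketch, the explicit workers parameter, and the fixed batch size of 16 are assumptions made for illustration. This is not the carb.tasking implementation, and it uses plain threads rather than fibers.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a batch-style parallel loop (not a carb.tasking API).
// Calls f(begin, end) for contiguous, non-overlapping sub-ranges that together cover [0, range).
template <class F>
void applyRangeBatchSketch(std::size_t range, std::size_t batchSize, unsigned workers, F f)
{
    assert(batchSize > 0 && workers > 0);
    std::atomic<std::size_t> next{ 0 }; // shared index; one fetch_add claims a whole batch

    auto work = [&] {
        for (;;)
        {
            std::size_t const begin = next.fetch_add(batchSize);
            if (begin >= range)
                return; // nothing left to claim
            f(begin, std::min(begin + batchSize, range));
        }
    };

    std::vector<std::thread> pool;
    for (unsigned i = 1; i < workers; ++i)
        pool.emplace_back(work);
    work(); // the calling thread participates too
    for (auto& t : pool)
        t.join();
}

// Per-index adapter layered on the batch version: one callable invocation per batch,
// then a plain loop over the indexes inside that batch.
template <class F>
void applyRangeSketch(std::size_t range, unsigned workers, F f)
{
    applyRangeBatchSketch(range, 16, workers, [&f](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i != end; ++i)
            f(i);
    });
}
```

In this sketch a larger batch means fewer updates to the shared index and fewer calls to the callable, at the cost of coarser load balancing when items differ in cost.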

Batch size heuristic

carb::tasking::ITasking::applyRange() and carb::tasking::ITasking::applyRangeBatch() perform operations in parallel while knowing very little about the operations themselves. These functions must perform well across extremes, from a small range of long operations to a large range of very short operations. Unfortunately, carb::tasking::ITasking::applyRange() has no statistical information about the Callable operation. Internally, an atomic index is used to track the indexes that need to be passed to the Callable, but too much contention by multiple tasks on that index can reduce performance due to cache coherency overhead. Thus, carb::tasking::ITasking::applyRangeBatch() employs a heuristic that divides a large range into smaller batches. This heuristic must also be very fast to execute, as time is of the essence.

The implementation of the heuristic is:

        // For lots of (presumably very short) tasks, we can have high contention on data->index that can kill any
        // gains that we get from multithreading. So we're going to process lots of tasks in blocks for less
        // contention on data->index.

        // Figure out which power of 2 we're dealing with limited to 32 bits. The minimum we will ever have is 2 because
        // of checks above, but this algorithm expects a minimum of 1.
        // count | power-of-two
        // ------|-------------
        //    1 => 0
        //    2 => 1
        //  3-4 => 2
        //  5-8 => 3
        // 9-16 => 4
        // and so on. Anything >= UINT_MAX will report powerOf2 = 31
        int const powerOf2 = 31 - math::numLeadingZeroBits(uint32_t(range >= UINT_MAX ? UINT_MAX : range));

        // Calculate a fast blocksize (approximate sqrt offset by 512). Block size doesn't change to 2 until 2^9 (512).
        constexpr static uint16_t kBlockSizeByPowerOf2[] = {
        //  (     1)   0     1     2     3     4     5     6     7     8     9    10    11    12    13    14    15 (       32,768)
        /* blksize */  1,    1,    1,    1,    1,    1,    1,    1,    1,    2,    3,    4,    5,    8,   11,   16,
        //  (65,536)  16    17    18    19    20    21    22    23    24    25    26    27    28    29    30    31 (2,147,483,648)
        /* blksize */ 22,   32,   45,   64,   90,  128,  181,  256,  362,  512,  724, 1024, 1448, 2048, 2896, 4096,
        };

        // Compute block size
        uint16_t const blockSize =
            !batchHint ? kBlockSizeByPowerOf2[powerOf2] : uint16_t(std::min(batchHint, size_t(UINT16_MAX)));

If any batchHint is provided to carb::tasking::ITasking::applyRangeBatch() it overrides the heuristic. This allows the application to tune the execution of carb::tasking::ITasking::applyRangeBatch() (note: for carb::tasking::ITasking::applyRange() batchHint is always 0, so the computed size is used).

As an example, an application may choose to force the batch size to 1 with batchHint = 1 when work is non-uniform or each work index is likely to be long enough that contention on the internal atomic index is negligible.

In any case, tuning with a profiler is highly recommended.
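To experiment with the block sizes the heuristic produces, the snippet above can be wrapped in a standalone function. The following is a sketch under stated assumptions: computeBlockSizeSketch is a hypothetical name, and numLeadingZeroBitsSketch is a portable stand-in for math::numLeadingZeroBits that is assumed to count the leading zero bits of a 32-bit value. The table is copied from the snippet.

```cpp
#include <algorithm>
#include <cassert>
#include <climits>
#include <cstddef>
#include <cstdint>

// Portable stand-in (assumption) for math::numLeadingZeroBits on a 32-bit value.
inline int numLeadingZeroBitsSketch(std::uint32_t v)
{
    int n = 0;
    for (std::uint32_t bit = 0x80000000u; bit != 0 && (v & bit) == 0; bit >>= 1)
        ++n;
    return n; // 32 when v == 0
}

// Hypothetical free function wrapping the heuristic; not a carb.tasking API.
inline std::uint16_t computeBlockSizeSketch(std::size_t range, std::size_t batchHint)
{
    assert(range >= 1); // the heuristic expects a minimum of 1

    static constexpr std::uint16_t kBlockSizeByPowerOf2[32] = {
        1,  1,  1,  1,  1,  1,   1,   1,   1,   2,   3,   4,    5,    8,    11,   16,
        22, 32, 45, 64, 90, 128, 181, 256, 362, 512, 724, 1024, 1448, 2048, 2896, 4096,
    };

    // Index of the highest set bit, with the range clamped to 32 bits.
    int const powerOf2 = 31 - numLeadingZeroBitsSketch(std::uint32_t(range >= UINT_MAX ? UINT_MAX : range));

    // A non-zero hint overrides the table, clamped to what fits in 16 bits.
    return !batchHint ? kBlockSizeByPowerOf2[powerOf2] :
                        std::uint16_t(std::min(batchHint, std::size_t(UINT16_MAX)));
}
```

For example, with no hint a range of 1,000,000 falls in the 2^19 row of the table and gets a block size of 64, while any range below 512 is processed one index at a time.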

Recursive applyRange

carb::tasking::ITasking::applyRange() (and carb::tasking::ITasking::applyRangeBatch()) self-limit in terms of recursion. Since these calls are synchronous (in that they must finish before returning to the caller), they are the tasking system’s highest priority. Consider this example: parallel processing of a 100x100 two-dimensional array is desired. A carb::tasking::ITasking::applyRange() call is made for the X dimension, and within the lambda, another carb::tasking::ITasking::applyRange() call is made for the Y dimension. This means that potentially 101 carb::tasking::ITasking::applyRange() calls will be made that must all complete before the first carb::tasking::ITasking::applyRange() call can return. Each carb::tasking::ITasking::applyRange() call desires to use all of the threads available to the system, but creating NumberOfThreads * 101 tasks is highly costly, and potentially more costly than serially executing the tasks.

A potential (and historical) way of dealing with this recursion was to divide recursive tasks by the number of threads participating in parent tasks. In the above example, the Y dimension carb::tasking::ITasking::applyRange() calls would be limited to 1 thread since NumberOfThreads threads were already participating in the X dimension call. This produced unsatisfactory and suboptimal results, especially with non-uniform workloads, because as threads finished up with the X dimension call they would not be allowed to participate in the many Y dimension calls.

Instead, each carb::tasking::ITasking::applyRange() call now adds itself to a global FIFO queue of in-progress carb::tasking::ITasking::applyRange() calls, notifies up to the maximum number of threads that can assist, and then the calling thread begins working on the dataset. Threads that are already busy ignore the notification until they become idle. If no other threads are available to join, each call will still eventually succeed because the current thread is working. The earlier carb::tasking::ITasking::applyRange() calls get the most participation, and as threads finish with a carb::tasking::ITasking::applyRange() call they find the next one that they can participate in, and so on. This allows the system to proceed with any available work as quickly as possible, rather than each thread being assigned (or limited to) a certain amount of work, which produces suboptimal results.

Synchronous Tasks

In some cases, it is necessary to wait for a task to complete. All of the carb::tasking::ITasking::addTask() functions return a carb::tasking::Future object that can be used to wait for a task to complete (and even receive a return value from a task). Alternatively, a carb::tasking::SemaphoreWrapper or carb::tasking::ConditionVariableWrapper could be employed to wait until a specific condition is met.

There is also a function carb::tasking::ITasking::awaitSyncTask() which will take std::invoke-style parameters and execute them synchronously.

TaskGroup

It can be advantageous to know when a group of tasks is pending or has completed. This is an oft-used shutdown paradigm so that a system waits for all tasks to complete before proceeding with shutdown. This can be accomplished with a very simple class called carb::tasking::TaskGroup. This class acts like a reverse semaphore: when a task is tracked by the TaskGroup, the TaskGroup is unsignaled. Only when the TaskGroup is empty (all tasks have completed) does it become signaled.
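The "reverse semaphore" behavior described above can be modeled with standard blocking primitives. This is only a sketch of the semantics: ReverseSemaphoreSketch and runTrackedSketch are hypothetical names, the class is not fiber-aware, and nothing here reflects how carb::tasking::TaskGroup is actually implemented.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model: signaled only while no tasks are being tracked.
class ReverseSemaphoreSketch
{
public:
    // Track one more task: the group becomes unsignaled.
    void enter()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
    }

    // A tracked task finished; when the last one leaves, the group becomes signaled.
    void leave()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_count > 0);
        if (--m_count == 0)
            m_cv.notify_all();
    }

    bool isSignaled() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count == 0;
    }

    // Block the calling thread until the group is empty.
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count == 0; });
    }

private:
    mutable std::mutex m_mutex;
    std::condition_variable m_cv;
    std::size_t m_count = 0;
};

// Hypothetical usage: start taskCount threads tracked by one group, wait for all of them,
// and return how many had completed by the time wait() returned.
inline int runTrackedSketch(int taskCount)
{
    ReverseSemaphoreSketch group;
    std::atomic<int> completed{ 0 };
    std::vector<std::thread> threads;
    for (int i = 0; i < taskCount; ++i)
    {
        group.enter(); // enter before the task starts so wait() cannot miss it
        threads.emplace_back([&] {
            ++completed;
            group.leave();
        });
    }
    group.wait(); // returns only once every tracked task has left
    int const seen = completed.load();
    for (auto& t : threads)
        t.join();
    return seen;
}
```

A shutdown path would follow the same shape: stop queuing new work, then wait on the group before tearing down anything the tasks depend on.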

Waiting

The tasking library has a generic wait function: carb::tasking::ITasking::wait() (and variants carb::tasking::ITasking::try_wait(), carb::tasking::ITasking::try_wait_for() and carb::tasking::ITasking::try_wait_until()) which can be used to wait on any tasking element that corresponds to the carb::tasking::RequiredObject named requirement. This includes carb::tasking::Future, carb::tasking::Counter, and carb::tasking::TaskGroup.

Multiple elements corresponding to the carb::tasking::RequiredObject named requirement can be grouped together using carb::tasking::Any and carb::tasking::All groupers, and the groupers can be nested to form complex wait mechanisms.
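The idea of nesting groupers can be sketched as composable predicates. This models only the logic of "is this requirement satisfied yet", not how carb.tasking suspends and resumes waiters. RequirementSketch, allOf, and anyOf are hypothetical names and do not correspond to the real carb::tasking::All and carb::tasking::Any types.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical model: a requirement is anything that can report whether it is satisfied.
using RequirementSketch = std::function<bool()>;

// Satisfied only when every part is satisfied.
inline RequirementSketch allOf(std::vector<RequirementSketch> parts)
{
    assert(!parts.empty());
    return [parts]() -> bool {
        for (auto const& part : parts)
            if (!part())
                return false;
        return true;
    };
}

// Satisfied as soon as at least one part is satisfied.
inline RequirementSketch anyOf(std::vector<RequirementSketch> parts)
{
    assert(!parts.empty());
    return [parts]() -> bool {
        for (auto const& part : parts)
            if (part())
                return true;
        return false;
    };
}
```

Because each grouper is itself a requirement, an expression such as allOf({ a, anyOf({ b, c }) }) reads as "a, and at least one of b or c".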

Throttling

In some cases it may be desirable to limit how many concurrent tasks can execute. This can be accomplished by using a Semaphore. Create the semaphore with a count equal to the maximum desired concurrency. The semaphore can then be passed to carb::tasking::ITasking::addThrottledTask() which will acquire() the semaphore before executing the task and release() the semaphore upon task completion.

This effectively limits the number of concurrent tasks to the initial count of the Semaphore, assuming the tasks have been queued with carb::tasking::ITasking::addThrottledTask() and the same Semaphore object.
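The acquire-before, release-after pattern can be sketched with a small blocking semaphore. SemaphoreSketch and runThrottledSketch are hypothetical names used for illustration. The sketch uses one thread per task and standard blocking primitives, so it is not fiber-aware and does not show how carb::tasking::ITasking::addThrottledTask() is implemented.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal blocking counting semaphore (assumption: a stand-in for a tasking semaphore).
class SemaphoreSketch
{
public:
    explicit SemaphoreSketch(unsigned count) : m_count(count) {}

    void acquire()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cv.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned m_count;
};

// Hypothetical helper: runs taskCount tasks, each wrapped in acquire()/release(), and
// returns the highest number of tasks observed inside the throttled region at once.
inline unsigned runThrottledSketch(unsigned taskCount, unsigned maxConcurrency)
{
    assert(maxConcurrency > 0);
    SemaphoreSketch throttle(maxConcurrency);
    std::atomic<unsigned> running{ 0 };
    std::atomic<unsigned> peak{ 0 };

    std::vector<std::thread> threads;
    for (unsigned i = 0; i < taskCount; ++i)
    {
        threads.emplace_back([&] {
            throttle.acquire(); // taken before the task body runs
            unsigned const now = ++running;
            unsigned seen = peak.load();
            while (seen < now && !peak.compare_exchange_weak(seen, now))
            {
                // retry until peak holds at least `now`
            }
            std::this_thread::yield(); // placeholder for real work
            --running;
            throttle.release(); // returned when the task completes
        });
    }
    for (auto& t : threads)
        t.join();
    return peak.load();
}
```

However the threads interleave, the value returned by runThrottledSketch never exceeds maxConcurrency, which mirrors the guarantee described above for tasks that share one Semaphore.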

The “Main” Priority

In some cases it may be necessary for certain tasks to execute only in the context of a single consistent thread, typically the main thread (or initial thread for the application). For this reason, a special Priority value exists: carb::tasking::Priority::eMain. Any task queued with this Priority will only execute when the “main” thread calls carb::tasking::ITasking::executeMainTasks().

The first thread that calls carb::tasking::ITasking::executeMainTasks() indicates which thread will be the “main” thread; any attempt to call the function on a different thread will call std::terminate(). The only way to change the “main” thread is to unload and reload carb.tasking.plugin.

These main-priority tasks are also assigned a fiber for their duration, and can still yield to sleep or block on I/O. When resumed, they will only execute on the “main” thread. When carb::tasking::ITasking::executeMainTasks() is called, it runs each main-priority task until it finishes or yields. As such, it is designed to be called periodically by the main loop of the application.
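The "first caller becomes the main thread" rule and the periodic pump can be sketched as a small queue. MainQueueSketch is a hypothetical class. It runs each task to completion instead of modeling fibers that yield, and it throws std::logic_error on misuse where the text above says the real function calls std::terminate(); that substitution is an assumption made so the behavior can be observed safely.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>

class MainQueueSketch
{
public:
    // May be called from any thread.
    void addMainTask(std::function<void()> task)
    {
        assert(task != nullptr);
        std::lock_guard<std::mutex> lock(m_mutex);
        m_tasks.push_back(std::move(task));
    }

    // Runs every queued task on the calling thread and returns how many ran.
    // The first caller is remembered as the "main" thread.
    std::size_t executeMainTasks()
    {
        std::deque<std::function<void()>> ready;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (!m_hasMain)
            {
                m_main = std::this_thread::get_id();
                m_hasMain = true;
            }
            else if (m_main != std::this_thread::get_id())
            {
                // Sketch-only behavior: throw rather than terminate.
                throw std::logic_error("executeMainTasks called from a non-main thread");
            }
            ready.swap(m_tasks);
        }
        for (auto& task : ready)
            task(); // run outside the lock so tasks can queue more main tasks
        return ready.size();
    }

private:
    std::mutex m_mutex;
    std::deque<std::function<void()>> m_tasks;
    std::thread::id m_main;
    bool m_hasMain = false;
};
```

An application main loop would call executeMainTasks() once per iteration, alongside its other per-frame work.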

Counters (Deprecated)

Caution

Counters are deprecated.

There exists a simple synchronization mechanism: carb::tasking::Counter which acts like a reverse-semaphore: it becomes “signaled” when the count reaches a target value (typically 0). Counter objects can be created with carb::tasking::ITasking::createCounter() or more preferably with carb::tasking::CounterWrapper.

Deprecation Warning

However, this construct is deprecated. As it is a non-standard synchronization primitive, it is highly recommended that more standard constructs such as carb::tasking::Semaphore and carb::tasking::Future are used instead. Because a large body of existing code uses Counter objects, and some things are still most easily expressed with them, they remain supported as of version 1.6 of carb.tasking.plugin.

Signaling task completion

Counters can be used to signal task completion. All of the carb::tasking::ITasking::addTask() variant functions take a carb::tasking::Trackers group which can contain zero or more counters. Prior to addTask returning, all of the given Counter objects are incremented by one. When the task completes, all of the Counter objects are decremented by one. If the Counter has reached its target (typically zero), the Counter becomes signaled and any threads/tasks waiting on it may resume (see carb::tasking::ITasking::yieldUntilCounter()).

Manual Counter Manipulation

Although it is deprecated and should be avoided whenever possible, there exists functionality to atomically increment/decrement Counter objects and check whether they are signaled.

Sub-tasks

It may also be advantageous for a task to be queued but wait for a previous task to complete. One option is for the task function to begin by yielding while it waits on a Counter. However, this is inefficient in that it requires assigning a fiber to the task and starting the task only to immediately stop and wait. For this reason, carb::tasking::ITasking::addSubTask() exists, which takes zero or more Counter objects through a carb::tasking::RequiredObject helper struct. If more than one Counter object is required, they must be grouped with carb::tasking::All or carb::tasking::Any helper structs. This is more efficient in that it allows the scheduler to understand that a task need not be assigned a fiber and started until one or more Counter objects have been signaled.

Note

Currently the Any and All helper functions may not be nested, but this could change in the future.