Synchronization
Each task is assigned a fiber for its entire lifetime, and that fiber can be scheduled to run on any thread in the
thread pool. When a task requests to sleep or block until woken, the fiber is immediately unscheduled and the thread
picks a new fiber to run. Later, when the fiber is woken, any idle thread can resume the task. This means that standard
synchronization primitives like std::mutex will not function properly (for example, a std::mutex must be unlocked by
the same thread that locked it, which cannot be guaranteed once a task may resume on a different thread).
carb.tasking therefore implements its own primitives that are fiber-aware, that is, they notify the scheduler that the
thread can switch to a different task:
- carb::tasking::MutexWrapper (similar to std::mutex or carb::thread::mutex)
- carb::tasking::RecursiveMutexWrapper (similar to std::recursive_mutex or carb::thread::recursive_mutex)
- carb::tasking::SemaphoreWrapper (similar to std::counting_semaphore<> or carb::cpp20::counting_semaphore)
- carb::tasking::SharedMutexWrapper (similar to std::shared_mutex or carb::thread::shared_mutex)
- carb::tasking::ConditionVariableWrapper (similar to std::condition_variable or std::condition_variable_any)
Caution
Using a standard synchronization primitive can bottleneck the system, because a blocked thread is not aware that it could run another task instead.
Danger
Causing a fiber to sleep or wait while holding a lock on any standard-library (std) mutex will cause
difficult-to-diagnose errors! The carb.tasking plugin supports a Debug Setting to detect this situation.
Note
Only the synchronization primitives in the carb::tasking namespace are fiber-aware. Other Carbonite synchronization
primitives are not fiber-aware, as they are designed to stand alone without the foundation of the Carbonite Framework.
Parallel-For / applyRange
The carb.tasking plugin provides a means to process a loop in parallel: carb::tasking::ITasking::applyRange() or
carb::tasking::ITasking::parallelFor().
These functions call a lambda or task function repeatedly (and in parallel) for every index within a range of
[0..range) or [begin..end), with an optional step value.
The function does not return until the task function has been called (and has returned) for every index in the given
range.
std::atomic_int calls{ 0 };
// Equivalent serial loop: for (int i = 100; i != 600; i += 5)
tasking->parallelFor(100, 600, 5, [&calls] (int val) {
    assert(val >= 100);
    assert(val < 600);
    assert((val % 5) == 0); // Should be multiple of 5.
    calls++;
});
CHECK_EQ(calls, 100); // (600 - 100) / 5 == 100 indexes visited
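For illustration only, the contract described above (every stepped index in [begin..end) is visited, possibly in parallel, and the call blocks until every visit has returned) can be mimicked with plain std::thread. The function name parallelForSketch is hypothetical, and unlike carb.tasking it spawns threads on every call instead of using a fiber-based thread pool.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for ITasking::parallelFor(begin, end, step, fn). It only
// mirrors the documented contract: fn is called once for each of begin,
// begin + step, ... that is below end, possibly in parallel, and the function
// returns only after every call has returned.
template <typename Fn>
void parallelForSketch(int begin, int end, int step, Fn fn)
{
    if (begin >= end || step <= 0)
        return;
    // Number of stepped values, e.g. (600 - 100 + 5 - 1) / 5 == 100.
    const int count = (end - begin + step - 1) / step;
    std::atomic<int> next{ 0 }; // next unclaimed position in [0, count)
    auto worker = [&] {
        for (int i = next.fetch_add(1); i < count; i = next.fetch_add(1))
            fn(begin + i * step);
    };
    const unsigned hw = std::max(1u, std::thread::hardware_concurrency());
    const unsigned helpers = std::min(hw, unsigned(count)) - 1;
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < helpers; ++t)
        threads.emplace_back(worker);
    worker(); // the calling thread participates as well
    for (auto& t : threads)
        t.join(); // block until every index has been processed
}
```

Claiming positions through a shared atomic counter keeps every participating thread busy until the range is exhausted, regardless of how long individual calls take.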
Warning
A minimum amount of work must be done for carb::tasking::ITasking::applyRange() to be an improvement over serial
processing. This amount varies per system, but a typical rule of thumb is that at least 1,000,000 instructions should
be executed for carb::tasking::ITasking::applyRange() to produce a benefit. Profiling is highly recommended.
Warning
carb::tasking::ITasking::applyRange() with non-uniform workloads can produce a degenerate task distribution. For
instance, say that you have an array with 1000 items, but 10% of the items take three times as long to process as the
other 90%. By chance, one task may end up processing many of the slower items and still be working after all other
items are complete, which reduces the benefit of parallel processing. It may be better to partition the work into
multiple arrays or buckets of uniform work, and issue multiple concurrent tasks to process them:
using namespace carb::tasking;
TaskGroup waiter;
tasking->addTask(Priority::eHigh, waiter, [&] {
    tasking->applyRange(slow.size(), [&] (size_t i) {
        slow[i]->process();
    });
});
tasking->addTask(Priority::eHigh, waiter, [&] {
    tasking->applyRange(fast.size(), [&] (size_t i) {
        fast[i]->process();
    });
});
waiter.wait(); // Wait until everything is complete
An alternative to carb::tasking::ITasking::applyRange() that is slightly more complicated to use but can result in
better performance is carb::tasking::ITasking::applyRangeBatch().
It can perform better for two reasons. First, the invokable object is called less frequently, but with a contiguous
range of items to process, which reduces function call overhead. Second, a batchHint parameter can be provided which
allows the caller to specify an ideal batch size. Internally, carb::tasking::ITasking::applyRange() is an adapter for
carb::tasking::ITasking::applyRangeBatch().
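The adapter relationship described above can be sketched without threads. In this hypothetical example, applyRangeBatchSketch() hands out contiguous [begin, end) batches (using an arbitrary placeholder size of 16 when no hint is given, not the real heuristic), and applyRangeSketch() wraps a per-index callable into a batch callable. A real implementation would distribute the batches across threads.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical, single-threaded stand-in for applyRangeBatch(): calls
// batchFn(begin, end) for contiguous sub-ranges that together cover [0, range).
// The fallback size of 16 is an arbitrary placeholder, not the real heuristic.
template <typename BatchFn>
void applyRangeBatchSketch(std::size_t range, std::size_t batchHint, BatchFn batchFn)
{
    const std::size_t batch = batchHint ? batchHint : 16;
    for (std::size_t begin = 0; begin < range; begin += batch)
    {
        const std::size_t end = (range - begin > batch) ? begin + batch : range;
        batchFn(begin, end);
    }
}

// applyRange() expressed as an adapter over the batch form: the per-index loop
// lives inside each batch, so the batch callable is invoked once per batch
// rather than once per index.
template <typename Fn>
void applyRangeSketch(std::size_t range, Fn fn)
{
    applyRangeBatchSketch(range, 0, [&fn](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i != end; ++i)
            fn(i);
    });
}
```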
Batch size heuristic
carb::tasking::ITasking::applyRange() and carb::tasking::ITasking::applyRangeBatch() perform operations in parallel
while knowing very little about the operations themselves. These functions must perform well across extremes, from a
small range with long-running operations to a large range with very short operations. Unfortunately,
carb::tasking::ITasking::applyRange() has no statistical information about the Callable operation.
Internally, an atomic index is used to track the indexes that still need to be passed to the Callable, but heavy
contention by multiple tasks on that index can reduce performance due to cache-coherency overhead.
Thus, carb::tasking::ITasking::applyRangeBatch() employs a heuristic that divides a large range into smaller batches.
This heuristic must also be very fast to execute, as time is of the essence.
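To make the contention argument concrete, here is a minimal sketch (plain std::thread, not the carb.tasking implementation) in which several workers share one atomic index and each fetch_add claims a whole block of indexes. For a range of N and a block size of B, the index is successfully advanced only about N / B times instead of N times. The function name processInBlocks is hypothetical.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical sketch: workers claim blocks of blockSize indexes from one shared
// atomic index and process each claimed block serially. Returns the number of
// successful claims so the reduction in atomic traffic can be observed.
template <typename Fn>
std::size_t processInBlocks(std::size_t range, std::size_t blockSize, unsigned numThreads, Fn fn)
{
    if (blockSize == 0)
        blockSize = 1;
    std::atomic<std::size_t> index{ 0 };
    std::atomic<std::size_t> claims{ 0 };
    auto worker = [&] {
        for (;;)
        {
            const std::size_t begin = index.fetch_add(blockSize); // one atomic op per block
            if (begin >= range)
                break;
            claims.fetch_add(1, std::memory_order_relaxed);
            const std::size_t end = std::min(begin + blockSize, range);
            for (std::size_t i = begin; i < end; ++i)
                fn(i);
        }
    };
    std::vector<std::thread> threads;
    for (unsigned t = 1; t < numThreads; ++t)
        threads.emplace_back(worker);
    worker(); // the calling thread participates as well
    for (auto& t : threads)
        t.join();
    return claims.load();
}
```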
The implementation of the heuristic is:
// For lots of (presumably very short) tasks, we can have high contention on data->index that can kill any
// gains that we get from multithreading. So we're going to process lots of tasks in blocks for less
// contention on data->index.
// Figure out which power of 2 we're dealing with limited to 32 bits. The minimum we will ever have is 2 because
// of checks above, but this algorithm expects a minimum of 1.
// count | power-of-two
// ------|-------------
// 1 => 0
// 2 => 1
// 3-4 => 2
// 5-8 => 3
// 9-16 => 4
// and so on. Anything >= UINT_MAX will report powerOf2 = 31
int const powerOf2 = 31 - math::numLeadingZeroBits(uint32_t(range >= UINT_MAX ? UINT_MAX : range));
// Calculate a fast blocksize (approximate sqrt offset by 512). Block size doesn't change to 2 until 2^9 (512).
constexpr static uint16_t kBlockSizeByPowerOf2[] = {
// ( 1) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ( 32,768)
/* blksize */ 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 8, 11, 16,
// (65,536) 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 (2,147,483,648)
/* blksize */ 22, 32, 45, 64, 90, 128, 181, 256, 362, 512, 724, 1024, 1448, 2048, 2896, 4096,
};
// Compute block size
uint16_t const blockSize =
!batchHint ? kBlockSizeByPowerOf2[powerOf2] : uint16_t(std::min(batchHint, size_t(UINT16_MAX)));
If a nonzero batchHint is provided to carb::tasking::ITasking::applyRangeBatch(), it overrides the heuristic (values
larger than UINT16_MAX are clamped). This allows the application to tune the execution of
carb::tasking::ITasking::applyRangeBatch(). For carb::tasking::ITasking::applyRange(), batchHint is always 0, so the
computed size is always used.
As an example, an application may choose to force the batch size to 1 with batchHint = 1 when work is non-uniform, or
when each work index is likely to be long enough that contention on the internal atomic index is negligible.
In any case, tuning with a profiler is highly recommended.
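The heuristic can be exercised in isolation by wrapping it in a function. This sketch assumes that math::numLeadingZeroBits() returns the number of leading zero bits of a 32-bit value and replaces it with a portable loop; the names leadingZeroBits32 and computeBlockSize are hypothetical. The table and the batchHint override are copied from the heuristic code above.

```cpp
#include <algorithm>
#include <cassert>
#include <climits>
#include <cstddef>
#include <cstdint>

// Portable stand-in for math::numLeadingZeroBits() (assumed here to count the
// leading zero bits of a 32-bit value). Returns 32 for x == 0.
inline int leadingZeroBits32(std::uint32_t x)
{
    int n = 0;
    for (std::uint32_t mask = 0x80000000u; mask != 0 && (x & mask) == 0; mask >>= 1)
        ++n;
    return n;
}

// The documented heuristic as a standalone function. `range` must be at least 1.
inline std::uint16_t computeBlockSize(std::size_t range, std::size_t batchHint)
{
    // Index of the highest set bit, with ranges >= UINT_MAX treated as UINT_MAX.
    const int powerOf2 =
        31 - leadingZeroBits32(std::uint32_t(range >= UINT_MAX ? UINT_MAX : range));
    constexpr static std::uint16_t kBlockSizeByPowerOf2[] = {
        1,  1,  1,  1,  1,  1,   1,   1,   1,   2,   3,   4,    5,    8,    11,   16,
        22, 32, 45, 64, 90, 128, 181, 256, 362, 512, 724, 1024, 1448, 2048, 2896, 4096,
    };
    // A nonzero hint overrides the table, clamped to the 16-bit maximum.
    return !batchHint ? kBlockSizeByPowerOf2[powerOf2]
                      : std::uint16_t(std::min(batchHint, std::size_t(UINT16_MAX)));
}
```

For example, a range of 1,000,000 falls in the 2^19 bucket and yields a block size of 64, while any range below 512 still uses a block size of 1.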
Recursive applyRange
carb::tasking::ITasking::applyRange() (and carb::tasking::ITasking::applyRangeBatch()) limit themselves when called
recursively. Since these calls are synchronous (they must finish before returning to the caller), they are the tasking
system’s highest priority. Consider this example: parallel processing of a 100x100 two-dimensional array is desired. A
carb::tasking::ITasking::applyRange() call is made for the X dimension, and within the lambda, another
carb::tasking::ITasking::applyRange() call is made for the Y dimension. This means that potentially 101
carb::tasking::ITasking::applyRange() calls will be made that must all complete before the first
carb::tasking::ITasking::applyRange() call can return. Each carb::tasking::ITasking::applyRange() call would like to
use all of the threads available to the system, but creating NumberOfThreads * 101 tasks is very costly, potentially
more costly than executing the work serially.
A potential (and historical) way of dealing with this recursion was to divide recursive tasks by the number of threads
participating in parent tasks. In the above example, the Y dimension carb::tasking::ITasking::applyRange() calls would
be limited to 1 thread, since NumberOfThreads threads were already participating in the X dimension call. This
produced unsatisfactory and suboptimal results, especially with non-uniform workloads, because as threads finished
their share of the X dimension call they were not allowed to participate in the many Y dimension calls.
Instead, each carb::tasking::ITasking::applyRange() call now adds itself to a global FIFO queue of in-progress
carb::tasking::ITasking::applyRange() calls, notifies as many threads as can assist (up to the maximum), and then the
calling thread begins working on the dataset. Threads that are already busy ignore the notification until they become
idle. If no other threads are available to join, each call will still eventually complete because the calling thread
is working on it. The earlier carb::tasking::ITasking::applyRange() calls get the most participation, and as threads
finish with a carb::tasking::ITasking::applyRange() call they find the next one that they can participate in, and so
on. This allows the system to proceed with any available work as quickly as possible, rather than assigning each
thread (or limiting it to) a certain amount of work, which produces suboptimal results.
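The FIFO behavior can be modeled in a few lines. In this simplified, hypothetical sketch, each in-progress call is an entry in a global queue, and an idle thread calling helpOnce() claims one index from the earliest entry that still has unclaimed work. It deliberately omits thread notification and the calling thread's wait for completion; it only shows why earlier calls receive the most help.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// One in-progress range call: indexes [0, range) are claimed through `next`.
struct RangeCall
{
    RangeCall(std::size_t r, std::function<void(std::size_t)> f) : range(r), fn(std::move(f)) {}
    const std::size_t range;
    std::atomic<std::size_t> next{ 0 };
    std::function<void(std::size_t)> fn;
};

// Hypothetical global FIFO of in-progress calls (simplified model only).
class InProgressQueue
{
public:
    void push(std::shared_ptr<RangeCall> call)
    {
        std::lock_guard<std::mutex> lock(m_);
        calls_.push_back(std::move(call));
    }

    // Called by an idle thread: claims one index from the earliest call that
    // still has unclaimed work and processes it. Returns false if none remain.
    bool helpOnce()
    {
        std::shared_ptr<RangeCall> call;
        std::size_t index = 0;
        {
            std::lock_guard<std::mutex> lock(m_);
            for (auto it = calls_.begin(); it != calls_.end();)
            {
                index = (*it)->next.fetch_add(1);
                if (index < (*it)->range)
                {
                    call = *it; // earliest call with remaining work
                    break;
                }
                // Every index of this call has been claimed (not necessarily
                // finished), so helpers no longer need to see it.
                it = calls_.erase(it);
            }
        }
        if (!call)
            return false;
        call->fn(index); // run outside the lock so the callable may push nested calls
        return true;
    }

private:
    std::mutex m_;
    std::deque<std::shared_ptr<RangeCall>> calls_;
};
```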
Synchronous Tasks
In some cases, it is necessary to wait for a task to complete. All of the carb::tasking::ITasking::addTask() functions
return a carb::tasking::Future object that can be used to wait for a task to complete (and even receive a return value
from a task). Alternatively, a carb::tasking::SemaphoreWrapper or carb::tasking::ConditionVariableWrapper could be
employed to wait until a specific condition is met.
There is also a function, carb::tasking::ITasking::awaitSyncTask(), which takes std::invoke-style parameters (a
callable followed by its arguments) and executes them synchronously.
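As a rough analogy using only the standard library (not carb.tasking), std::async accepts the same std::invoke-style arguments, a callable followed by its arguments, and its std::future can be waited on for the result. The hypothetical helper runAndAwait() below blocks the calling OS thread, whereas a fiber-aware wait in carb.tasking would let the thread run other tasks in the meantime.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Analogy only (not carb.tasking): run f(args...) on another thread with
// std::async, which applies std::invoke-style rules (so member function
// pointers work), then block until the result is ready and return it.
template <typename F, typename... Args>
auto runAndAwait(F&& f, Args&&... args)
    -> decltype(std::async(std::launch::async, std::forward<F>(f), std::forward<Args>(args)...).get())
{
    return std::async(std::launch::async, std::forward<F>(f), std::forward<Args>(args)...).get();
}
```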
TaskGroup
It can be advantageous to know when a group of tasks is pending or has completed. A common shutdown pattern is for a
system to wait for all of its tasks to complete before proceeding with shutdown. This can be accomplished with a very
simple class called carb::tasking::TaskGroup. This class acts like a reverse semaphore: while any task is tracked by
the TaskGroup, the TaskGroup is unsignaled. Only when the TaskGroup is empty (all tasks have completed) does it become
signaled.
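The reverse-semaphore idea can be sketched with std::mutex and std::condition_variable. This hypothetical ReverseSemaphore is not the TaskGroup implementation: enter() must be called before a task starts, leave() when it finishes, and wait() blocks the OS thread rather than yielding a fiber.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reverse semaphore: unsignaled while any tracked task is
// outstanding, signaled when the count of tracked tasks returns to zero.
class ReverseSemaphore
{
public:
    // Start tracking one task: the group becomes (or stays) unsignaled.
    void enter()
    {
        std::lock_guard<std::mutex> lock(m_);
        ++count_;
    }

    // Stop tracking one task (must pair with enter()); wakes waiters at zero.
    void leave()
    {
        std::lock_guard<std::mutex> lock(m_);
        if (--count_ == 0)
            cv_.notify_all();
    }

    bool signaled() const
    {
        std::lock_guard<std::mutex> lock(m_);
        return count_ == 0;
    }

    // Block until no tracked tasks remain.
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ == 0; });
    }

private:
    mutable std::mutex m_;
    std::condition_variable cv_;
    std::size_t count_ = 0;
};
```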
Waiting
The tasking library has a generic wait function, carb::tasking::ITasking::wait() (and variants
carb::tasking::ITasking::try_wait(), carb::tasking::ITasking::try_wait_for() and
carb::tasking::ITasking::try_wait_until()), which can be used to wait on any tasking element that satisfies the
carb::tasking::RequiredObject named requirement. This includes carb::tasking::Future, carb::tasking::Counter, and
carb::tasking::TaskGroup.
Multiple elements that satisfy the carb::tasking::RequiredObject named requirement can be grouped together using the
carb::tasking::Any and carb::tasking::All groupers, and the groupers can be nested to form complex wait mechanisms.
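To illustrate how nested groupers compose, the following conceptual sketch models each waitable as a predicate that reports whether it is signaled. The names Waitable, allOf, anyOf and tryWaitFor are hypothetical and are not the carb::tasking::Any or carb::tasking::All types; the polling loop stands in for a real blocking wait.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <vector>

// A waitable is modeled as a predicate that reports whether it is signaled.
using Waitable = std::function<bool()>;

// Signaled only when every member is signaled.
inline Waitable allOf(std::vector<Waitable> items)
{
    return [items] {
        for (const auto& w : items)
            if (!w())
                return false;
        return true;
    };
}

// Signaled when at least one member is signaled.
inline Waitable anyOf(std::vector<Waitable> items)
{
    return [items] {
        for (const auto& w : items)
            if (w())
                return true;
        return false;
    };
}

// Polls until `w` is signaled or the timeout elapses; returns whether it was signaled.
inline bool tryWaitFor(const Waitable& w, std::chrono::milliseconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!w())
    {
        if (std::chrono::steady_clock::now() >= deadline)
            return false;
        std::this_thread::yield();
    }
    return true;
}
```

Because a grouper is itself a Waitable, groupers nest naturally, for example anyOf({ allOf({ a, b }), c }).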
Throttling
In some cases it may be desirable to limit how many tasks can execute concurrently. This can be accomplished by using a
Semaphore.
Create the semaphore with a count equal to the maximum desired concurrency. The semaphore can then be passed to
carb::tasking::ITasking::addThrottledTask(), which will acquire() the semaphore before executing the task and
release() the semaphore upon task completion.
This effectively limits the number of concurrent tasks to the initial count of the Semaphore, assuming that the tasks
have been queued with carb::tasking::ITasking::addThrottledTask() using the same Semaphore object.
The “Main” Priority
In some cases it may be necessary for certain tasks to execute only in the context of a single, consistent thread,
typically the main thread (or initial thread for the application). For this reason, a special Priority value exists:
carb::tasking::Priority::eMain. Any task queued with this Priority will only execute when the “main” thread calls
carb::tasking::ITasking::executeMainTasks().
The first thread that calls carb::tasking::ITasking::executeMainTasks() becomes the “main” thread; any attempt to call
the function on a different thread will call std::terminate(). The only way to change the “main” thread is to unload
and reload carb.tasking.plugin.
These main-priority tasks are also assigned a fiber for their duration, and can still yield to sleep or block on I/O.
When resumed, they will only execute on the “main” thread. When carb::tasking::ITasking::executeMainTasks() is called,
it runs each main-priority task until it finishes or yields. As such, it is designed to be called periodically by the
main loop of the application.
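A simplified model of this behavior, assuming nothing about the real implementation: tasks can be posted from any thread but only run when the designated thread calls executeMainTasksSketch(), and the first caller becomes that thread. The class MainTaskQueue is hypothetical, and it does not model fibers, so each task runs to completion rather than yielding.

```cpp
#include <cassert>
#include <deque>
#include <exception>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical queue of "main" tasks that only run on one designated thread.
class MainTaskQueue
{
public:
    // May be called from any thread.
    void post(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lock(m_);
        tasks_.push_back(std::move(task));
    }

    // The first calling thread becomes the "main" thread; calls from any other
    // thread terminate the program.
    void executeMainTasksSketch()
    {
        std::deque<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock(m_);
            if (mainThread_ == std::thread::id())
                mainThread_ = std::this_thread::get_id();
            else if (mainThread_ != std::this_thread::get_id())
                std::terminate();
            batch.swap(tasks_); // take everything queued so far
        }
        for (auto& task : batch)
            task(); // runs on the main thread, outside the lock
    }

private:
    std::mutex m_;
    std::deque<std::function<void()>> tasks_;
    std::thread::id mainThread_; // default-constructed id means "not yet chosen"
};
```

Calling executeMainTasksSketch() once per iteration of the application's main loop drains whatever was posted since the previous iteration.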
Counters (Deprecated)
Caution
Counters are deprecated.
There exists a simple synchronization mechanism, carb::tasking::Counter, which acts like a reverse semaphore: it
becomes “signaled” when the count reaches a target value (typically 0). Counter objects can be created with
carb::tasking::ITasking::createCounter() or, preferably, with carb::tasking::CounterWrapper.
Deprecation Warning
However, this construct is deprecated. As it is a non-standard synchronization primitive, it is highly recommended
that more standard constructs such as carb::tasking::Semaphore and carb::tasking::Future are used instead. Because a
large body of existing code uses Counters, and some things are still most easily expressed with them, they remain
supported as of version 1.6 of carb.tasking.plugin.
Signaling task completion
Counters can be used to signal task completion. All of the carb::tasking::ITasking::addTask() variant functions take a
carb::tasking::Trackers group, which can contain zero or more counters. Before addTask returns, all of the given
Counter objects are incremented by one. When the task completes, all of the Counter objects are decremented by one. If a
Counter has reached its target (typically zero), it becomes signaled and any threads or tasks waiting on it may resume
(see carb::tasking::ITasking::yieldUntilCounter()).
Manual Counter Manipulation
Although Counter is deprecated and should be avoided whenever possible, there exists functionality to atomically increment or decrement Counter objects and to check whether they are signaled.
Sub-tasks
It may also be advantageous for a task to be queued but wait for a previous task to complete. One option is for the
task function to begin by yielding while waiting on a Counter. However, this is inefficient, because it requires
assigning a fiber to the task and starting the task only to immediately stop and wait. For this reason,
carb::tasking::ITasking::addSubTask() exists, which takes zero or more Counter objects through a
carb::tasking::RequiredObject helper struct. If more than one Counter object is required, they must be grouped with the
carb::tasking::All or carb::tasking::Any helper structs. This is more efficient because it lets the scheduler know that
the task need not be assigned a fiber and started until one or more Counter objects have been signaled.
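The efficiency argument can be shown with a small, hypothetical model: a pending task is stored together with a readiness predicate and is only started once the predicate is true, so no work (and, in carb.tasking terms, no fiber) is spent on it beforehand. DeferredScheduler and addDeferred() are illustrative names, not carb.tasking API.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical scheduler that holds tasks until their prerequisite is signaled,
// instead of starting them and having them immediately wait.
class DeferredScheduler
{
public:
    // Queue `task`, but do not start it until `ready()` returns true.
    void addDeferred(std::function<bool()> ready, std::function<void()> task)
    {
        pending_.push_back(Pending{ std::move(ready), std::move(task) });
    }

    // Start every pending task whose prerequisite is now signaled; returns how many started.
    int poll()
    {
        std::vector<std::function<void()>> toRun;
        for (auto it = pending_.begin(); it != pending_.end();)
        {
            if (it->ready())
            {
                toRun.push_back(std::move(it->task));
                it = pending_.erase(it);
            }
            else
            {
                ++it;
            }
        }
        for (auto& task : toRun)
            task(); // tasks may safely add more deferred tasks here
        return static_cast<int>(toRun.size());
    }

private:
    struct Pending
    {
        std::function<bool()> ready;
        std::function<void()> task;
    };
    std::vector<Pending> pending_;
};
```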
Note
Currently the Any and All helper functions may not be nested, but this could change in the future.