Anatomy of a Task¶
While a task itself is a simple construct, several aspects of it need to be managed: when it can start, how to know when it has finished, and how to retrieve any return value.
Let’s take a look at the most basic way to queue a task: carb::tasking::ITasking::addTask()
/**
* Runs the given function-like object as a task.
*
* @param priority The priority of the task to execute.
* @param trackers (optional) Zero or more counters that are incremented before addTask() returns and are
* decremented upon task completion. These counters can be used to determine task completion.
* @param f A C++ "Callable" object (i.e. functor, lambda, [member] function ptr) that optionally returns a value
* @param args Arguments to pass to @p f
* @return A Future based on the return type of @p f
*/
template <class Callable, class... Args>
auto addTask(Priority priority, Trackers&& trackers, Callable&& f, Args&&... args);
Priority¶
Every task has an associated priority of type carb::tasking::Priority. As expected, higher priority tasks execute before lower priority tasks. As carb.tasking.plugin is a cooperative (as opposed to preemptive) multi-tasking scheduler, higher priority tasks will complete (or yield) before lower priority tasks begin executing.
A special eMain priority exists to specify that a task should only execute on the “main” thread.
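The ordering described above can be illustrated with a minimal sketch in standard C++. It does not use carb.tasking: SketchPriority, SketchTask and runInPriorityOrder are hypothetical names, and a single-threaded std::priority_queue stands in for the real scheduler. It only shows that, without preemption, each task runs to completion before the next highest priority task starts.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical stand-in for carb::tasking::Priority; names and values are illustrative only.
enum class SketchPriority { eLow = 0, eDefault = 1, eHigh = 2 };

struct SketchTask
{
    SketchPriority priority;
    std::function<void()> work;
};

// Comparator that makes std::priority_queue pop the highest priority first.
struct LowerPriority
{
    bool operator()(const SketchTask& a, const SketchTask& b) const
    {
        return static_cast<int>(a.priority) < static_cast<int>(b.priority);
    }
};

// Queues three tasks out of order, runs them, and returns the order in which they ran.
inline std::vector<int> runInPriorityOrder()
{
    std::vector<int> order;
    std::priority_queue<SketchTask, std::vector<SketchTask>, LowerPriority> queue;
    queue.push({ SketchPriority::eLow, [&order] { order.push_back(0); } });
    queue.push({ SketchPriority::eHigh, [&order] { order.push_back(2); } });
    queue.push({ SketchPriority::eDefault, [&order] { order.push_back(1); } });

    while (!queue.empty())
    {
        // Cooperative: a task is never interrupted; the next one starts only when this one returns.
        SketchTask task = queue.top();
        queue.pop();
        task.work();
    }
    assert(order.size() == 3);
    return order;
}
```

Calling runInPriorityOrder() yields the high, default and low priority tasks in that order, regardless of the order in which they were queued.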
Trackers¶
Trackers are a powerful feature for tasks. The carb::tasking::Trackers structure accepts zero or more objects of type carb::tasking::Tracker, each of which can in turn be constructed from several different types (described below). These types function as input/output parameters to the tasking system. Each provided Tracker is “entered” before the addTask function returns (and before the task can possibly start), and each provided Tracker is “exited” when the provided Callable has completed. Successfully canceling a task also causes all Trackers to be “exited.”
Note
Trackers are exited after the provided Callable
has completed, but slightly before the TaskContext
for the task becomes signaled (and before any returned Future
becomes signaled). See Task Completion and Guarantees.
No Trackers¶
Either nullptr
or an empty initializer list {}
can be specified to indicate that no Tracker objects are desired.
“Enter” action: None.
“Exit” action: None.
TaskGroup Tracker¶
The carb::tasking::TaskGroup object is conceptually an inverse semaphore: it is signaled when empty, and waiting on it blocks while any tasks are inside it.
“Enter” action: The carb::tasking::TaskGroup::enter()
function is called, incrementing the number of tasks by one.
“Exit” action: The carb::tasking::TaskGroup::leave()
function is called, decrementing the number of tasks by one. If the number of tasks in the task group becomes zero, the TaskGroup
becomes signaled.
TaskGroup can be passed either by reference or by pointer. A null pointer is ignored, which is a convenient way to support optional TaskGroup objects.
Warning
It is the caller’s responsibility to ensure that the TaskGroup
object is valid until carb::tasking::TaskGroup::try_wait()
would return true
.
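To make the "inverse semaphore" idea concrete, the following is a minimal sketch built from std::mutex and std::condition_variable. SketchTaskGroup is a hypothetical class that only mimics the enter/leave/try_wait/wait behavior described above; it is not the carb::tasking::TaskGroup implementation, and its wait() blocks a thread rather than suspending a task.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical stand-in that mimics the described TaskGroup behavior; not the carb implementation.
class SketchTaskGroup
{
public:
    // "Enter" action: one more task is tracked, so the group is no longer signaled.
    void enter()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
    }

    // "Exit" action: one fewer task is tracked; waiters are woken when the count reaches zero.
    void leave()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_count > 0);
        if (--m_count == 0)
            m_empty.notify_all();
    }

    // Returns true without blocking if the group is currently empty (signaled).
    bool try_wait()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count == 0;
    }

    // Blocks the calling thread until the group is empty.
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_empty.wait(lock, [this] { return m_count == 0; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_empty;
    std::size_t m_count = 0;
};

// Enters the group before starting the work and leaves it when the work is done.
inline bool taskGroupDemo()
{
    SketchTaskGroup group;
    const bool signaledWhenEmpty = group.try_wait(); // nothing has entered yet
    group.enter();
    const bool unsignaledWhileBusy = !group.try_wait(); // one task is tracked
    std::thread worker([&group] { group.leave(); });
    group.wait(); // returns once the worker has left the group
    worker.join();
    return signaledWhenEmpty && unsignaledWhileBusy && group.try_wait();
}
```

The demo also reflects the warning above: the group object outlives every task that entered it, and it is only destroyed after it has become signaled again.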
TaskContext Tracker¶
Either a pointer or a reference to a carb::tasking::TaskContext
can be passed as a Tracker. If the pointer is
nullptr
it will be ignored.
“Enter” action: The pointer (if non-nullptr
) or reference will receive the associated TaskContext
for the task.
“Exit” action: None.
Future<void> Tracker¶
Regardless of the return type of the function-like object passed to addTask, a pointer or a reference to a carb::tasking::Future<void> can be populated as a Tracker. Future<void> is a specialization of carb::tasking::Future that is a simple wrapper around a TaskContext.
Like other Tracker types, a nullptr
will be ignored to facilitate optional types.
“Enter” action: The pointer (if non-nullptr
) or reference will receive the associated Future<void>
for the task.
“Exit” action: None.
Counter Tracker¶
Although deprecated, objects convertible to a pointer to carb::tasking::Counter are still accepted as Tracker objects for backwards compatibility. As with other Tracker types, nullptr is ignored.
“Enter” action: Calls carb::tasking::ITasking::fetchAddCounter()
.
“Exit” action: Calls carb::tasking::ITasking::fetchSubCounter()
.
Warning
It is the caller’s responsibility to ensure that the Counter
object is valid until carb::tasking::ITasking::try_wait()
on the Counter
would return true
.
“Functionish”¶
The Callable
and optional Args
parameters work together to provide std::invoke-like
execution support. Nearly anything that is callable via std::invoke
is supported.
Adding a task implicitly creates a shared state as for carb::tasking::Promise
. The carb::tasking::Future
from the Promise
is returned to the caller, and the return value of Callable
is passed similarly to carb::tasking::Promise::set_value()
.
Therefore, when the Callable
returns a value, the Future
becomes ready and the return value can be retrieved.
If any Args
are provided, they are captured in a std::tuple
using std::decay
and passed as distinct arguments to Callable
. The
Args
are forwarded using std::forward
to be captured and passed to Callable
with std::move
.
If a task is canceled or throws an uncaught exception, the captured Args
will be destroyed.
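The capture-then-invoke behavior described above can be approximated in standard C++17. The sketch below is an assumption about the general technique, not the carb.tasking implementation: bindTask, takeOwnership and SketchCounter are hypothetical names. Arguments are stored as decayed copies in a std::tuple and later moved into the callable with std::apply, which uses std::invoke semantics.

```cpp
#include <cassert>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical helper: stores decayed copies of the callable and its arguments for later invocation.
template <class Callable, class... Args>
auto bindTask(Callable&& f, Args&&... args)
{
    // std::decay_t strips references and cv-qualifiers, so the task owns its own copy of each argument.
    return [func = std::decay_t<Callable>(std::forward<Callable>(f)),
            bound = std::tuple<std::decay_t<Args>...>(std::forward<Args>(args)...)]() mutable {
        // Each stored argument is moved into the call, which allows move-only types such as std::unique_ptr.
        return std::apply(std::move(func), std::move(bound));
    };
}

inline int takeOwnership(std::unique_ptr<int> value)
{
    return *value + 1;
}

struct SketchCounter
{
    int total = 0;
    void add(int amount)
    {
        total += amount;
    }
};

inline int boundTaskDemo()
{
    // A move-only argument is forwarded into the tuple and later moved into the function.
    auto first = bindTask(takeOwnership, std::make_unique<int>(41));
    const int result = first();
    assert(result == 42);

    // A member function pointer works too: the object pointer is the first bound argument.
    SketchCounter counter;
    auto second = bindTask(&SketchCounter::add, &counter, 8);
    second();
    return result + counter.total;
}
```

Because the arguments are moved into the call, a task bound this way can only be invoked once, which matches a task that runs at most one time.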
Lambda¶
One of the most common methods is to pass a lambda, which can capture any values it needs. Lambdas are powerful but can make debugging difficult, as they are often inlined and do not have a human-readable name. Part of their power is that asynchronous code is easily visible next to synchronous code.
Warning
If parameters are captured by reference (&
) it is the caller’s responsibility to ensure that the captured item exists for the duration of the task.
auto future = tasking->addTask(Priority::eDefault, {}, [my, captured, &variables] {
// Do asynchronous work
// ...
return result;
});
// Do synchronous work
// ...
std::cout << future.get(); // await result and print
Values that are not captured by the lambda can instead be passed as arguments. This is especially helpful in loops.
TaskGroup tg;
for (int i = 0; i != kMaxTasks; ++i)
{
tasking->addTask(Priority::eDefault, tg, [] (int taskNumber) {
// Do something asynchronous based on taskNumber
// ...
}, i); // Note the passing of i as an additional parameter
}
tg.wait();
Function and Pointer-to-Function¶
C-style pointers-to-functions can be passed to addTask
as well:
void fooAsync()
{
// Do something async
// ...
}
tasking->addTask(Priority::eDefault, {}, fooAsync);
Additional parameters passed to addTask
will be passed as parameters to the function:
void fooAsync(std::unique_ptr<int> i)
{
// Do something async with i
// ...
}
tasking->addTask(Priority::eDefault, {}, fooAsync, std::make_unique<int>(5));
Member Functions¶
It is also possible to pass member functions to addTask
and pass the this
parameter as the first additional parameter:
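class Foo
{
public: // must be accessible so that &Foo::myMemberFunc can be formed outside the class
    void myMemberFunc()
    {
        printf("Foo ptr: %p\n", static_cast<void*>(this));
    }
};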
Foo fooOnStack;
auto future = tasking->addTask(Priority::eDefault, {}, &Foo::myMemberFunc, &fooOnStack);
future.wait();
Functors¶
Objects with operator()
are also supported:
struct FiveAdder
{
int operator() (int val)
{
return val + 5;
}
};
auto future = tasking->addTask(Priority::eDefault, {}, FiveAdder(), 5);
CHECK_EQ(future.get(), 10);
Execution¶
When addTask
is called, the task goes into a queue within the scheduler. When a worker thread is searching for something
to do, it can retrieve the task and execute it.
Note
A task can begin executing even before addTask
returns!
Assigning a fiber¶
Before a task can begin executing, it must be assigned a special stack called a fiber. A fiber is like a thread that the scheduler can choose to run at any point (as opposed to a thread which the OS will schedule and run). Typically a task is assigned a fiber immediately before it begins executing, but certain operations will assign a fiber immediately even if the task has not started.
Fibers are a limited resource within the system. The absolute maximum number of fibers is specified by carb::tasking::kMaxFibers, and the limit actually in effect is configured through carb::tasking::ITasking::changeParameters. If the system runs out of fibers, tasks cannot execute until existing tasks complete and relinquish their fibers. For this reason, the system provides some support for adding sub-tasks that declare a dependency up front, rather than requiring code that immediately waits for a resource upon starting. This is accomplished through the carb::tasking::ITasking::addSubTask() function.
Invoking the Callable¶
Once a fiber is assigned, the thread will switch to the fiber and invoke the Callable
passed to addTask
. As this
happens within the context of the fiber, this is known as “task context.” By having an underlying fiber, calls back into
the scheduler to wait won’t block the thread, but will “put the fiber to sleep” and run a different task (or wait for a new
task) until the fiber is woken again. This allows tasks to call “await”-style functions without blocking worker threads.
See Best Practices for more info.
Notifying Trackers¶
When the Callable
returns, any carb::tasking::Future
returned from addTask
will receive the return
value from the Callable
and further attempts to wait()
on the Future
will return immediately.
Note that multiple Tracker
objects are notified in a non-deterministic order and gaps of time exist where one Tracker
may have been notified that the task completed, but the Future
or another Tracker
may not yet have received notification.
Therefore, the following example is bad practice:
auto tg = std::make_unique<TaskGroup>();
auto fut = tasking->addTask(Priority::eDefault, tg, [] { /* do something async */ });
fut.wait();
tg.reset(); // TaskGroup was destroyed without checking to make sure it was signaled yet! Do tg->wait() first.
Returning Values¶
addTask returns a carb::tasking::Future object that is based on the return type of Callable. The returned value can be retrieved via the carb::tasking::Future::get() function, which waits until the task completes and the value becomes available.
Task Completion and Guarantees¶
There are two primary phases to task completion that can be used to trigger subsequent tasks or can be waited on.
Phase 1: Tracker Exit¶
The Trackers that were passed to addTask
are “exited” once the following are guaranteed to have happened (note that the order is not guaranteed):
The Callable function has returned (any value returned has been set in the Future<T>, but the Future<T> is not yet signaled).
Any and all parameters bound to addTask have been destructed.
The Callable along with any and all captured variables have been destructed.
Task-specific data has been destroyed.
At this point, Trackers that were tracking the task may be signaled, but there is no guarantee that the carb::tasking::TaskContext
or Future<T>
have been signaled.
Phase 2: Task Completion¶
Once Trackers have been exited, the task is marked as complete. This causes the carb::tasking::TaskContext
and Future<T>
to be signaled, and will release any threads or tasks waiting on TaskContext
or Future<T>
.
Waiting on the TaskContext
or the Future<T>
guarantees that the task is complete in all respects, that any and all
bound or captured variables passed to addTask
have been destructed, and that all Trackers have been exited.
Cancellation¶
In some cases it may be advantageous to cancel a task that hasn’t started yet. This can be attempted through the carb::tasking::ITasking::tryCancelTask
function.
Warning
A task cannot be canceled once the Callable
has been invoked. However, the Callable
could check a flag and return as quickly as possible.
The system guarantees that the Callable
will be invoked exactly once unless tryCancelTask
has been invoked and returns
true
, in which case Callable
will never be called.
Successfully canceling a task also notifies all Tracker objects that the task is finished, but Future objects will be in a state where carb::tasking::Future::isCancelled() returns true, and attempts to get() the value will call std::terminate().
When tryCancelTask
returns true
it is also guaranteed that Callable
and any captured Args
originally passed
to addTask
have been destroyed, and the associated TaskContext
is signaled.
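The "invoked exactly once unless successfully canceled" guarantee can be modeled as a small atomic state machine. This is a sketch under assumptions, not how carb.tasking is implemented: SketchCancellableTask, tryRun and tryCancel are hypothetical names. Running and canceling both race to move the task out of the pending state, and only one of them can win.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical model of a task that is either run exactly once or canceled before it starts.
class SketchCancellableTask
{
public:
    explicit SketchCancellableTask(std::function<void()> work) : m_work(std::move(work))
    {
    }

    // Called by the scheduler: invokes the callable unless the task was already canceled or started.
    bool tryRun()
    {
        State expected = State::ePending;
        if (!m_state.compare_exchange_strong(expected, State::eRunning))
            return false;
        m_work();
        m_state.store(State::eFinished);
        return true;
    }

    // Called by user code: succeeds only if the callable has not been invoked yet.
    bool tryCancel()
    {
        State expected = State::ePending;
        if (!m_state.compare_exchange_strong(expected, State::eCanceled))
            return false;
        m_work = nullptr; // destroy the callable and anything it captured
        return true;
    }

    bool isCanceled() const
    {
        return m_state.load() == State::eCanceled;
    }

private:
    enum class State { ePending, eRunning, eFinished, eCanceled };
    std::atomic<State> m_state{ State::ePending };
    std::function<void()> m_work;
};

inline bool cancellationDemo()
{
    int calls = 0;

    SketchCancellableTask canceled([&calls] { ++calls; });
    const bool cancelSucceeded = canceled.tryCancel(); // not started yet, so this wins
    const bool ranAfterCancel = canceled.tryRun(); // the callable is never invoked

    SketchCancellableTask started([&calls] { ++calls; });
    const bool ran = started.tryRun(); // invoked exactly once
    const bool lateCancel = started.tryCancel(); // too late to cancel

    return cancelSucceeded && !ranAfterCancel && ran && !lateCancel && calls == 1 && canceled.isCanceled();
}
```

As in the text above, a successful cancel in this sketch also destroys the stored callable before returning.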
Other Task Types¶
The system provides other functions that allow queuing tasks that have other dependencies without needing to allocate a fiber to the tasks. Using these functions can mitigate fiber resource issues.
Throttled Tasks¶
Throttled tasks use a carb::tasking::Semaphore as a gate to ensure that only a certain number of tasks that share the Semaphore are executing at any given time. Before a task can start, it must acquire the semaphore; tasks that cannot yet acquire it can generally wait without being assigned a fiber. When the task has completed or has been canceled, the semaphore is released.
SemaphoreWrapper sema(1); // Starts with a value of 1, so only 1 task may execute concurrently.
TaskGroup tg;
for (int i = 0; i != 5; ++i)
{
tasking->addThrottledTask(sema, Priority::eDefault, tg, [] { /* do something async */ });
}
tg.wait(); // All tasks execute serially because the semaphore has a value of 1
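The gating effect of a semaphore can be observed with a sketch that uses only the standard library. SketchSemaphore and maxConcurrency are hypothetical names, and plain threads stand in for tasks, so unlike the real scheduler the waiters here do block a thread. The function reports the highest number of task bodies that were ever running at the same time.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counting semaphore built from standard primitives.
class SketchSemaphore
{
public:
    explicit SketchSemaphore(int count) : m_count(count)
    {
    }

    void acquire()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_available.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_available.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_available;
    int m_count;
};

// Runs taskCount task bodies gated by a semaphore with the given limit and returns
// the highest number of bodies that were inside the gate at once.
inline int maxConcurrency(int limit, int taskCount)
{
    SketchSemaphore gate(limit);
    std::mutex statsMutex;
    int running = 0;
    int peak = 0;

    std::vector<std::thread> workers;
    for (int i = 0; i != taskCount; ++i)
    {
        workers.emplace_back([&] {
            gate.acquire(); // must be acquired before the task body starts
            {
                std::lock_guard<std::mutex> lock(statsMutex);
                peak = std::max(peak, ++running);
            }
            std::this_thread::yield(); // stand-in for the real work
            {
                std::lock_guard<std::mutex> lock(statsMutex);
                --running;
            }
            gate.release(); // released once the task body has finished
        });
    }
    for (std::thread& worker : workers)
        worker.join();
    assert(running == 0);
    return peak;
}
```

With a limit of 1 the bodies run strictly one at a time, which corresponds to the serial execution in the example above.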
Sub-tasks¶
Sub-tasks can only execute after their dependencies become signaled. The carb::tasking::ITasking::addSubTask() function uses the carb::tasking::RequiredObject helper to declare zero or more dependencies for a task. Generally, the task is not assigned a fiber until all dependencies have become signaled.
The following are valid dependencies and can be given as a RequiredObject:
nullptr - always signaled.
carb::tasking::TaskContext - signaled when the associated task completes in all respects.
carb::tasking::TaskGroup - signaled when the TaskGroup contains zero tasks.
carb::tasking::Future - signaled when the value is available to be read from the Future.
carb::tasking::SharedFuture - signaled when the value is available to be read from the SharedFuture.
carb::tasking::Counter* - signaled when the Counter is at its target value.
auto dependency = tasking->addTask(Priority::eDefault, {}, [] { /* long running task */ });
auto future = tasking->addSubTask(dependency, Priority::eDefault, {}, [] { /* cleanup task */ });
future.wait();
// The dependency must complete before the cleanup task is started. At this point both have completed.
The carb::tasking::Future::then()
function is provided as syntactic sugar around ITasking::addSubTask
.
There is a function that also combines the concepts of throttling and sub-tasks: carb::tasking::ITasking::addThrottledSubTask()
.
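The idea that a sub-task consumes no execution resource until its dependencies are signaled can be sketched with a simple dependency-counting queue. SketchDependencyQueue is a hypothetical, single-threaded class that assumes all tasks are added before run() is called; it illustrates the general technique and is not the carb.tasking scheduler. A task is placed on the ready list only when its count of pending dependencies reaches zero.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded queue in which a task becomes ready only after its dependencies finish.
class SketchDependencyQueue
{
public:
    // Adds a task that may only run after every task listed in dependsOn has completed.
    // Returns an id that later tasks can name as a dependency.
    std::size_t add(std::function<void()> work, const std::vector<std::size_t>& dependsOn = {})
    {
        const std::size_t id = m_tasks.size();
        m_tasks.push_back(Task{ std::move(work), dependsOn.size(), {} });
        for (std::size_t dependency : dependsOn)
        {
            assert(dependency < id);
            m_tasks[dependency].dependents.push_back(id);
        }
        if (dependsOn.empty())
            m_ready.push_back(id);
        return id;
    }

    // Runs ready tasks in FIFO order until nothing is left to run.
    void run()
    {
        while (!m_ready.empty())
        {
            const std::size_t id = m_ready.front();
            m_ready.pop_front();
            m_tasks[id].work();
            // Completing a task signals it; dependents with no remaining dependencies become ready.
            for (std::size_t dependent : m_tasks[id].dependents)
            {
                if (--m_tasks[dependent].pending == 0)
                    m_ready.push_back(dependent);
            }
        }
    }

private:
    struct Task
    {
        std::function<void()> work;
        std::size_t pending;
        std::vector<std::size_t> dependents;
    };
    std::vector<Task> m_tasks;
    std::deque<std::size_t> m_ready;
};

inline std::vector<std::string> dependencyDemo()
{
    std::vector<std::string> order;
    SketchDependencyQueue queue;
    const std::size_t longRunning = queue.add([&order] { order.push_back("long running"); });
    queue.add([&order] { order.push_back("cleanup"); }, { longRunning });
    queue.add([&order] { order.push_back("unrelated"); });
    queue.run();
    return order;
}
```

In the demo the cleanup task was added second but runs last, because it only becomes ready once the long running task has completed.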
Timed Tasks¶
Another powerful feature is running a task after a certain amount of time has elapsed, or at a specific time. Two functions are provided for this: carb::tasking::ITasking::addTaskIn() and carb::tasking::ITasking::addTaskAt(). In both cases, the task will typically not be assigned a fiber until the specified time has been reached.
// Add an alarm that happens in an hour
using namespace std::chrono_literals;
tasking->addTaskIn(1h, Priority::eDefault, {}, [] { /* do something in an hour */ });
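The difference between a relative delay and an absolute time can be sketched with a queue ordered by due time. SketchTimerQueue, scheduleAt, scheduleIn and runDue are hypothetical names, and the current time is passed in explicitly so the example does not actually wait; this is an illustration of the concept rather than the carb.tasking implementation.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

using SketchClock = std::chrono::steady_clock;

struct SketchTimedTask
{
    SketchClock::time_point due;
    std::function<void()> work;
};

// Comparator that makes std::priority_queue pop the earliest due time first.
struct DueLater
{
    bool operator()(const SketchTimedTask& a, const SketchTimedTask& b) const
    {
        return a.due > b.due;
    }
};

// Hypothetical timer queue; tasks are held here and not run until they are due.
class SketchTimerQueue
{
public:
    // Absolute form: run at a specific time.
    void scheduleAt(SketchClock::time_point when, std::function<void()> work)
    {
        m_queue.push({ when, std::move(work) });
    }

    // Relative form: run after a delay measured from the supplied current time.
    void scheduleIn(SketchClock::time_point now, SketchClock::duration delay, std::function<void()> work)
    {
        scheduleAt(now + delay, std::move(work));
    }

    // Runs every task whose due time has been reached and returns how many ran.
    int runDue(SketchClock::time_point now)
    {
        int ran = 0;
        while (!m_queue.empty() && m_queue.top().due <= now)
        {
            std::function<void()> work = m_queue.top().work;
            m_queue.pop();
            work();
            ++ran;
        }
        return ran;
    }

private:
    std::priority_queue<SketchTimedTask, std::vector<SketchTimedTask>, DueLater> m_queue;
};

inline std::vector<int> timedDemo()
{
    using namespace std::chrono_literals;
    const SketchClock::time_point start{}; // simulated "now"; no real waiting takes place
    SketchTimerQueue timers;
    std::vector<int> ranMinutes;

    timers.scheduleIn(start, 1h, [&ranMinutes] { ranMinutes.push_back(60); });
    timers.scheduleIn(start, 1min, [&ranMinutes] { ranMinutes.push_back(1); });

    const int ranEarly = timers.runDue(start + 30s); // nothing is due yet
    assert(ranEarly == 0);
    (void)ranEarly;
    timers.runDue(start + 2min); // the one-minute task runs
    timers.runDue(start + 2h); // the one-hour task runs
    return ranMinutes;
}
```

The one-hour task was scheduled first but runs last, since the queue is ordered by due time rather than by insertion order.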