Better worker threads with c++23 cooperative thread interruption
In this post, I’m gonna discuss how C++23 helps to write better worker threads and saves you the effort to manually manage thread termination.
What are worker threads?
I’m sure that every one has written a worker thread one time or another and
it’s impossible not to find (at least) one in any bigger code base. Usually, worker
threads are responsible for performing background tasks, like doing some calculations
in an asynchronous manner, performing network downloads, job dispatchers, IPC
message brokers etc. Basically, if you see a thread and a while
loop inside - you’ve got a worker thread!
Just to have a more concrete example, I’m gonna start with a simple Logger
implementation which is written using c++11 features.
|
|
Messages can be posted to the logger, and they will be printed asynchronously.
The client code does not have to wait for I/O to complete in its thread. The
Logger
has its own internal thread where it picks up the messages and pushes
them to std::cout
. Logger’s thread
is terminated in the destructor - this is
done by setting the isRunning
atomic flag to false
to effectively, break
the while
loop once the wait on condition variable is done.
Once the while
invariant is no longer satisfied the run
function terminates
and the t
thread can be successfully joined. Pattern used here, where an object interface delegates the work to its internal thread, is called active object.
Of course, this is just an example code and should be considered with a pinch of salt. It has a lot of shortcomings and it misses a ton of features to be a production ready logger; one feature which would definitely be useful here is draining the message queue on destruction, but I’m not gonna focus on that.
Simple stuff really. This is all fine, but now with c++23, it can be done a bit better.
Cooperative thread interruption
c++23 introduces std::stop_source
and std::stop_token
. These interfaces
have been created specifically to orchestrate thread termination and along with std::jthread
, greatly simplify writing worker threads.
Introduction
We’ve got two main interfaces:
std::stop_source
std::stop_token
The relation between the two can be loosely presented the following way:
The two form a pair (similarly as std::promise
and std::future
). Simply
speaking, you can think of the std::stop_source
as the write side, and
std::stop_token
as the read side. A worker thread obtains a
std::stop_token
from an associated std::stop_source
and checks periodically
if stop has been requested. std::stop_source
is used to signal the stop request.
Introducing std::stop_source
and std::stop_token
With that in mind, the Logger
can be simplified. Here are the changes:
|
|
With these changes in place, the atomic variable has been replaced with a
std::stop_source
. The run
function now receives std::stop_token
as its
first argument. The logic remains the same, stop is requested on the std::stop_source
in the destructor, similarly as it has been done with the isRunning
variable and within the run
function, instead of checking the isRunning
variable, now a check is performed on the provided stop token.
Introducing std::condition_variable_any
But there’s more, there’s std::condition_variable_any which
is a generalisation of condition_variable
. Its wait function accepts std::stop_token
that allows for terminating the wait call. The code can
be simplified even further:
|
|
The wait
function returns the result of the stop_waiting
predicate so, it’s possible to determine if the wait
call was interrupted by
the predicate or the std::stop_token
.
Introducing std::jthread
std::jthread is a
joinable thread. It automatically performs join
on destruction and
supersedes the std::thread
usage. But that’s not all! It integrates an instance of
std::stop_source
as well! With std::jthread
the logger code can be
simplified even further. Here are the changes:
|
|
Thanks to std::jthread
, the explicit synchronisation in the destructor
can be completely removed. In fact, the whole destructor becomes
redundant so, I removed it. std::jthread
accepts two function overloads one
which accepts the std::stop_token
or another which doesn’t. I’m using the
former one. This allows me to use the std::stop_source
integrated into
std::jthread
.
Introducing std::stop_callback
There’s one more handy feature that c++23 provides. std::stop_callback
allows for registration of a callback that’s going to be executed when the
stop
has been requested on associated std::stop_source
. Currently, I’m
printing a message when the wait
on condition variable has been interrupted
using a stop token, but std::stop_callback
can be used to provide the same
functionality.
Here’s the change:
|
|
Bear in mind though, that previously, the “logger terminating” message printing
has been performed on the t
thread, where as the callback registered with
std::stop_callback
executes on a thread which issues stop request. This may
be an important detail.
Refactored code
Here’s the refactored code in its entirety using new c++23 features:
|
|
Additionally, you can find it on gitlab.