
Better worker threads with C++20 cooperative thread interruption

In this post, I’m gonna discuss how C++20 helps you write better worker threads and saves you the effort of managing thread termination by hand.

What are worker threads?

I’m sure that everyone has written a worker thread at one time or another, and it’s impossible not to find (at least) one in any bigger code base. Usually, worker threads are responsible for background tasks, like doing calculations asynchronously, performing network downloads, dispatching jobs, brokering IPC messages, etc. Basically, if you see a thread with a while loop inside, you’ve got a worker thread!

To have a more concrete example, I’m gonna start with a simple Logger implementation written using C++11 features.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>


class Logger {
public:
    Logger() :
        isRunning{true},
        t{std::bind(&Logger::run, this)}
    {
    }

    ~Logger() {
        {
            // Clear the flag while holding the mutex, so the worker cannot
            // check the predicate, miss the notification and block forever.
            std::lock_guard<std::mutex> l{msgsMutex};
            isRunning = false;
        }
        msgsCv.notify_one();
        if (t.joinable()){
            t.join();
        }
    }

    void log(std::string msg) {
        std::unique_lock<std::mutex> l{msgsMutex};
        msgs.push_back(std::move(msg));
        msgsCv.notify_one();
    }

private:
    std::mutex msgsMutex;
    std::deque<std::string> msgs;
    std::atomic<bool> isRunning;
    std::condition_variable msgsCv;
    // Members are initialised in declaration order: the thread is declared
    // last so that run() never touches a member that isn't constructed yet.
    std::thread t;

    void run() {
        while (isRunning.load()) {
            std::string msg;
            {
                std::unique_lock<std::mutex> l{msgsMutex};
                // Sleep until there's a message or shutdown was requested.
                msgsCv.wait(l, [this](){ return !isRunning.load() || !msgs.empty(); });
                if (!isRunning.load()) {
                    std::cout << "*** logger terminating" << std::endl;
                    break;
                }

                msg = std::move(msgs.front());
                msgs.pop_front();
            }

            // Print outside the lock so log() callers never wait on I/O.
            std::cout << "*** log: [" << msg << "]" << std::endl;
        }
    }
};

int main(int argc, const char *argv[])
{
    Logger l;

    l.log("hello world");

    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}

Messages can be posted to the logger and they will be printed asynchronously, so the client code does not have to wait for I/O to complete in its own thread. The Logger has its own internal thread, which picks up the messages and pushes them to std::cout. The Logger’s thread is terminated in the destructor: it sets the isRunning atomic flag to false and notifies the condition variable, so once the wait returns, the worker sees the cleared flag and breaks out of the while loop.

Once the loop condition no longer holds, the run function returns and the t thread can be joined. The pattern used here, where an object’s interface delegates the work to its internal thread, is called Active Object.

Of course, this is just example code and should be taken with a pinch of salt. It has a lot of shortcomings and misses a ton of features needed for a production-ready logger; one feature that would definitely be useful here is draining the message queue on destruction, but I’m not gonna focus on that.

Simple stuff, really. This is all fine, but with C++20 it can be done a bit better.

Cooperative thread interruption

C++20 introduces std::stop_source and std::stop_token. These interfaces were created specifically to orchestrate thread termination and, together with std::jthread, greatly simplify writing worker threads.

Introduction

We’ve got two main interfaces:

  • std::stop_source
  • std::stop_token

The relation between the two can be loosely illustrated as follows:

[Figure: relation between std::stop_source and std::stop_token]

The two form a pair (similar to std::promise and std::future, except that any number of tokens can share one source). Simply speaking, you can think of std::stop_source as the write side and std::stop_token as the read side. A worker thread obtains a std::stop_token from an associated std::stop_source and periodically checks whether stop has been requested; the std::stop_source is used to signal the stop request.

Introducing std::stop_source and std::stop_token

With that in mind, the Logger can be simplified. Here are the changes:

 class Logger {
 public:
-  Logger() : isRunning{true}, t{std::bind(&Logger::run, this)} {}
+  Logger() : t{std::bind(&Logger::run, this, stopSrc.get_token())} {}

   ~Logger() {
-    isRunning = false;
+    stopSrc.request_stop();
     msgsCv.notify_one();
     if (t.joinable()) {
       t.join();
@@ -29,17 +29,19 @@ public:
 private:
   std::mutex msgsMutex;
   std::deque<std::string> msgs;
-  std::atomic<bool> isRunning;
+  std::stop_source stopSrc;
   std::thread t;
   std::condition_variable msgsCv;

-  void run() {
-    while (isRunning.load()) {
+  void run(std::stop_token token) {
+    while (!token.stop_requested()) {
       std::string msg;
       {
         std::unique_lock<std::mutex> l{msgsMutex};
-        msgsCv.wait(l, [this]() { return !isRunning.load() || !msgs.empty(); });
-        if (!isRunning.load()) {
+        msgsCv.wait(l, [this, token]() {
+          return token.stop_requested() || !msgs.empty();
+        });
+        if (token.stop_requested()) {
           std::cout << "*** logger terminating" << std::endl;
           break;
         }

With these changes in place, the atomic flag has been replaced with a std::stop_source, and the run function now receives a std::stop_token as its parameter. The logic remains the same: the destructor requests stop on the std::stop_source instead of clearing the isRunning flag, and the run function checks the stop token instead of the flag.

Introducing std::condition_variable_any

But there’s more: std::condition_variable_any, a generalisation of std::condition_variable that works with any lockable type, has C++20 overloads of wait (as well as wait_for and wait_until) that accept a std::stop_token, so a stop request can interrupt the wait. The code can be simplified even further:

   std::deque<std::string> msgs;
   std::stop_source stopSrc;
   std::thread t;
-  std::condition_variable msgsCv;
+  std::condition_variable_any msgsCv;

   void run(std::stop_token token) {
     while (!token.stop_requested()) {
       std::string msg;
       {
         std::unique_lock<std::mutex> l{msgsMutex};
-        msgsCv.wait(l, [this, token]() {
-          return token.stop_requested() || !msgs.empty();
-        });
-        if (token.stop_requested()) {
+        if (!msgsCv.wait(l, token, [this]() { return !msgs.empty(); })) {
           std::cout << "*** logger terminating" << std::endl;
           break;
         }

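The wait function returns the value of the predicate at the moment it returns, so a false result means the wait ended because stop was requested while the queue was still empty, and a true result means there is a message to process. The stop request itself also wakes the waiting thread, so the notify_one call in the destructor is no longer what ends the wait.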

Introducing std::jthread

std::jthread is a joining thread: on destruction, it automatically requests stop and then joins, so it can replace std::thread in most places. But that’s not all! It also contains its own std::stop_source. With std::jthread, the logger code can be simplified even further. Here are the changes:

+using std::placeholders::_1;
+
 class Logger {
 public:
-  Logger() : t{std::bind(&Logger::run, this, stopSrc.get_token())} {}
-
-  ~Logger() {
-    stopSrc.request_stop();
-    msgsCv.notify_one();
-    if (t.joinable()) {
-      t.join();
-    }
-  }
+  Logger() : t{std::bind(&Logger::run, this, _1)} {}

   void log(std::string msg) {
     std::unique_lock<std::mutex> l{msgsMutex};
@@ -29,9 +23,8 @@ public:
 private:
   std::mutex msgsMutex;
   std::deque<std::string> msgs;
-  std::stop_source stopSrc;
-  std::thread t;
   std::condition_variable_any msgsCv;
+  std::jthread t;

   void run(std::stop_token token) {
     while (!token.stop_requested()) {

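Thanks to std::jthread, the explicit synchronisation in the destructor can be removed completely. The std::jthread destructor calls request_stop() and then join(), and the stop request also wakes the condition_variable_any wait, so the whole destructor becomes redundant and I removed it. Note that t is declared after msgsCv, so it is destroyed first: the worker is stopped and joined while the queue and the condition variable still exist. When constructing a std::jthread, if the callable can be invoked with a std::stop_token as its first argument, std::jthread passes its own stop token; otherwise the callable is invoked without one. I’m using the former, which lets run use the std::stop_source integrated into std::jthread.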

Introducing std::stop_callback

There’s one more handy feature that C++20 provides. std::stop_callback registers a callback that is executed when stop is requested on the associated std::stop_source. Currently, I’m printing a message when the condition variable wait has been interrupted by the stop token, but std::stop_callback can provide the same functionality.

Here’s the change:

   std::jthread t;

   void run(std::stop_token token) {
+    std::stop_callback callback(
+        token, [] { std::cout << "*** logger stop requested" << std::endl; });
+
     while (!token.stop_requested()) {
       std::string msg;
       {
         std::unique_lock<std::mutex> l{msgsMutex};
         if (!msgsCv.wait(l, token, [this]() { return !msgs.empty(); })) {
-          std::cout << "*** logger terminating" << std::endl;
           break;
         }

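Bear in mind, though, that previously the “logger terminating” message was printed on the t thread, whereas a callback registered with std::stop_callback executes on the thread that calls request_stop() (here, the thread destroying the Logger), or immediately on the registering thread if stop had already been requested when the callback was constructed. This may be an important detail.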

Refactored code

Here’s the refactored code in its entirety, using the new C++20 features:

#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <stop_token>
#include <string>
#include <thread>

using std::placeholders::_1;

class Logger {
public:
  Logger() : t{std::bind(&Logger::run, this, _1)} {}

  void log(std::string msg) {
    std::unique_lock<std::mutex> l{msgsMutex};
    msgs.push_back(std::move(msg));
    msgsCv.notify_one();
  }

private:
  std::mutex msgsMutex;
  std::deque<std::string> msgs;
  std::condition_variable_any msgsCv;
  std::jthread t;

  void run(std::stop_token token) {
    std::stop_callback callback(
        token, [] { std::cout << "*** logger stop requested" << std::endl; });

    while (!token.stop_requested()) {
      std::string msg;
      {
        std::unique_lock<std::mutex> l{msgsMutex};
        if (!msgsCv.wait(l, token, [this]() { return !msgs.empty(); })) {
          break;
        }

        msg = std::move(msgs.front());
        msgs.pop_front();
      }

      std::cout << "*** log: [" << msg << "]" << std::endl;
    }
  }
};

int main(int argc, const char *argv[]) {
  Logger l;

  l.log("hello world");

  std::this_thread::sleep_for(std::chrono::seconds(1));
  return 0;
}

Additionally, you can find it on GitLab.