Contents

ZMQ event handling with zmq_poll

zmq_poll is used to monitor multiple sockets for events. The API is very simple. zmq_poll takes a pointer to an array of zmq_pollitem_t structures, the number of items in the array, and a timeout value in milliseconds. Each zmq_pollitem_t in the array acts both as an input and an output. The input part is used to specify the socket to monitor and the events to monitor for. The output part is used to report the events that occurred on the socket. Here’s how you’d typically use zmq_poll:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class ZmqPoller {
public:
    int wait(std::chrono::milliseconds timeout) {
        int eventsPending = zmq_poll(items.data(), items.size(), timeout.count());

        if (eventsPending == -1) {
            // Handle error
        } else if (eventsPending == 0) {
            // Handle timeout
        } else {
            for (const auto& item : items) {
                if (item.revents & ZMQ_POLLIN) {
                    // Handle incoming message
                    // ...
                }
            }
        }

        return eventsPending;
    }

private:
    std::vector<zmq::pollitem_t> items;
};

Simple stuff. zmq_poll waits for events on the sockets specified in the items vector and returns the number of events that occurred. If an event occurred, the revents field of the zmq_pollitem_t structure will be set to the event that occurred. In the example above, I’m only interested in ZMQ_POLLIN events, which indicates that there’s data to read from the socket, but that’s not important. The real question is, how do I safely add and remove sockets or file descriptors to/from the items vector?

Assumptions

Before we proceed, let’s make a few assumptions:

  • an API like watchSocket and unwatchSocket is needed
  • watchSocket, unwatchSocket and wait itself can be called from any thread
  • wait is called a lot more frequently than watchSocket or unwatchSocket

The problem is “read heavy”. Therefore, optimising for reads is a priority.

The naive approach

The simplest possible way to implement the required functionality is to copy the data before calling zmq_poll.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class ZmqPoller {
public:
  int wait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex);
    auto itemsCopy = items;
    lock.unlock();

    int eventsPending =
        ::zmq_poll(itemsCopy.data(), itemsCopy.size(), timeout.count());

    if (eventsPending == -1) {
      // Handle error
    } else if (eventsPending == 0) {
      // Handle timeout
    } else {
      for (const auto& item : itemsCopy) {
        if (item.revents & ZMQ_POLLIN) {
          // Handle incoming message
          // ...
        }
      }
    }

    return eventsPending;
  }

  void watchSocket(void* socket) {
    ::zmq_pollitem_t item = {socket, 0, ZMQ_POLLIN, 0};
    std::lock_guard<std::mutex> lock(mutex);
    items.push_back(item);
  }

  void unwatchSocket(void* socket) {
    std::lock_guard<std::mutex> lock(mutex);
    std::erase_if(
        items, [&socket](const auto& item) { return item.socket == socket; });
  }

private:
  std::mutex mutex;
  std::vector<::zmq_pollitem_t> items;
};

This will work but it’s not very efficient as it involves copying the entire items vector every time wait is called.

Limitations

Having this simple example as a baseline allows to identify some constraints we’ll have to be working with:

  • data in items cannot be modified while zmq_poll is running
  • appending to items might cause reallocation, invalidating the entire collection
  • removing from items invalidates all iterators past the removed element
  • a container providing access to contiguous memory is needed
  • there’s one consumer and potentially multiple producers

First attempt

As a first attempt, I thought about trying something similar to an RCU. My approach is a lot more rudimentary and can’t really be called RCU, since it won’t be lock free but it’s a step in the right direction.

The idea is to refer to the container via a shared pointer instead of directly. To do that conveniently, I’m going to implement a simple wrapper:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
template <typename T>
class SynchronisedObj {
public:
  explicit SynchronisedObj(std::shared_ptr<T> obj)
      : obj_{std::move(obj)} {}

  void synchronise(std::shared_ptr<T> newValue) {
    std::unique_lock<std::shared_mutex> l{mutex_};
    obj_ = std::move(newValue);
  }

  std::shared_ptr<T> checkout() const {
    std::shared_lock<std::shared_mutex> l{mutex_};
    return obj_;
  }

private:
  mutable std::shared_mutex mutex_;
  std::shared_ptr<T> obj_;
};

Wrapped object can be accessed via checkout method, which returns a shared pointer. The reader holds the pointer for as long as it needs to. The writer can propagate changes by checking out the object, making a copy of it, modifying the copy, and then calling synchronise. synchronise will replace the pointer to the object with the new one. The old object will be deleted when the last reader holding a reference to it releases it (if there’s any) or in synchronise method, on the writer’s thread (if there are no readers using the object at all).

Let’s see how this can be used in practice.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ZmqPoller {
public:
  ZmqPoller() :
    items{std::make_shared<std::vector<::zmq_pollitem_t>>()} {}

  int wait(std::chrono::milliseconds timeout) {
    auto co = items.checkout();

    int eventsPending =
        ::zmq_poll(co->data(), co->size(), timeout.count());

    if (eventsPending == -1) {
      // Handle error
    } else if (eventsPending == 0) {
      // Handle timeout
    } else {
      for (const auto& item : *co) {
        if (item.revents & ZMQ_POLLIN) {
          // Handle incoming message
          // ...
        }
      }
    }

    return eventsPending;
  }

  void watchSocket(void* socket) {
    auto co = items.checkout();
    if (co->size() < co->capacity()) {
        // It's safe to modify the vector without making 
        // a copy since there's enough space to append one 
        // more element without triggering a reallocation.
        co->emplace_back(socket, 0, ZMQ_POLLIN, 0);
    } else {
        // copy items
        auto coCopy = std::make_shared<std::vector<::zmq_pollitem_t>>(
            co->begin(), co->end());
        coCopy->emplace_back(socket, 0, ZMQ_POLLIN, 0);
        items.synchronise(coCopy);      
    }
  }

  void unwatchSocket(void* socket) {
    auto co = items.checkout();
    // copy items
    auto coCopy = std::make_shared<std::vector<::zmq_pollitem_t>>(
        co->begin(), co->end());        
    // erase the socket from the copy
    std::erase_if(
        *coCopy, [&socket](const auto& item) { return item.socket == socket; });
    // replace the copy
    items.synchronise(coCopy);
  }

private:
  SynchronisedObj<std::vector<::zmq_pollitem_t>> items;
};

The wait itself didn’t really change much; instead of operating on items directly, it’s done via a checkout which returns a shared_ptr. As a result, wait can use the data without blocking the writers and preventing them from appending/removing items from the list.

The watchSocket and unwatchSocket are more interesting. They both start by checking out the current state of the items vector. In watch, if the vector has enough capacity left to store the new item, it can be appended directly without making a copy. This is safe since it’s guaranteed that reallocation won’t happen and zmq_poll won’t be affected by the change. If there’s not enough capacity, a copy of the vector is made, the new item is appended to the copy, and the pointer is submitted to synchronise, where it replaces the old one. Any readers holding a pointer to the old version aren’t affected, all new readers can checkout only the new, updated version.

Going lock-free or not?

The current implementation is already a big improvement over the naive one. It allows for concurrent modifications of the zmq_pollitem_t vector and, most importantly, wait is no longer making any copy when polling for events. It would be nice though, to make the whole thing operate without relying on locks.

Currently, checkout is performed using a shared_lock. shared_mutex is really an abstraction in C++ allowing for usage of RW locks. If you’re familiar with pthread_rwlock, it’s basically the same thing. If not, the short recap is:

  • multiple readers can hold a shared lock at the same time
  • only one writer can hold an exclusive lock
  • a writer can’t acquire an exclusive lock if there are any shared locks held
  • a writer can’t acquire an exclusive lock if there’s another writer holding it
  • a reader can’t acquire a shared lock if there’s a writer holding an exclusive lock

This allows for efficient locking of critical sections for read heavy workloads.

But I’m digressing. Can SynchronisedObj be made entirely lock-free?

Turns out it can. I’ve taken inspiration from a great talk by Timur Doumler about lock-free programming in C++ in audio projects. Somewhere around 20th minute, Timur presents a data structure which he later calls and atomic_unique_ptr and this is basically an efficient, lock-free solution doing exactly what I need (or so I thought).

AtomicUniquePtr

Below is my implementation of AtomicUniquePtr.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
template <typename T> class AtomicUniquePtr {
public:
  class Checkout {
  public:
    Checkout(AtomicUniquePtr<T> &parent) : parent_(parent) {
      for (auto expected = parent_.ptr_.load();
           !parent_.ptr_.compare_exchange_weak(expected, nullptr);
           expected = parent_.ptr_.load())
        ;
    }

    ~Checkout() {
      for (T *expected = nullptr;
           !parent_.ptr_.compare_exchange_weak(expected, parent_.owner_.get());)
        ;
    }

    T *operator->() const { return parent_.owner_.get(); }

    T &operator*() const { return *parent_.owner_; }

    T *get() const { return parent_.owner_.get(); }

  private:
    AtomicUniquePtr<T> &parent_;
  };

  AtomicUniquePtr(std::unique_ptr<T> &&ptr)
      : ptr_(ptr.get()), owner_(std::move(ptr)) {}

  Checkout checkout() { return Checkout(*this); }

  void update(std::unique_ptr<T> &&newPtr) {
    for (auto expected = owner_.get();
         !ptr_.compare_exchange_weak(expected, newPtr.get());
         expected = owner_.get())
      ;
    owner_ = std::move(newPtr);
  }

private:
  std::atomic<T *> ptr_;
  std::unique_ptr<T> owner_;
};

As Timur explains in his talk, the Checkout class is just a RAII wrapper that atomically exchanges the raw pointer with nullptr in the constructor to let other threads know that the object is in use. In the destructor, it exchanges nullptr with the original pointer value back again so, any waiting threads can proceed and update the object. So, in summary, the atomic raw pointer is used for synchronisation and the unique_ptr is used for ownership.

This allows to go entirely lock-free. ZmqPoller can use AtomicUniquePtr almost exactly the same way as SynchronisedObj:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

...

  void watchSocket(void *socket) {
    auto co = items.checkout();
    if (co->size() < co->capacity()) {
      // It's safe to modify the vector without making
      // a copy since there's enough space to append one
      // more element without triggering a reallocation.
      co->emplace_back(socket, 0, ZMQ_POLLIN, 0);
    } else {
      // copy items
      auto coCopy = std::make_unique<std::vector<::zmq_pollitem_t>>(co->begin(),
                                                                    co->end());
      coCopy->emplace_back(socket, 0, ZMQ_POLLIN, 0);
      items.update(std::move(coCopy));
    }
  }

  void unwatchSocket(void *socket) {
    auto co = items.checkout();
    // copy items
    auto coCopy =
        std::make_unique<std::vector<::zmq_pollitem_t>>(co->begin(), co->end());
    // erase the socket from the copy
    std::erase_if(
        *coCopy, [&socket](const auto &item) { return item.socket == socket; });
    // replace the copy
    items.update(std::move(coCopy));
  }

...

The only important things that changed is the usage of unique_ptr instead of shared_ptr.

There’s one big important caveat to this approach that unfortunately renders it unusable for this application.

Tip
wait can block for a long time - and that can completelly block any attempts to update the items vector.

It’s still a great solution for many other use cases and I thought it’s worth discussing it here.

Conclusion

For now, I’m sticking with the SynchronisedObj as it seems best suited for the task at hand. The other approach might involve using a production ready RCU implementation like e.g. liburcu, but that’s a topic for another time.