ZMQ event handling with zmq_poll
zmq_poll is used to monitor multiple sockets for events. The API is very simple. zmq_poll takes a pointer to an array of zmq_pollitem_t structures, the number of items in the array, and a timeout value in milliseconds. Each zmq_pollitem_t in the array acts both as an input and an output. The input part is used to specify the socket to monitor and the events to monitor for. The output part is used to report the events that occurred on the socket. Here’s how you’d typically use zmq_poll:
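A minimal sketch of that usage (socketA and socketB stand in for sockets created elsewhere, and the 1000 ms timeout is arbitrary):

```cpp
#include <vector>
#include <zmq.h>

// socketA and socketB: ZMQ sockets created elsewhere (e.g. with zmq_socket()).
void pollOnce(void* socketA, void* socketB) {
    std::vector<zmq_pollitem_t> items = {
        {socketA, 0, ZMQ_POLLIN, 0},
        {socketB, 0, ZMQ_POLLIN, 0},
    };

    // Block for up to 1000 ms waiting for events on either socket.
    int rc = zmq_poll(items.data(), static_cast<int>(items.size()), 1000);
    if (rc > 0) {
        for (const auto& item : items) {
            if (item.revents & ZMQ_POLLIN) {
                // there's data to read from item.socket
            }
        }
    }
}
```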
Simple stuff. zmq_poll waits for events on the sockets specified in the items vector and returns the number of items with events signalled. If an event occurred, the revents field of the corresponding zmq_pollitem_t structure will be set to the event that occurred. In the example above, I’m only interested in ZMQ_POLLIN events, which indicate that there’s data to read from the socket, but that’s not important. The real question is: how do I safely add and remove sockets or file descriptors to/from the items vector?
Assumptions
Before we proceed, let’s make a few assumptions:
- an API like watchSocket and unwatchSocket is needed
- watchSocket, unwatchSocket and wait itself can be called from any thread
- wait is called a lot more frequently than watchSocket or unwatchSocket
The problem is “read heavy”. Therefore, optimising for reads is a priority.
The naive approach
The simplest possible way to implement the required functionality is to copy the data before calling zmq_poll.
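A sketch of what that could look like, with a plain mutex guarding the vector (the details here are illustrative):

```cpp
#include <algorithm>
#include <mutex>
#include <vector>
#include <zmq.h>

class ZmqPoller {
public:
    void watchSocket(void* socket) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back({socket, 0, ZMQ_POLLIN, 0});
    }

    void unwatchSocket(void* socket) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.erase(std::remove_if(items_.begin(), items_.end(),
                         [socket](const zmq_pollitem_t& i) { return i.socket == socket; }),
                     items_.end());
    }

    int wait(long timeoutMs) {
        // Copy the whole vector so zmq_poll never sees a concurrent modification.
        std::vector<zmq_pollitem_t> copy;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            copy = items_;
        }
        return zmq_poll(copy.data(), static_cast<int>(copy.size()), timeoutMs);
    }

private:
    std::mutex mutex_;
    std::vector<zmq_pollitem_t> items_;
};
```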
This will work but it’s not very efficient as it involves copying the entire items vector every time wait is called.
Limitations
Having this simple example as a baseline allows us to identify some constraints we’ll have to work with:
- data in items cannot be modified while zmq_poll is running
- appending to items might cause reallocation, invalidating the entire collection
- removing from items invalidates all iterators past the removed element
- a container providing access to contiguous memory is needed
- there’s one consumer and potentially multiple producers
First attempt
As a first attempt, I thought about trying something similar to RCU. My approach is a lot more rudimentary and can’t really be called RCU, since it isn’t lock-free, but it’s a step in the right direction.
The idea is to refer to the container via a shared pointer instead of directly. To do that conveniently, I’m going to implement a simple wrapper:
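Something along these lines; this is a sketch of the idea, and details such as the exact locking may differ:

```cpp
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>

template <typename T>
class SynchronisedObj {
public:
    explicit SynchronisedObj(T initial = T{})
        : object_(std::make_shared<T>(std::move(initial))) {}

    // Grab a reference to the current version; it stays alive for as long
    // as the returned shared_ptr is held.
    std::shared_ptr<T> checkout() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        return object_;
    }

    // Publish a new version; the old one is destroyed when its last holder
    // releases it (possibly right here, on the writer's thread).
    void synchronise(std::shared_ptr<T> updated) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        object_ = std::move(updated);
    }

private:
    mutable std::shared_mutex mutex_;
    std::shared_ptr<T> object_;
};
```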
The wrapped object can be accessed via the checkout method, which returns a shared pointer. The reader holds the pointer for as long as it needs to. The writer can propagate changes by checking out the object, making a copy of it, modifying the copy, and then calling synchronise. synchronise replaces the pointer to the object with the new one. The old object is deleted either when the last reader holding a reference to it releases it (if there are any) or inside the synchronise method, on the writer’s thread (if no readers are using the object at all).
Let’s see how this can be used in practice.
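Here’s a sketch of ZmqPoller built on top of SynchronisedObj. The writer-side mutex is my own addition, meant to keep multiple producers from losing each other’s updates; the real implementation may handle that differently:

```cpp
#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>
#include <zmq.h>

// Relies on the SynchronisedObj template shown above.
class ZmqPoller {
public:
    void watchSocket(void* socket) {
        std::lock_guard<std::mutex> writeLock(writerMutex_);  // serialise producers
        auto current = items_.checkout();
        zmq_pollitem_t item{socket, 0, ZMQ_POLLIN, 0};
        if (current->size() < current->capacity()) {
            // No reallocation will happen, so the item can be appended in place.
            current->push_back(item);
        } else {
            // Copy, modify the copy, then publish it via synchronise.
            auto updated = std::make_shared<std::vector<zmq_pollitem_t>>(*current);
            updated->push_back(item);
            items_.synchronise(std::move(updated));
        }
    }

    void unwatchSocket(void* socket) {
        std::lock_guard<std::mutex> writeLock(writerMutex_);
        auto current = items_.checkout();
        auto updated = std::make_shared<std::vector<zmq_pollitem_t>>(*current);
        updated->erase(std::remove_if(updated->begin(), updated->end(),
                           [socket](const zmq_pollitem_t& i) { return i.socket == socket; }),
                       updated->end());
        items_.synchronise(std::move(updated));
    }

    int wait(long timeoutMs) {
        // No copy: just pin the current version for the duration of the poll.
        auto current = items_.checkout();
        return zmq_poll(current->data(), static_cast<int>(current->size()), timeoutMs);
    }

private:
    std::mutex writerMutex_;  // my addition: producers serialised among themselves
    SynchronisedObj<std::vector<zmq_pollitem_t>> items_;
};
```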
The wait itself didn’t really change much; instead of operating on items directly, it goes through a checkout which returns a shared_ptr. As a result, wait can use the data without blocking the writers or preventing them from appending/removing items from the list.
The watchSocket and unwatchSocket are more interesting. They both start by checking out the current state of the items vector. In watchSocket, if the vector has enough spare capacity to store the new item, it can be appended directly without making a copy. This is safe since it’s guaranteed that reallocation won’t happen and zmq_poll won’t be affected by the change. If there’s not enough capacity, a copy of the vector is made, the new item is appended to the copy, and the pointer is submitted to synchronise, where it replaces the old one. Any readers holding a pointer to the old version aren’t affected; all new readers will check out only the new, updated version.
Going lock-free or not?
The current implementation is already a big improvement over the naive one. It allows for concurrent modifications of the zmq_pollitem_t vector and, most importantly, wait no longer makes any copy when polling for events. It would be nice, though, to make the whole thing operate without relying on locks.
Currently, checkout is performed using a shared_lock. shared_mutex is really a C++ abstraction over RW locks. If you’re familiar with pthread_rwlock, it’s basically the same thing. If not, the short recap is:
- multiple readers can hold a shared lock at the same time
- only one writer can hold an exclusive lock
- a writer can’t acquire an exclusive lock if there are any shared locks held
- a writer can’t acquire an exclusive lock if there’s another writer holding it
- a reader can’t acquire a shared lock if there’s a writer holding an exclusive lock
This allows for efficient locking of critical sections in read-heavy workloads.
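To make that concrete, here’s a tiny illustrative example of the two lock types side by side:

```cpp
#include <mutex>
#include <shared_mutex>
#include <vector>

std::shared_mutex rwMutex;
std::vector<int> data;

int readSum() {
    std::shared_lock<std::shared_mutex> lock(rwMutex);  // many readers may hold this at once
    int sum = 0;
    for (int v : data) sum += v;
    return sum;
}

void appendValue(int v) {
    std::unique_lock<std::shared_mutex> lock(rwMutex);  // exclusive: waits for all readers and writers
    data.push_back(v);
}
```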
But I’m digressing. Can SynchronisedObj be made entirely lock-free?
Turns out it can. I’ve taken inspiration from a great talk by Timur Doumler about lock-free programming in C++ in audio projects. Somewhere around the 20th minute, Timur presents a data structure which he later calls an atomic_unique_ptr, and this is basically an efficient, lock-free solution doing exactly what I need (or so I thought).
AtomicUniquePtr
Below is my implementation of AtomicUniquePtr.
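What follows is a sketch of the idea, reconstructed from the description below; the exact original may differ in details such as memory ordering:

```cpp
#include <atomic>
#include <memory>
#include <utility>

// A unique_ptr owns the object; an atomic raw pointer is used purely for
// synchronisation (nullptr means "currently checked out").
template <typename T>
class AtomicUniquePtr {
public:
    explicit AtomicUniquePtr(std::unique_ptr<T> object = std::make_unique<T>())
        : owned_(std::move(object)), raw_(owned_.get()) {}

    // RAII wrapper: claims the raw pointer (leaving nullptr behind) so other
    // threads know the object is in use, and puts it back on destruction.
    class Checkout {
    public:
        explicit Checkout(AtomicUniquePtr& parent) : parent_(parent) {
            do {
                ptr_ = parent_.raw_.exchange(nullptr);
            } while (ptr_ == nullptr);  // spin until the object is free
        }

        ~Checkout() { parent_.raw_.store(ptr_); }

        Checkout(const Checkout&) = delete;
        Checkout& operator=(const Checkout&) = delete;

        T* operator->() const { return ptr_; }
        T& operator*() const { return *ptr_; }

    private:
        AtomicUniquePtr& parent_;
        T* ptr_ = nullptr;
    };

    Checkout checkout() { return Checkout(*this); }

    // Replace the owned object; spins until nobody has it checked out.
    void synchronise(std::unique_ptr<T> updated) {
        T* old = nullptr;
        do {
            old = raw_.exchange(nullptr);
        } while (old == nullptr);       // wait for the current holder to finish
        owned_ = std::move(updated);    // the old object is destroyed here
        raw_.store(owned_.get());       // publish the new one
    }

private:
    std::unique_ptr<T> owned_;
    std::atomic<T*> raw_;
};
```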
As Timur explains in his talk, the Checkout class is just a RAII wrapper that atomically exchanges the raw pointer with nullptr in the constructor to let other threads know that the object is in use. In the destructor, it exchanges nullptr with the original pointer value back again so any waiting threads can proceed and update the object. In summary, the atomic raw pointer is used for synchronisation and the unique_ptr is used for ownership.
This allows us to go entirely lock-free. ZmqPoller can use AtomicUniquePtr almost exactly the same way as SynchronisedObj:
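A sketch mirroring the previous version; as before, the writer-side mutex is my own addition for coordinating multiple producers, while the wait path itself takes no lock:

```cpp
#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>
#include <zmq.h>

// Relies on the AtomicUniquePtr template shown above.
class ZmqPoller {
public:
    void watchSocket(void* socket) {
        std::lock_guard<std::mutex> writeLock(writerMutex_);
        zmq_pollitem_t item{socket, 0, ZMQ_POLLIN, 0};
        std::unique_ptr<std::vector<zmq_pollitem_t>> updated;
        {
            auto current = items_.checkout();  // spins while wait() is polling
            if (current->size() < current->capacity()) {
                current->push_back(item);
                return;
            }
            updated = std::make_unique<std::vector<zmq_pollitem_t>>(*current);
            updated->push_back(item);
        }  // release the checkout first, otherwise synchronise would spin forever
        items_.synchronise(std::move(updated));
    }

    void unwatchSocket(void* socket) {
        std::lock_guard<std::mutex> writeLock(writerMutex_);
        std::unique_ptr<std::vector<zmq_pollitem_t>> updated;
        {
            auto current = items_.checkout();
            updated = std::make_unique<std::vector<zmq_pollitem_t>>(*current);
        }
        updated->erase(std::remove_if(updated->begin(), updated->end(),
                           [socket](const zmq_pollitem_t& i) { return i.socket == socket; }),
                       updated->end());
        items_.synchronise(std::move(updated));
    }

    int wait(long timeoutMs) {
        // The checkout is held for the whole poll, which is exactly the caveat
        // discussed below: writers spin until wait() returns.
        auto current = items_.checkout();
        return zmq_poll(current->data(), static_cast<int>(current->size()), timeoutMs);
    }

private:
    std::mutex writerMutex_;  // my addition: serialises producers among themselves
    AtomicUniquePtr<std::vector<zmq_pollitem_t>> items_;
};
```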
The only important thing that changed is the use of unique_ptr instead of shared_ptr.
There’s one big caveat to this approach that unfortunately renders it unusable for this application: wait can block for a long time, and that can completely block any attempts to update the items vector. It’s still a great solution for many other use cases, and I thought it was worth discussing here.
Conclusion
For now, I’m sticking with SynchronisedObj as it seems best suited for the task at hand. Another approach might involve using a production-ready RCU implementation such as liburcu, but that’s a topic for another time.