The full ZeroMQ event source code

This is the full source code for a Calloop event source based on a ZeroMQ socket. You might find it useful as a kind of reference. Please read the disclaimer at the start of this chapter if you skipped straight here!

//! A Calloop event source implementation for ZeroMQ sockets.

use std::{collections, io};

use anyhow::Context;

/// A Calloop event source that contains a ZeroMQ socket (of any kind) and a
/// Calloop MPSC channel for sending over it.
///
/// The basic interface is:
/// - create a zmq::Socket for your ZeroMQ socket
/// - use `ZeroMQSource::from_socket()` to turn it into a Calloop event source
///   (plus the sending end of the channel)
/// - queue messages to be sent by sending them on the sending end of the MPSC
///   channel
/// - add the event source to the Calloop event loop with a callback to handle
///   reading
/// - the sending end of the MPSC channel can be cloned and sent across threads
///   if necessary
///
/// This type is parameterised by `T`:
///
///     T where T: IntoIterator, T::Item: Into<zmq::Message>
///
/// This means that `T` is anything that can be converted to an iterator, and
/// the items in the iterator are anything that can be converted to a
/// `zmq::Message`. So, for example, a `Vec<String>` would work.
///
/// The callback is called whenever the underlying socket becomes readable. It
/// is called with a vec of byte sequences (`Vec<Vec<u8>>`) and the event loop
/// data set by the user.
///
/// Note about why the read data is a vec of multipart message parts: we don't
/// know what kind of socket this is, or what will be sent, so the most general
/// thing we can do is receive the entirety of a multipart message and call the
/// user callback with the whole set. Usually the number of parts in a multipart
/// message will be one, but the code will work just the same when it's not.
///
/// This event source also allows you to use different event sources to publish
/// messages over the same writeable ZeroMQ socket (usually PUB or PUSH).
/// Messages should be sent over the Calloop MPSC channel sending end. This end
/// can be cloned and used by multiple senders.

pub struct ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    // Calloop components.
    /// Event source for ZeroMQ socket.
    socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,

    /// Event source for channel.
    mpsc_receiver: calloop::channel::Channel<T>,

    /// Because the ZeroMQ socket is edge triggered, we need a way to "wake" the
    /// event source upon (re-)registration. We do this with a separate
    /// `calloop::ping::Ping` source.
    wake_ping_receiver: calloop::ping::PingSource,

    /// Sending end of the ping source.
    wake_ping_sender: calloop::ping::Ping,

    // ZeroMQ socket.
    /// FIFO queue for the messages to be published.
    outbox: collections::VecDeque<T>,
}

impl<T> ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    /// Converts a `zmq::Socket` into a `ZeroMQSource` plus the sending end of an
    /// MPSC channel to enqueue outgoing messages.
    pub fn from_socket(socket: zmq::Socket) -> io::Result<(Self, calloop::channel::Sender<T>)> {
        let (mpsc_sender, mpsc_receiver) = calloop::channel::channel();
        let (wake_ping_sender, wake_ping_receiver) = calloop::ping::make_ping()?;

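        // `FdWrapper::new()` is unsafe because it relies on the `AsRawFd`
        // implementation of `zmq::Socket` always returning a valid file
        // descriptor.
        let socket = calloop::generic::Generic::new(
            unsafe { calloop::generic::FdWrapper::new(socket) },
            calloop::Interest::READ,
            calloop::Mode::Edge,
        );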

        Ok((
            Self {
                socket,
                mpsc_receiver,
                wake_ping_receiver,
                wake_ping_sender,
                outbox: collections::VecDeque::new(),
            },
            mpsc_sender,
        ))
    }
}

/// This event source runs for three events:
///
/// 1. The event source was registered. It is forced to run so that any pending
///    events on the socket are processed.
///
/// 2. A message was sent over the MPSC channel. In this case we put it in the
///    internal queue.
///
/// 3. The ZeroMQ socket is readable. For this, we read off a complete multipart
///    message and call the user callback with it.
///
/// The callback provided to `process_events()` may be called multiple times
/// within a single call to `process_events()`.
impl<T> calloop::EventSource for ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    type Event = Vec<Vec<u8>>;
    type Metadata = ();
    type Ret = io::Result<()>;
    type Error = anyhow::Error;

    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        mut callback: F,
    ) -> Result<calloop::PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Runs if we were woken up on startup/registration.
        self.wake_ping_receiver
            .process_events(readiness, token, |_, _| {})
            .context("Failed after registration")?;

        // Runs if we were woken up because a message was sent on the channel.
        let outbox = &mut self.outbox;

        self.mpsc_receiver
            .process_events(readiness, token, |evt, _| {
                if let calloop::channel::Event::Msg(msg) = evt {
                    outbox.push_back(msg);
                }
            })
            .context("Failed to process outgoing messages")?;

        // The ZeroMQ file descriptor is edge triggered. This means that if (a)
        // messages are added to the queue before registration, or (b) the
        // socket became writeable before messages were enqueued, we will need
        // to run the loop below. Hence, it always runs if this event source
        // fires. The process_events() method of the underlying socket source
        // doesn't do anything we need, so we don't call it.

        loop {
            // According to the ZeroMQ docs, the edge-triggered FD will not change
            // state if a socket goes directly from being readable to being
            // writeable (or vice-versa) without there being an in-between point
            // where there are no events. This can happen as a result of sending
            // or receiving on the socket while processing such an event. The
            // "used_socket" flag below tracks whether we perform an operation
            // on the socket that warrants reading the events again.
            let events = self
                .socket
                .get_ref()
                .get_events()
                .context("Failed to read ZeroMQ events")?;

            let mut used_socket = false;

            if events.contains(zmq::POLLOUT) {
                if let Some(parts) = self.outbox.pop_front() {
                    self.socket
                        .get_ref()
                        .send_multipart(parts, 0)
                        .context("Failed to send message")?;
                    used_socket = true;
                }
            }

            if events.contains(zmq::POLLIN) {
                // Batch up multipart messages. ZeroMQ guarantees atomic message
                // sending, which includes all parts of a multipart message.
                let messages = self
                    .socket
                    .get_ref()
                    .recv_multipart(0)
                    .context("Failed to receive message")?;
                used_socket = true;

                // Attach context to any error returned by the callback and
                // propagate it to the caller of process_events().
                callback(messages, &mut ()).context("Error in event callback")?;
            }

            if !used_socket {
                break;
            }
        }

        Ok(calloop::PostAction::Continue)
    }

    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        self.socket.register(poll, token_factory)?;
        self.mpsc_receiver.register(poll, token_factory)?;
        self.wake_ping_receiver.register(poll, token_factory)?;

        self.wake_ping_sender.ping();

        Ok(())
    }

    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        self.socket.reregister(poll, token_factory)?;
        self.mpsc_receiver.reregister(poll, token_factory)?;
        self.wake_ping_receiver.reregister(poll, token_factory)?;

        self.wake_ping_sender.ping();

        Ok(())
    }

    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
        self.socket.unregister(poll)?;
        self.mpsc_receiver.unregister(poll)?;
        self.wake_ping_receiver.unregister(poll)?;
        Ok(())
    }
}

impl<T> Drop for ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    fn drop(&mut self) {
        // This is one way to stop socket code (especially PUSH sockets) hanging
        // at the end of any blocking functions.
        //
        // - https://stackoverflow.com/a/38338578/188535
        // - http://api.zeromq.org/4-0:zmq-ctx-term
        self.socket.get_ref().set_linger(0).ok();
        self.socket.get_ref().set_rcvtimeo(0).ok();
        self.socket.get_ref().set_sndtimeo(0).ok();

        // Double result because (a) possible failure on call and (b) possible
        // failure decoding.
        if let Ok(Ok(last_endpoint)) = self.socket.get_ref().get_last_endpoint() {
            self.socket.get_ref().disconnect(&last_endpoint).ok();
        }
    }
}

pub fn main() {}
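
The loop inside `process_events()` is probably the least obvious part of the listing, so here is a minimal sketch of the same idea that runs without ZeroMQ or Calloop. Everything in it is a hypothetical stand-in: `MockSocket`, its `events()` method, the `POLLIN`/`POLLOUT` constants and the `drain()` function are invented for illustration and are not part of either library. It only shows why the readiness flags are re-read after every send or receive, and why the loop stops when a pass does nothing.

```rust
use std::collections::VecDeque;

// Hypothetical stand-ins for the zmq::POLLIN / zmq::POLLOUT flags.
const POLLIN: u8 = 0b01;
const POLLOUT: u8 = 0b10;

/// Hypothetical in-memory stand-in for a ZeroMQ socket.
struct MockSocket {
    /// Multipart messages waiting to be received.
    incoming: VecDeque<Vec<Vec<u8>>>,
    /// Multipart messages that have been "sent".
    sent: Vec<Vec<Vec<u8>>>,
    /// How many more sends are accepted before the socket stops being writeable.
    send_capacity: usize,
}

impl MockSocket {
    /// Plays the role of `get_events()`: reports the current readiness.
    fn events(&self) -> u8 {
        let mut flags = 0;
        if !self.incoming.is_empty() {
            flags |= POLLIN;
        }
        if self.send_capacity > 0 {
            flags |= POLLOUT;
        }
        flags
    }
}

/// Sends and receives until one full pass performs no socket operation.
fn drain<F>(socket: &mut MockSocket, outbox: &mut VecDeque<Vec<Vec<u8>>>, mut callback: F)
where
    F: FnMut(Vec<Vec<u8>>),
{
    loop {
        // Re-read readiness on every pass: the previous pass may have changed it.
        let events = socket.events();
        let mut used_socket = false;

        if events & POLLOUT != 0 {
            if let Some(parts) = outbox.pop_front() {
                socket.sent.push(parts);
                socket.send_capacity -= 1;
                used_socket = true;
            }
        }

        if events & POLLIN != 0 {
            if let Some(parts) = socket.incoming.pop_front() {
                callback(parts);
                used_socket = true;
            }
        }

        if !used_socket {
            break;
        }
    }
}

fn main() {
    let mut socket = MockSocket {
        incoming: VecDeque::from(vec![
            vec![b"a".to_vec()],
            vec![b"b".to_vec(), b"c".to_vec()],
        ]),
        sent: Vec::new(),
        send_capacity: 1,
    };
    let mut outbox = VecDeque::from(vec![vec![b"x".to_vec()], vec![b"y".to_vec()]]);
    let mut received = Vec::new();

    drain(&mut socket, &mut outbox, |parts| received.push(parts));

    println!(
        "received {}, sent {}, still queued {}",
        received.len(),
        socket.sent.len(),
        outbox.len()
    );
}
```

In this sketch the second outgoing message stays in the outbox because the mock socket stops being writeable after one send. It would only go out the next time the loop runs, which mirrors why the real event source runs its loop every time it fires.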

Dependencies are:

  • calloop (whatever version this document was built from)
  • zmq 0.9
  • anyhow 1.0

[dependencies]
calloop = { path = '../..' }
zmq = "0.9"
anyhow = "1.0"
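
If the bound on `T` in the listing (`T: IntoIterator, T::Item: Into<zmq::Message>`) looks opaque, the following sketch shows the same shape of bound with no external crates. It is an illustration under stated assumptions: `Message` here is a hypothetical stand-in for `zmq::Message`, with only two conversions implemented, and `into_parts()` is an invented helper, not something the event source provides.

```rust
/// Hypothetical stand-in for `zmq::Message`: just an owned byte buffer.
#[derive(Debug, PartialEq)]
struct Message(Vec<u8>);

impl From<Vec<u8>> for Message {
    fn from(bytes: Vec<u8>) -> Self {
        Message(bytes)
    }
}

impl From<&str> for Message {
    fn from(text: &str) -> Self {
        Message(text.as_bytes().to_vec())
    }
}

/// Same shape of bound as `ZeroMQSource<T>`: any iterable whose items can be
/// converted into a message.
fn into_parts<T>(parts: T) -> Vec<Message>
where
    T: IntoIterator,
    T::Item: Into<Message>,
{
    parts.into_iter().map(Into::<Message>::into).collect()
}

fn main() {
    // A vec of string slices: two message parts.
    let from_strs = into_parts(vec!["topic", "payload"]);
    // A vec of byte vectors: also two parts.
    let from_bytes = into_parts(vec![vec![1u8, 2], vec![3u8]]);
    // `Option` is iterable too, giving a single-part message.
    let from_option = into_parts(Some("single part"));

    println!("{:?}", from_strs);
    println!("{:?}", from_bytes);
    println!("{:?}", from_option);
}
```

Which concrete types satisfy the real bound depends on the `From` implementations that `zmq::Message` provides, so check the zmq crate documentation rather than relying on the two conversions assumed here.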