The full ZeroMQ event source code
This is the full source code for a Calloop event source based on a ZeroMQ socket. You might find it useful as a kind of reference. Please read the disclaimer at the start of this chapter if you skipped straight here!
//! A Calloop event source implementation for ZeroMQ sockets.
use std::{collections, io};
use anyhow::Context;
/// A Calloop event source that contains a ZeroMQ socket (of any kind) and a
/// Calloop MPSC channel for sending over it.
///
/// The basic interface is:
/// - create a `zmq::Socket` for your ZeroMQ socket
/// - use `ZeroMQSource::from_socket()` to turn it into a Calloop event source
/// (plus the sending end of the channel)
/// - queue messages to be sent by sending them on the sending end of the MPSC
/// channel
/// - add the event source to the Calloop event loop with a callback to handle
/// reading
/// - the sending end of the MPSC channel can be cloned and sent across threads
/// if necessary
///
/// This type is parameterised by `T`:
///
///     T where T: IntoIterator, T::Item: Into<zmq::Message>
///
/// This means that `T` is anything that can be converted to an iterator, and
/// the items in the iterator are anything that can be converted to a
/// `zmq::Message`. For example, a `Vec<String>` would work.
///
/// The callback is called whenever the underlying socket becomes readable. It
/// is called with a vec of byte sequences (`Vec<Vec<u8>>`) and the event loop
/// data set by the user.
///
/// The read data is a vec of multipart message parts because we don't know
/// what kind of socket this is, or what will be sent. The most general thing
/// we can do is receive the entirety of a multipart message and call the user
/// callback with the whole set. Usually a multipart message has just one
/// part, but the code works just the same when it has more.
///
/// This event source also allows you to use different event sources to publish
/// messages over the same writeable ZeroMQ socket (usually PUB or PUSH).
/// Messages should be sent over the Calloop MPSC channel sending end. This end
/// can be cloned and used by multiple senders.
pub struct ZeroMQSource<T>
where
T: IntoIterator,
T::Item: Into<zmq::Message>,
{
// Calloop components.
/// Event source for ZeroMQ socket.
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
/// Event source for channel.
mpsc_receiver: calloop::channel::Channel<T>,
/// Because the ZeroMQ socket is edge triggered, we need a way to "wake" the
/// event source upon (re-)registration. We do this with a separate
/// `calloop::ping::Ping` source.
wake_ping_receiver: calloop::ping::PingSource,
/// Sending end of the ping source.
wake_ping_sender: calloop::ping::Ping,
// Outgoing message queue.
/// FIFO queue for the messages to be published.
outbox: collections::VecDeque<T>,
}
impl<T> ZeroMQSource<T>
where
T: IntoIterator,
T::Item: Into<zmq::Message>,
{
/// Converts a `zmq::Socket` into a `ZeroMQSource` plus the sending end of an
/// MPSC channel to enqueue outgoing messages.
pub fn from_socket(socket: zmq::Socket) -> io::Result<(Self, calloop::channel::Sender<T>)> {
let (mpsc_sender, mpsc_receiver) = calloop::channel::channel();
let (wake_ping_sender, wake_ping_receiver) = calloop::ping::make_ping()?;
let socket = calloop::generic::Generic::new(
unsafe { calloop::generic::FdWrapper::new(socket) },
calloop::Interest::READ,
calloop::Mode::Edge,
);
Ok((
Self {
socket,
mpsc_receiver,
wake_ping_receiver,
wake_ping_sender,
outbox: collections::VecDeque::new(),
},
mpsc_sender,
))
}
}
/// This event source runs for three events:
///
/// 1. The event source was registered. It is forced to run so that any pending
/// events on the socket are processed.
///
/// 2. A message was sent over the MPSC channel. In this case we put it in the
/// internal queue.
///
/// 3. The ZeroMQ socket is readable. For this, we read off a complete multipart
/// message and call the user callback with it.
///
/// The callback provided to `process_events()` may be called multiple times
/// within a single call to `process_events()`.
impl<T> calloop::EventSource for ZeroMQSource<T>
where
T: IntoIterator,
T::Item: Into<zmq::Message>,
{
type Event = Vec<Vec<u8>>;
type Metadata = ();
type Ret = io::Result<()>;
type Error = anyhow::Error;
fn process_events<F>(
&mut self,
readiness: calloop::Readiness,
token: calloop::Token,
mut callback: F,
) -> Result<calloop::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
// Runs if we were woken up on startup/registration.
self.wake_ping_receiver
.process_events(readiness, token, |_, _| {})
.context("Failed after registration")?;
// Runs if we were woken up because a message was sent on the channel.
let outbox = &mut self.outbox;
self.mpsc_receiver
.process_events(readiness, token, |evt, _| {
if let calloop::channel::Event::Msg(msg) = evt {
outbox.push_back(msg);
}
})
.context("Failed to process outgoing messages")?;
// The ZeroMQ file descriptor is edge triggered. This means that if (a)
// messages are added to the queue before registration, or (b) the
// socket became writeable before messages were enqueued, we will need
// to run the loop below. Hence, the loop always runs when this event
// source fires. The socket source's own process_events() method
// doesn't do anything useful here though, so we don't call it.
loop {
// According to the ZeroMQ docs, the edge-triggered FD will not change
// state if a socket goes directly from being readable to being
// writeable (or vice-versa) without there being an in-between point
// where there are no events. This can happen as a result of sending
// or receiving on the socket while processing such an event. The
// `used_socket` flag below tracks whether we perform an operation
// on the socket that warrants reading the events again.
let events = self
.socket
.get_ref()
.get_events()
.context("Failed to read ZeroMQ events")?;
let mut used_socket = false;
if events.contains(zmq::POLLOUT) {
if let Some(parts) = self.outbox.pop_front() {
self.socket
.get_ref()
.send_multipart(parts, 0)
.context("Failed to send message")?;
used_socket = true;
}
}
if events.contains(zmq::POLLIN) {
// Batch up multipart messages. ZeroMQ guarantees atomic message
// sending, which includes all parts of a multipart message.
let messages = self
.socket
.get_ref()
.recv_multipart(0)
.context("Failed to receive message")?;
used_socket = true;
// Attach context to any error returned by the callback and propagate
// it to the caller.
callback(messages, &mut ()).context("Error in event callback")?;
}
if !used_socket {
break;
}
}
Ok(calloop::PostAction::Continue)
}
fn register(
&mut self,
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory,
) -> calloop::Result<()> {
self.socket.register(poll, token_factory)?;
self.mpsc_receiver.register(poll, token_factory)?;
self.wake_ping_receiver.register(poll, token_factory)?;
self.wake_ping_sender.ping();
Ok(())
}
fn reregister(
&mut self,
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory,
) -> calloop::Result<()> {
self.socket.reregister(poll, token_factory)?;
self.mpsc_receiver.reregister(poll, token_factory)?;
self.wake_ping_receiver.reregister(poll, token_factory)?;
self.wake_ping_sender.ping();
Ok(())
}
fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
self.socket.unregister(poll)?;
self.mpsc_receiver.unregister(poll)?;
self.wake_ping_receiver.unregister(poll)?;
Ok(())
}
}
impl<T> Drop for ZeroMQSource<T>
where
T: IntoIterator,
T::Item: Into<zmq::Message>,
{
fn drop(&mut self) {
// This is one way to stop socket code (especially PUSH sockets) hanging
// at the end of any blocking functions.
//
// - https://stackoverflow.com/a/38338578/188535
// - http://api.zeromq.org/4-0:zmq-ctx-term
self.socket.get_ref().set_linger(0).ok();
self.socket.get_ref().set_rcvtimeo(0).ok();
self.socket.get_ref().set_sndtimeo(0).ok();
// The result is nested because (a) the call itself can fail and (b)
// decoding the endpoint as a string can fail.
if let Ok(Ok(last_endpoint)) = self.socket.get_ref().get_last_endpoint() {
self.socket.get_ref().disconnect(&last_endpoint).ok();
}
}
}
pub fn main() {}
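The loop inside `process_events()` above is the subtle part of this source, so it may help to see its control flow in isolation. The following is a minimal sketch using only the standard library. `MockSocket`, its `events()` method and the `drain()` function are hypothetical stand-ins invented for this illustration; they are not part of `zmq` or Calloop. The sketch only shows the "re-read the event state after every send or receive, stop when a full pass does nothing" pattern.

```rust
use std::collections::VecDeque;

/// One multipart message: a list of byte-sequence parts.
type Multipart = Vec<Vec<u8>>;

/// Hypothetical stand-in for a ZeroMQ socket (NOT the real `zmq::Socket`).
/// `inbox` holds messages waiting to be received; `sent` records what was sent.
struct MockSocket {
    inbox: VecDeque<Multipart>,
    sent: Vec<Multipart>,
    writable: bool,
}

impl MockSocket {
    /// Plays the role of `get_events()`: returns `(readable, writable)`.
    fn events(&self) -> (bool, bool) {
        (!self.inbox.is_empty(), self.writable)
    }
}

/// Same shape as the loop in `process_events()`: query the event state on
/// every pass, and break only when a pass performed no socket operation.
/// Returns the number of passes, purely so the behaviour can be observed.
fn drain(
    socket: &mut MockSocket,
    outbox: &mut VecDeque<Multipart>,
    mut callback: impl FnMut(Multipart),
) -> usize {
    let mut passes = 0;
    loop {
        passes += 1;
        let (readable, writable) = socket.events();
        let mut used_socket = false;

        if writable {
            if let Some(parts) = outbox.pop_front() {
                socket.sent.push(parts);
                used_socket = true;
            }
        }

        if readable {
            if let Some(parts) = socket.inbox.pop_front() {
                callback(parts);
                used_socket = true;
            }
        }

        if !used_socket {
            break;
        }
    }
    passes
}

fn main() {
    let mut socket = MockSocket {
        inbox: VecDeque::from(vec![
            vec![b"a".to_vec()],
            vec![b"b".to_vec(), b"c".to_vec()],
        ]),
        sent: Vec::new(),
        writable: true,
    };
    let mut outbox = VecDeque::from(vec![vec![b"x".to_vec()]]);
    let mut received = Vec::new();

    let passes = drain(&mut socket, &mut outbox, |parts| received.push(parts));
    println!(
        "passes: {}, received: {}, sent: {}",
        passes,
        received.len(),
        socket.sent.len()
    );
}
```

With two queued incoming messages and one outgoing message, the loop makes three passes: two that do work and a final one that finds nothing to do and breaks. A single wakeup is therefore enough to handle everything pending, which is what an edge-triggered file descriptor requires.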
Dependencies are:
- calloop (whatever version this document was built from)
- zmq 0.9
- anyhow 1.0
[dependencies]
calloop = { path = '../..' }
zmq = "0.9"
anyhow = "1.0"
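To experiment with the `T: IntoIterator, T::Item: Into<zmq::Message>` bound from the listing without pulling in the `zmq` crate, the same bound shape can be reproduced with a stand-in type. This is only a sketch: `Message` and `into_parts()` below are hypothetical names invented for this illustration, and the `From` conversions were chosen for the example, so they do not necessarily match the set that `zmq::Message` provides.

```rust
/// Hypothetical stand-in for `zmq::Message` (NOT the real type), so that this
/// example builds with the standard library alone.
#[derive(Debug, PartialEq)]
struct Message(Vec<u8>);

// Conversions chosen for illustration only.
impl From<&str> for Message {
    fn from(s: &str) -> Self {
        Message(s.as_bytes().to_vec())
    }
}

impl From<String> for Message {
    fn from(s: String) -> Self {
        Message(s.into_bytes())
    }
}

impl From<Vec<u8>> for Message {
    fn from(v: Vec<u8>) -> Self {
        Message(v)
    }
}

/// Uses the same bound shape as `ZeroMQSource<T>`: anything iterable whose
/// items convert into a message is accepted as one multipart message.
fn into_parts<T>(parts: T) -> Vec<Message>
where
    T: IntoIterator,
    T::Item: Into<Message>,
{
    parts.into_iter().map(Into::into).collect()
}

fn main() {
    // A vec of owned strings: two parts.
    let from_strings = into_parts(vec![String::from("topic"), String::from("payload")]);
    // A vec of string slices: two parts.
    let from_strs = into_parts(vec!["topic", "payload"]);
    // `Option` is iterable too, giving a single-part message.
    let single = into_parts(Some(vec![1u8, 2, 3]));

    println!("{:?}", from_strings);
    println!("{:?}", from_strs);
    println!("{:?}", single);
}
```

The design choice this illustrates is that the sender decides the container: a `Vec`, an `Option`, or any other iterable works, as long as each item converts into a message.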