Creating our source, part I: our types
In the last chapter we worked out a list of the event sources we need to compose into a new type:
calloop::generic::Generic
calloop::channel::Channel
calloop::ping::Ping
So at a minimum, our type needs to contain these:
pub struct ZeroMQSource
{
// Calloop components.
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
mpsc_receiver: calloop::channel::Channel<?>,
wake_ping_receiver: calloop::ping::PingSource,
}
Note that I've left the type for the channel as ?
— we'll get to that a bit later.
What else do we need? If the PingSource
is there to wake up the loop manually, we need to keep the other end of it. The ping is an internal detail — users of our type don't need to know it's there. We also need the zsocket itself, so we can actually detect and process events on it. That gives us:
pub struct ZeroMQSource
{
// Calloop components.
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
mpsc_receiver: calloop::channel::Channel<?>,
wake_ping_receiver: calloop::ping::PingSource,
/// Sending end of the ping source.
wake_ping_sender: calloop::ping::Ping,
}
The message type
The most obvious candidate for the type of the message queue would be zmq::Message
. But ZeroMQ sockets are capable of sending multipart messages, and this is even mandatory for eg. the PUB
zsocket type, where the first part of the message is the topic.
Therefore it makes more sense to accept a sequence of messages to cover the most general case, and that sequence can have a length of one for single-part messages. But with one more tweak: we can accept a sequence of things that can be transformed into zmq::Message
values. The exact type we'll use will be a generic type like so:
pub struct ZeroMQSource<T>
where
T: IntoIterator,
T::Item: Into<zmq::Message>,
{
mpsc_receiver: calloop::channel::Channel<T>,
// ...
}
Enforcing single messages
Remember that it's not just
Vec<T>
and other sequence types that implementIntoIterator
—Option<T>
implements it too! There is alsostd::iter::Once<T>
. So if a user of our API wants to enforce that all "multi"-part messages actually contain exactly one part, they can use this API withT
being, say,std::iter::Once<zmq::Message>
(or even just[zmq::Message; 1]
in Rust 2021 edition).
Associated types
The EventSource
trait has four associated types:
-
Event
- when an event is generated that our caller cares about (ie. not some internal thing), this is the data we provide to their callback. This will be another sequence of messages, but because we're constructing it we can be more opinionated about the type and use the return type ofzmq::Socket::recv_multipart()
which isVec<Vec<u8>>
. -
Metadata
- this is a more persistent kind of data, perhaps the underlying file descriptor or socket, or maybe some stateful object that the callback can manipulate. It is passed by exclusive reference to theMetadata
type. In our case we don't use this, so it's()
. -
Ret
- this is the return type of the callback that's called on an event. Usually this will be aResult
of some sort; in our case it'sstd::io::Result<()>
just to signal whether some underlying operation failed or not. -
Error
- this is the error type returned byprocess_events()
(not the user callback!). Having this as an associated type allows you to have more control over error propagation in nested event sources. We will use Anyhow, which is like a more fully-featuresBox<dyn Error>
. It allows you to add context to any other error with acontext()
method.
So together these are:
impl<T> calloop::EventSource for ZeroMQSource<T>
where
T: IntoIterator,
T::Item: Into<zmq::Message>,
{
type Event = Vec<Vec<u8>>;
type Metadata = ();
type Ret = io::Result<()>;
type Error = anyhow::Error;
// ...
}
I have saved one surprise for later to emphasise some important principles, but for now, let's move on to defining some methods!