Calloop's Documentation

API

If you're looking for calloop's API documentation, it is available on docs.rs for the released versions. The docs for the current development version are also available.

Tutorial

This book presents a step-by-step tutorial to get yourself familiar with calloop and how it is used:

  • Chapter 1 presents the general principles of an event loop that are important to have in mind when working with calloop.
  • Chapter 2 goes through the different kinds of event sources that calloop provides, with examples of how to use them.
  • Chapter 3 presents calloop's integration with Rust's async/await ecosystem.
  • Chapter 4 gives a detailed example of building a custom event source in calloop by combining other sources.

How an event loop works

An event loop is one way to write concurrent code. Other ways include threading (sort of), or asynchronous syntax.

When you write concurrent code, you need to know two things:

  • where in your program it could potentially block, and
  • how to let other parts of your program run while waiting for those operations to stop blocking

This chapter covers what the first thing means, and how Calloop accomplishes the second thing.

Terminology

A blocking operation is one that waits for an event to happen, and doesn't do anything else while it's waiting. For example, if you try to read from a network socket, and there is no data available, the read operation could wait for some indefinite amount of time. Your program will be in a state where it does not need to use any CPU cycles, or indeed do anything at all, and it won't proceed until there is data to read.

Examples of blocking operations are:

  • waiting for a certain time to elapse
  • reading or writing to a file
  • reading or writing to a network socket
  • waiting for a thread to finish

When any of these operations is ready to go, we call it an event. We call the underlying things (files, network sockets, timers, etc.) sources for events. So, for example, you can create an event source that corresponds to a file, and it will generate events when it is ready for reading, or writing, or encounters an error.

Events and callbacks

An event loop like Calloop, as the name suggests, runs in a loop. At the start of the loop, Calloop checks all the sources you've added to see if any events have happened for those sources. If they have, Calloop will call a function that you provide (known as a callback).

This function will (possibly) be given some data for the event itself (eg. the bytes received), some state for the event source (eg. the socket, or a type that wraps it in a neater API), and some state for the whole program.

Calloop will do this one by one for each source that has a new event. If a file is ready for reading, your file-event-source callback will be called. If a timer has elapsed, your timer-event-source callback will be called.

It is up to you to write the code to do things when events happen. For example, your callback might read data from a file "ready for reading" event into a queue. When the queue contains a valid message, the same callback could send that message over an internal channel to another event source. That second event source could have its own callback that processes entire messages and updates the program's state. And so on.
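The dispatch cycle described above can be modelled with nothing but the standard library. This is a toy sketch, not Calloop's implementation: ToySource, ToyLoop and dispatch_once are hypothetical names, and "readiness" is faked with a queue of pending events instead of being reported by the operating system.

```rust
use std::collections::VecDeque;

/// A toy event source: pending events plus the callback to run for each one.
/// (Hypothetical type, for illustration only.)
struct ToySource<State> {
    pending: VecDeque<String>,
    callback: Box<dyn FnMut(String, &mut State)>,
}

/// A toy loop that owns its sources, much as an event loop owns what you insert.
struct ToyLoop<State> {
    sources: Vec<ToySource<State>>,
}

impl<State> ToyLoop<State> {
    /// One loop iteration: visit each source in turn and run its callback for
    /// every pending event. Callbacks run one after the other, never in parallel.
    /// Returns the number of events dispatched.
    fn dispatch_once(&mut self, state: &mut State) -> usize {
        let mut dispatched = 0;
        for source in &mut self.sources {
            while let Some(event) = source.pending.pop_front() {
                (source.callback)(event, state);
                dispatched += 1;
            }
        }
        dispatched
    }
}

/// Builds a loop with two sources, runs one iteration, and returns the log
/// that the callbacks wrote into the shared state.
fn run_demo() -> Vec<String> {
    let mut log: Vec<String> = Vec::new();
    let mut toy_loop: ToyLoop<Vec<String>> = ToyLoop {
        sources: vec![
            ToySource {
                pending: VecDeque::from(vec!["file readable".to_string()]),
                callback: Box::new(|ev: String, log: &mut Vec<String>| {
                    log.push(format!("file: {}", ev))
                }),
            },
            ToySource {
                pending: VecDeque::from(vec!["deadline reached".to_string()]),
                callback: Box::new(|ev: String, log: &mut Vec<String>| {
                    log.push(format!("timer: {}", ev))
                }),
            },
        ],
    };
    toy_loop.dispatch_once(&mut log);
    log
}

fn main() {
    for line in run_demo() {
        println!("{}", line);
    }
}
```

The shared state here is a plain Vec<String>. In a real program it would be whatever type you pass to the loop when dispatching.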

Concurrency vs parallelism

This "one by one" nature of event loops is important. When you approach concurrency using threads, operations in any thread can be interleaved with operations in any other thread. This is typically made robust by either passing messages or using shared memory with synchronisation.

Callbacks in an event loop do not run in parallel; they run one after the other. Unless you (or your dependencies) have introduced threading, you can (and should) write your callbacks as single-threaded code.

Event loops vs async code

This single-threaded nature makes event loops much more similar to code that uses async/await than to multithreaded code. There are benefits and tradeoffs to either approach.

Calloop will take care of a lot of integration and error handling boilerplate for you. It also makes it clearer what parts of your code are the non-blocking actions to perform as a result of events. If you like to think of your program in terms of taking action in reaction to events, this can be a great advantage!

However, this comes at the expense of needing to make your program's state much more explicit. For example, take this async code:

do_thing_one().await;
do_thing_two().await;
do_thing_three().await;

The state of the program is simply given by: what line is it up to? You know if it's done "thing one" because execution has proceeded to line two. No other state is required. In Calloop, however, you will need extra variables and code so that when your callback is called, it knows whether to run do_thing_one(), do_thing_two(), or do_thing_three().
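To make the "extra variables and code" concrete, here is one way the three steps could be tracked by hand. It is only a sketch under assumptions: Step, Progress and on_event are hypothetical names, and the do_thing_* functions are stand-ins that record that they ran.

```rust
/// Which step should run the next time the callback is invoked.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Step {
    One,
    Two,
    Three,
    Done,
}

// Hypothetical stand-ins for the three operations in the async snippet.
fn do_thing_one(log: &mut Vec<&'static str>) {
    log.push("one");
}
fn do_thing_two(log: &mut Vec<&'static str>) {
    log.push("two");
}
fn do_thing_three(log: &mut Vec<&'static str>) {
    log.push("three");
}

/// State that has to outlive any single callback invocation.
struct Progress {
    next: Step,
    log: Vec<&'static str>,
}

/// What a callback body might look like: perform exactly one step, remember
/// where we are up to, and return to the loop without waiting.
fn on_event(progress: &mut Progress) {
    progress.next = match progress.next {
        Step::One => {
            do_thing_one(&mut progress.log);
            Step::Two
        }
        Step::Two => {
            do_thing_two(&mut progress.log);
            Step::Three
        }
        Step::Three => {
            do_thing_three(&mut progress.log);
            Step::Done
        }
        Step::Done => Step::Done,
    };
}

fn main() {
    let mut progress = Progress { next: Step::One, log: Vec::new() };
    // Each call stands in for the event loop invoking the callback once.
    while progress.next != Step::Done {
        on_event(&mut progress);
    }
    println!("{:?}", progress.log);
}
```

The enum plays the role that "which line are we on?" plays in the async version. Every new waiting point in the logic becomes a new variant.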

Never block the loop!

All of this leads us to the most important rule of event loop code: never block the loop! This means: never use blocking calls inside one of your event callbacks. Do not use synchronous file write() calls in a callback. Do not sleep() in a callback. Do not join() a thread in a callback. Don't you do it!

If you do, the event loop will have no way to proceed, and just... wait for your blocking operation to complete. Nothing is going to run in a parallel thread. Nothing is going to stop your callback and move on to the next one. If your callback needs to wait for a blocking operation, your code must allow it to keep track of where it's up to, return from the callback, and wait for the event like any other.

Calloop and composition

Calloop is designed to work by composition. This means that you build up more complex logic in your program by combining simpler event sources into more complex ones. Want a network socket with custom backoff/timeout logic? Create a type containing a network socket using the Generic file descriptor adapter, a timer, and tie them together with your backoff logic and state. There is a much more detailed example of composition in our ZeroMQ example.

Using event sources

Calloop's structure is entirely built around the EventSource trait, which represents something that is capable of generating events. To receive those events, you need to give ownership of the event source to calloop, along with a closure that will be invoked whenever that source generates an event. This is a push-based model, which is best suited to contexts where your program needs to react to (unpredictable) outside events, rather than wait efficiently for the completion of some operation it initiated.

Monitoring a file descriptor

The Generic event source wraps a file descriptor ("fd") and fires its callback any time there are events on it ie. becoming readable or writable, or encountering an error. It's pretty simple, but it's what every other event source is based around. And since the platforms that calloop runs on expose many different kinds of events via fds, it's usually the key to using those events in calloop.

For example on Linux, fd-based interfaces are available for GPIO, I2C, USB, UART, network interfaces, timers and many other systems. Integrating these into calloop starts with obtaining the appropriate fd, creating a Generic event source from it, and building up a more useful, abstracted event source around that. A detailed example of this is given for ZeroMQ.

You do not have to use a low-level fd either: any type that implements AsFd can be provided. This means that you can use a wrapper type that handles allocation and disposal itself, and implement AsFd on it so that Generic can manage it in the event loop.

Creating a Generic source

Creating a Generic event source requires three things:

  • An OwnedFd or a wrapper type that implements AsFd
  • The "interest" you have in this descriptor: whether you want to generate events when the fd becomes readable, writable, or both
  • The "mode" of event generation: level triggered or edge triggered

The easiest constructor to use is the new() method, but if you need control over the associated error type there is also new_with_error().

Ownership and AsFd wrappers

Rust 1.63 introduced a concept of file descriptor ownership and borrowing through the OwnedFd and BorrowedFd types, which are also available on older Rust versions through the io-lifetimes crate. The AsFd trait provides a way to get a BorrowedFd corresponding to a file, socket, etc. while guaranteeing the fd will be valid for the lifetime of the BorrowedFd.

Not all third party crates use AsFd yet, and may instead provide types implementing AsRawFd. 'AsFdWrapper' provides a way to adapt these types. To use this safely, ensure the AsRawFd implementation of the type it wraps returns a valid fd as long as the type exists. And to avoid an fd leak, it should ultimately be closed properly.

Safe types like OwnedFd and BorrowedFd should be preferred over RawFds, and the use of RawFds outside of implementing FFI shouldn't be necessary as libraries move to using the IO safe types and traits.
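As an illustration of the ownership model described above, the following standard-library-only sketch implements AsFd for a wrapper type. Connection and describe are hypothetical names. The example is Unix-only and uses the std::os::unix::io paths, which expose these traits from Rust 1.63.

```rust
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
use std::os::unix::net::UnixStream;

/// Hypothetical wrapper that owns a socket along with some extra data.
struct Connection {
    stream: UnixStream,
    name: String,
}

impl AsFd for Connection {
    fn as_fd(&self) -> BorrowedFd<'_> {
        // The returned borrow is tied to `&self`, so the fd cannot outlive
        // the socket that owns it.
        self.stream.as_fd()
    }
}

/// Accepts anything that can lend out a file descriptor and reports the raw
/// descriptor number without taking ownership of it.
fn describe(source: &impl AsFd) -> i32 {
    source.as_fd().as_raw_fd()
}

fn main() -> std::io::Result<()> {
    let (left, _right) = UnixStream::pair()?;
    let conn = Connection { stream: left, name: "left".to_string() };
    println!("{} uses fd {}", conn.name, describe(&conn));
    // Dropping `conn` closes the socket; no manual close is required.
    Ok(())
}
```

Because the wrapper owns the UnixStream, the descriptor is closed when the wrapper is dropped, which is the "handles allocation and disposal itself" property that makes such a type convenient to hand to an fd-based event source.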

Working with timers

Timer event sources are used to manipulate time-related actions. Those are provided under the calloop::timer module, with the Timer type at its core.

A Timer source has a simple behavior: it is programmed to wait for some duration, or until a certain point in time. Once that deadline is reached, the source generates an event.

So with use calloop::timer::Timer at the top of our .rs file, we can create a timer that will wait for 5 seconds:

use std::time::Duration;

let timer = Timer::from_duration(Duration::from_secs(5));

Adding sources to the loop

We have an event source, we have our shared data, and we know how to start our loop running. All that is left is to learn how to combine these things:

event_loop
    .handle()
    .insert_source(timer, |deadline, _: &mut (), _shared_data| {
        println!("Event fired for: {:?}", deadline);
        TimeoutAction::Drop
    })
    .expect("Failed to insert event source!");

Breaking this down, the callback we provide receives 3 arguments:

  • The first one is an Instant representing the time at which this timer was scheduled to expire. Due to how the event loop works, your callback might not be invoked at the exact time the timer expired (for example, if another callback was being processed at that moment), so the original deadline is given in case you need precise time tracking.
  • The second argument is just &mut (), as timers have no metadata to expose through the EventSource trait.
  • The third argument is the shared data passed to your event loop.

In addition your callback is expected to return a TimeoutAction, that will instruct calloop what to do next. This enum has 3 values:

  • Drop will disable the timer and destroy it, freeing the callback.
  • ToInstant will reschedule the timer to fire again at the given Instant, invoking the same callback again. This is useful if you need to create a timer that fires events at regular intervals, for example to encode key repetition in a graphical app. You would compute the next instant by adding the duration to the previous instant. It is not a problem if that instant is in the past; it will simply cause the timer to fire again immediately. This way, even if some other part of your app lags, you'll still have on average the correct number of events per second.
  • ToDuration will reschedule the callback to fire again after a given Duration. This is useful if you need to schedule some background task to execute again after some time after it was last completed, when there is no point in catching up some previous lag.
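The difference between the two rescheduling strategies comes down to which point in time the next deadline is measured from. The sketch below shows only that arithmetic, using the standard library. next_fixed_rate and next_fixed_delay are hypothetical helper names, not part of calloop.

```rust
use std::time::{Duration, Instant};

/// ToInstant-style rescheduling: the next deadline is measured from the
/// previous deadline, so lateness does not accumulate.
fn next_fixed_rate(previous_deadline: Instant, period: Duration) -> Instant {
    previous_deadline + period
}

/// ToDuration-style rescheduling: the next deadline is measured from the
/// moment the work actually finished, so lateness pushes everything back.
fn next_fixed_delay(finished_at: Instant, delay: Duration) -> Instant {
    finished_at + delay
}

fn main() {
    let period = Duration::from_millis(100);
    let deadline = Instant::now();
    // Pretend the callback only got to run 30 ms after its deadline.
    let actually_ran_at = deadline + Duration::from_millis(30);

    let rate = next_fixed_rate(deadline, period);
    let delay = next_fixed_delay(actually_ran_at, period);

    println!("fixed rate: next event {:?} after the original deadline", rate - deadline);
    println!("fixed delay: next event {:?} after the original deadline", delay - deadline);
}
```

In a timer callback, the first strategy corresponds to returning ToInstant with the callback's deadline argument plus the period, and the second to returning ToDuration with the delay.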

The whole program

Putting it all together, we have:

use std::time::Duration;

use calloop::{
    timer::{TimeoutAction, Timer},
    EventLoop,
};

fn main() {
    let mut event_loop = EventLoop::try_new().expect("Failed to initialize the event loop!");

    let timer = Timer::from_duration(Duration::from_secs(5));

    event_loop
        .handle()
        .insert_source(timer, |deadline, _: &mut (), _shared_data| {
            println!("Event fired for: {:?}", deadline);
            TimeoutAction::Drop
        })
        .expect("Failed to insert event source!");

    event_loop
        .dispatch(None, &mut ())
        .expect("Error during event loop!");
}

"Pinging" the event loop

The Ping event source has one very simple job — wake up the event loop. Use this when you know there are events for your event source to process, but those events aren't going to wake the event loop up themselves.

For example, calloop's own message channel uses Rust's native MPSC channel internally. Because there's no way for the internal message queue to wake up the event loop, it's coupled with a Ping source that wakes the loop up when there are new messages.

How to use the Ping source

The Ping has two ends — the event source part (PingSource), that goes in the event loop, and the sending end (Ping) you use to "send" the ping. To wake the event loop up, call ping() on the sending end.

Do not forget to process the events of the PingSource if you are using it as part of a larger event source! Even though the events carry no information (they are just () values), the process_events() method must be called in order to "reset" the PingSource. Otherwise the event loop will be continually woken up until you do, effectively becoming a busy-loop.
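The busy-loop warning above can be illustrated with a toy model. This is not how PingSource is implemented: ToyPing, ToyPingSource and count_wakeups are hypothetical, and a shared boolean stands in for whatever readiness mechanism the real source uses. The point is only that readiness persists until the events are processed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Toy sending end: sets a shared flag. (Hypothetical; not calloop's Ping.)
struct ToyPing {
    flag: Arc<AtomicBool>,
}

/// Toy receiving end: stays "ready" until its events are processed.
struct ToyPingSource {
    flag: Arc<AtomicBool>,
}

fn toy_ping() -> (ToyPing, ToyPingSource) {
    let flag = Arc::new(AtomicBool::new(false));
    (ToyPing { flag: Arc::clone(&flag) }, ToyPingSource { flag })
}

impl ToyPing {
    fn ping(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }
}

impl ToyPingSource {
    /// Stand-in for the readiness check performed on every loop iteration.
    fn is_ready(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }

    /// Delivers the `()` event and, crucially, clears the readiness flag.
    fn process_events(&mut self, mut callback: impl FnMut(())) {
        if self.flag.swap(false, Ordering::SeqCst) {
            callback(());
        }
    }
}

/// Runs `iterations` loop turns and counts how many of them were woken up.
/// When `process` is false the source's events are never processed.
fn count_wakeups(source: &mut ToyPingSource, iterations: usize, process: bool) -> usize {
    let mut wakeups = 0;
    for _ in 0..iterations {
        if source.is_ready() {
            wakeups += 1;
            if process {
                source.process_events(|()| {});
            }
        }
    }
    wakeups
}

fn main() {
    let (ping, mut source) = toy_ping();
    ping.ping();
    println!("without processing: {} wake-ups", count_wakeups(&mut source, 5, false));
    println!("with processing: {} wake-ups", count_wakeups(&mut source, 5, true));
}
```

With a single ping, the unprocessed source wakes the toy loop on every iteration, while the processed one wakes it exactly once.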

Channels

Unix Signals

Error handling in Calloop

Overview

Most error handling crates/guides/documentation for Rust focus on one of two situations:

  • Creating errors that an API can propagate out to a user of the API, or
  • Making your library deal nicely with the Results from closure or trait methods that it might call

Calloop has to do both of these things. It needs to provide a library user with errors that work well with ? and common error-handling idioms in their own code, and it needs to handle errors from the callbacks you give to process_events() or insert_source(). It also needs to provide some flexibility in the EventSource trait, which is used both for internal event sources and by users of the library.

Because of this, error handling in Calloop leans more towards having separate error types for different concerns. This may mean that there is some extra conversion code in places like returning results from process_events(), or in callbacks that use other libraries. However, we try to make it smoother to do these conversions, and to make sure information isn't lost in doing so.

If your crate already has some form of structured error handling, Calloop's error types should pose no problem to integrate into this. All of Calloop's errors implement std::error::Error and can be manipulated the same as any other error types.

The place where this becomes the most complex is in the process_events() method on the EventSource trait.

The Error type on the EventSource trait

The EventSource trait contains an associated type named Error, which forms part of the return type from process_events(). This type must be convertible into Box<dyn std::error::Error + Sync + Send>, which means you can use:

  • Your own error type that implements std::error::Error
  • A structured error type created with Thiserror
  • Box<dyn std::error::Error + Sync + Send>
  • A flexible string-based error type such as Anyhow's anyhow::Error

As a rule, if you implement EventSource you should try to split your errors into two different categories:

  • Errors that make sense as a kind of event. These should be a part of the Event associated type eg. as an enum or Result.
  • Errors that mean your event source simply cannot process more events. These should form the Error associated type.

For an example, take Calloop's channel type, calloop::channel::Channel. When the sending end is dropped, no more messages can be received after that point. But this is not returned as an error when calling process_events(), because you still want to (and can!) receive messages sent before that point that might still be in the queue. Hence the events received by the callback for this source can be Msg(e) or Closed.

However, if the internal ping source produces an error, there is no way for the sending end of the channel to notify the receiver. It is impossible to process more events on this event source, and the caller needs to decide how to recover from this situation. Hence this is returned as a ChannelError from process_events().

Another example might be an event source that represents a running subprocess. If the subprocess exits with a non-zero status code, or the executable can't be found, those don't mean that events can no longer be processed. They can be provided to the caller through the callback. But if the lower level sources being used to run (eg. an asynchronous executor or subprocess file descriptor) fail to work as expected, process_events() should return an error.
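The split between the two categories can be sketched with a toy channel-like source. Everything here is hypothetical (ToyEvent, ToyError, ToyChannel) and uses only the standard library. It mirrors the shape of the channel example above rather than reproducing calloop's actual types.

```rust
use std::collections::VecDeque;
use std::fmt;

/// Outcomes that are a normal part of the source's life: these go to the callback.
#[derive(Debug, PartialEq)]
enum ToyEvent {
    Msg(String),
    Closed,
}

/// Failures after which the source cannot process any more events.
#[derive(Debug, PartialEq)]
enum ToyError {
    WakeupBroken,
}

impl fmt::Display for ToyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "the wake-up mechanism failed")
    }
}

impl std::error::Error for ToyError {}

/// Hypothetical channel-like source.
struct ToyChannel {
    queue: VecDeque<String>,
    sender_dropped: bool,
    wakeup_ok: bool,
}

impl ToyChannel {
    fn process_events(&mut self, mut callback: impl FnMut(ToyEvent)) -> Result<(), ToyError> {
        if !self.wakeup_ok {
            // Nothing more can be delivered: this is the Error category.
            return Err(ToyError::WakeupBroken);
        }
        // Messages queued before the sender went away are still delivered.
        while let Some(msg) = self.queue.pop_front() {
            callback(ToyEvent::Msg(msg));
        }
        if self.sender_dropped {
            // A closed channel is reported as an event, not as an error.
            callback(ToyEvent::Closed);
        }
        Ok(())
    }
}

fn main() {
    let mut channel = ToyChannel {
        queue: VecDeque::from(vec!["queued".to_string()]),
        sender_dropped: true,
        wakeup_ok: true,
    };
    let result = channel.process_events(|event| println!("event: {:?}", event));
    println!("result: {:?}", result);

    channel.wakeup_ok = false;
    if let Err(err) = channel.process_events(|_| {}) {
        // Any std::error::Error that is Send + Sync converts into the boxed form.
        let boxed: Box<dyn std::error::Error + Send + Sync> = err.into();
        println!("error: {}", boxed);
    }
}
```

The conversion at the end of main is the property the Error associated type needs: a type that implements std::error::Error and is Send and Sync can be turned into Box<dyn std::error::Error + Sync + Send>.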

Async/await in calloop

While it is centered on event sources and callbacks, calloop also provides adapters to integrate with Rust's async ecosystem. These adapters come in two parts: a futures executor and an async adapter for IO types.

Run async code

Enable the executor feature!

To use calloop::futures you need to enable the executor feature in your Cargo.toml like so:

[dependencies.calloop]
features = [ "executor" ]
version = ...

Let's say you have some async code that looks like this:

sender.send("Hello,").await.ok();
receiver.next().await.map(|m| println!("Received: {}", m));
sender.send("world!").await.ok();
receiver.next().await.map(|m| println!("Received: {}", m));
"So long!"

...and a corresponding block that receives and sends to this one. I will call one of these blocks "friendly" and the other one "aloof".

To run async code in Calloop, you use the components in calloop::futures. First, obtain both an executor and a scheduler with calloop::futures::executor():

let (exec, sched) = calloop::futures::executor()?;

The executor, the part that executes the future, goes in the event loop:

let mut event_loop = EventLoop::try_new()?;
let handle = event_loop.handle();

handle
    .insert_source(exec, |evt, _metadata, _shared| {
        // Print the value of the async block ie. the return value.
        println!("Async block ended with: {}", evt);
    })
    .map_err(|e| e.error)?;

Now let's write our async code in full:

// Let's create two channels for our async blocks below. The blocks will
// exchange messages via these channels.
let (mut sender_friendly, mut receiver_friendly) = futures::channel::mpsc::unbounded();
let (mut sender_aloof, mut receiver_aloof) = futures::channel::mpsc::unbounded();

// Our toy async code.
let async_friendly_task = async move {
    sender_friendly.send("Hello,").await.ok();
    if let Some(msg) = receiver_aloof.next().await {
        println!("Aloof said: {}", msg);
    }
    sender_friendly.send("world!").await.ok();
    if let Some(msg) = receiver_aloof.next().await {
        println!("Aloof said: {}", msg);
    }
    "Bye!"
};

let async_aloof_task = async move {
    if let Some(msg) = receiver_friendly.next().await {
        println!("Friendly said: {}", msg);
    }
    sender_aloof.send("Oh,").await.ok();
    if let Some(msg) = receiver_friendly.next().await {
        println!("Friendly said: {}", msg);
    }
    sender_aloof.send("it's you.").await.ok();
    "Regards."
};

// Schedule the async block to be run in the event loop.
sched.schedule(async_friendly_task).unwrap();
sched.schedule(async_aloof_task).unwrap();

Like any block in Rust, the value of your async block is the last expression ie. it is effectively "returned" from the block, which means it will be provided to your executor's callback as the first argument (the "event"). You'll see this in the output with the Async block ended with: ... lines.

Finally, we run the loop. Putting everything together, the complete program is:

use calloop::EventLoop;

// futures = "0.3"
use futures::sink::SinkExt;
use futures::stream::StreamExt;

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    // Let's create two channels for our async blocks below. The blocks will
    // exchange messages via these channels.
    let (mut sender_friendly, mut receiver_friendly) = futures::channel::mpsc::unbounded();
    let (mut sender_aloof, mut receiver_aloof) = futures::channel::mpsc::unbounded();

    // Our toy async code.
    let async_friendly_task = async move {
        sender_friendly.send("Hello,").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        sender_friendly.send("world!").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        "Bye!"
    };

    let async_aloof_task = async move {
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("Oh,").await.ok();
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("it's you.").await.ok();
        "Regards."
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_friendly_task).unwrap();
    sched.schedule(async_aloof_task).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut (), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

And our output looks like:

Starting event loop. Use Ctrl-C to exit.
Friendly said: Hello,
Aloof said: Oh,
Friendly said: world!
Async block ended with: Regards.
Aloof said: it's you.
Async block ended with: Bye!
Event loop ended.

Note that for the sake of keeping this example short, I've written the async code before running the loop. But async code can be scheduled from callbacks, or other sources within the loop too.

Note about threads

One of Calloop's strengths is that it is completely single threaded as written. However, many async crates are implemented using threads eg. async-std and async-process. This is not an inherent problem! Calloop will work perfectly well with such implementations in general. However, if you have selected Calloop because of your own constraints around threading, be aware of this.

Async IO types

This section is about adapting blocking IO types for use with async Rust code, and powering that async code with Calloop. If you just want to add blocking IO types to your event loop and use Calloop's callback/composition-based design, you only need to wrap your blocking IO type in a generic event source.

You may find that you need to write ordinary Rust async code around blocking IO types. Calloop provides the ability to wrap blocking types — anything that implements the AsFd trait — in its own async type. This can be polled in any executor you may have chosen for your async code, but if you're using Calloop you'll probably be using Calloop's executor.

Enable the futures-io feature!

To use calloop::io you need to enable the futures-io feature in your Cargo.toml like so:

[dependencies.calloop]
features = [ "futures-io" ]
version = ...

Realistically you will probably want to use this with async code, so you should enable the executor feature too.

Just like in the async example, we will use the components in calloop::futures. First, obtain both an executor and a scheduler with calloop::futures::executor():

let (exec, sched) = calloop::futures::executor()?;

The executor goes in the event loop:

let mut event_loop = EventLoop::try_new()?;
let handle = event_loop.handle();

handle
    .insert_source(exec, |evt, _metadata, _shared| {
        // Print the value of the async block ie. the return value.
        println!("Async block ended with: {}", evt);
    })
    .map_err(|e| e.error)?;

For our blocking IO types, let's use an unnamed pair of Unix domain stream sockets. To convert them to async types, we simply call calloop::LoopHandle::adapt_io():

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

Note that most of the useful async functionality for the returned type is expressed through various traits in futures::io. So we need to explicitly use these:

use futures::io::{AsyncReadExt, AsyncWriteExt};

We can now write async code around these types. Here's the receiving code:

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

And here's the sending code. The receiving and sending code can be created and added to the executor in either order.

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

All that's left is to run the loop:

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

And the output we get is:

Starting event loop. Use Ctrl-C to exit.
Async block ended with: Sent data...
Async block ended with: Hello, world
^C
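
One detail worth noticing in that output: the receive buffer is 12 bytes long, while "Hello, world!" is 13 bytes, so the trailing "!" is never read. The same effect can be reproduced with ordinary blocking std types. The sketch below is Unix-only, and `send_and_receive` is a hypothetical helper written for this illustration, not part of calloop:

```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;

/// Sends `msg` over one end of an unnamed socket pair and reads exactly
/// `len` bytes from the other end, using plain blocking IO.
fn send_and_receive(msg: &[u8], len: usize) -> std::io::Result<String> {
    let (mut sender, mut receiver) = UnixStream::pair()?;
    sender.write_all(msg)?;

    // `read_exact` returns as soon as the buffer is full. Any bytes beyond
    // `len` stay queued in the socket and are simply never read here.
    let mut buf = vec![0u8; len];
    receiver.read_exact(&mut buf)?;
    Ok(String::from_utf8_lossy(&buf).into_owned())
}

fn main() -> std::io::Result<()> {
    // A 12-byte buffer, as in the async example.
    println!("{}", send_and_receive(b"Hello, world!", 12)?);
    // A 13-byte buffer picks up the whole message.
    println!("{}", send_and_receive(b"Hello, world!", 13)?);
    Ok(())
}
```

If the exclamation mark matters to you, size the buffer to the message, or use a length-prefixed framing of your own.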

A full example: ZeroMQ

The previous chapter showed how to use callbacks, event data and shared data to control our program. However, more complex programs will require more complex shared data, and more complex interactions between events. Eventually this will run up against ownership issues and just the basic mental load of the poor programmer.

In this chapter we're going to build something more complex: an event source based on ZeroMQ sockets.

ZeroMQ is (very) basically a highly abstracted socket library. You can create ZeroMQ sockets over TCP, PGM, IPC, in-process and more, and generally not worry about the transport mechanics. It guarantees atomic message transfer, and handles queuing, retries, reconnection and balancing under the hood. It also lets you integrate it with event loops and reactors by exposing a file descriptor that you can wait on.

But we can't just wrap this file descriptor in Calloop's generic::Generic source and be done — there are a few subtleties we need to take care of for it to work right and be useful.

Disclaimer

It might be tempting, at the end of this chapter, to think that the code we've written is the definitive ZeroMQ wrapper, able to address any use case or higher level pattern you like. Certainly it will be a lot more suited to Calloop than using ZeroMQ sockets by themselves, but it is not the only way to use them with Calloop. Here are some things I have not addressed, for the sake of simplicity:

  • We will not handle fairness — our code will totally monopolise the event loop if we receive many messages at once.
  • We do not consider back pressure beyond whatever built-in zsocket settings the caller might use.
  • We just drop pending messages in the zsocket's internal queue (in and out) on shutdown. In a real application, you might want to make more specific decisions about the timeout and linger periods before dropping the zsocket, depending on your application's requirements.
  • We don't deal with zsocket errors much. In fact, the overall error handling of event sources is usually highly specific to your application, so what we end up writing here is almost certainly not going to survive contact with your own code base. Here we just use ? everywhere, which will eventually cause the event loop to exit with an error.

So by all means, take the code we write here and use and adapt it, but please please note the caveats above and think carefully about your own program.

Composing event sources

Calloop is designed to work by composition. It provides you with some single-responsibility sources (timers, message channels, file descriptors), and you can combine these together, bit by bit, to make more complex event sources. These new sources can express more and more of your program's internal logic and the relationships between them, always in terms of events and how you process them.

You can greatly simplify even a highly complex program if you identify and expose the "real" events you care about and use composition to tidy the other events away in internal details of event sources.

So what do we need to compose?

The generic source

Most obviously, ZeroMQ exposes a file descriptor for us to use. (This is a common thing for event-related libraries to do, so if you're wondering how to integrate, say, I²C or GPIO on Linux with Calloop, that's your answer.)

Calloop can use file descriptors via the calloop::generic::Generic source. So that's one.

The MPSC channel source

Secondly, we might want to send messages on the socket. This means our event source needs to react when we send it a message. Calloop has a message channel for precisely this purpose: calloop::channel::Channel. That's another one.

The wakeup call

The third event source we need is a bit subtle, but since this isn't a mystery novel I can save you hours of debugging and spoil the ending now: we need a "ping" event source because ZeroMQ's FD is edge triggered.

ZeroMQ's file descriptor is not the FD of an actual file or socket — you do not actually read data from it. It exists as an interface, with three important details:

  • It is only ever readable. Even if the underlying socket can be written to, the FD that ZeroMQ gives you signals this by becoming readable. In fact, this FD will become readable under three circumstances: the ZeroMQ socket (henceforth called a "zsocket") is readable, writeable, or has an error. There is a separate function call, zmq::Socket::get_events() that will tell you which.

  • It is edge triggered. It will only ever change from not-readable to readable when the socket's state changes. So if a zsocket receives two messages, and you only read one, the file descriptor will not wake up the event loop again. Why not? Because it hasn't changed state! After you read one message, the zsocket still has events waiting. If it receives yet another message... it still has events waiting. No change in internal state = no external event.

  • This edge triggering also covers user actions. If a zsocket becomes writeable, and then you write to the zsocket, it might immediately (and atomically) change from writeable to readable. In this case you will not get another event on the FD.

(The docs make this quite explicit, but there's a lot of docs to read so I'm spelling it out here.)

What this adds up to is this: when we create our zsocket, it might already be readable or writeable. So when we add it to our event loop, it won't fire any events. Our entire source will just sit there until we wake it up by sending a message (which we might never do if it's eg. a pull socket).

So the last event source we need is something that doesn't really convey any kind of message except "please wake up the event loop on the next iteration", and that is exactly what a calloop::ping::PingSource does. And that's three.
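
To make the edge-triggering behaviour concrete, here is a toy model of it. This is not ZeroMQ or Calloop code: `EdgeFd` and its methods are invented for illustration, and the model only captures the rule described above (a wakeup happens on the transition from "nothing pending" to "something pending").

```rust
/// Toy model of an edge-triggered readiness signal (hypothetical type).
struct EdgeFd {
    pending: usize,  // messages waiting inside the "socket"
    signalled: bool, // whether the event loop would be woken
}

impl EdgeFd {
    /// Events that already exist when we start watching produce no edge.
    fn with_pending(pending: usize) -> Self {
        EdgeFd { pending, signalled: false }
    }

    /// A message arrives. Only the empty -> non-empty transition signals.
    fn deliver(&mut self) {
        if self.pending == 0 {
            self.signalled = true;
        }
        self.pending += 1;
    }

    /// Consume one message, if there is one.
    fn read_one(&mut self) {
        self.pending = self.pending.saturating_sub(1);
    }

    /// What the event loop observes: true at most once per edge.
    fn take_wakeup(&mut self) -> bool {
        std::mem::replace(&mut self.signalled, false)
    }
}

/// Two messages arrive, we read only one, then a third arrives.
fn partial_read_scenario() -> (bool, bool) {
    let mut fd = EdgeFd::with_pending(0);
    fd.deliver();
    fd.deliver();
    let first = fd.take_wakeup();
    fd.read_one();
    fd.deliver(); // something was still pending, so there is no new edge
    let second = fd.take_wakeup();
    (first, second)
}

/// The socket already has a message before we start watching it.
fn preloaded_scenario() -> bool {
    let mut fd = EdgeFd::with_pending(1);
    fd.take_wakeup()
}

fn main() {
    println!("{:?}", partial_read_scenario());
    println!("{}", preloaded_scenario());
}
```

Both scenarios end with messages pending and no wakeup on the way, which is the situation the ping source is there to rescue us from.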

Creating our source, part I: our types

In the last chapter we worked out a list of the event sources we need to compose into a new type:

  1. calloop::generic::Generic
  2. calloop::channel::Channel
  3. calloop::ping::PingSource

So at a minimum, our type needs to contain these:

pub struct ZeroMQSource
{
    // Calloop components.
    socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
    mpsc_receiver: calloop::channel::Channel<?>,
    wake_ping_receiver: calloop::ping::PingSource,
}

Note that I've left the type for the channel as ? — we'll get to that a bit later.

What else do we need? If the PingSource is there to wake up the loop manually, we need to keep the other end of it. The ping is an internal detail — users of our type don't need to know it's there. We also need the zsocket itself, so we can actually detect and process events on it. That gives us:

pub struct ZeroMQSource
{
    // Calloop components.
    socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
    mpsc_receiver: calloop::channel::Channel<?>,
    wake_ping_receiver: calloop::ping::PingSource,

    /// Sending end of the ping source.
    wake_ping_sender: calloop::ping::Ping,
}

The message type

The most obvious candidate for the type of the message queue would be zmq::Message. But ZeroMQ sockets are capable of sending multipart messages, and this is even mandatory for eg. the PUB zsocket type, where the first part of the message is the topic.

Therefore it makes more sense to accept a sequence of messages to cover the most general case, and that sequence can have a length of one for single-part messages. But with one more tweak: we can accept a sequence of things that can be transformed into zmq::Message values. The exact type we'll use will be a generic type like so:

pub struct ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    mpsc_receiver: calloop::channel::Channel<T>,
    // ...
}

Enforcing single messages

Remember that it's not just Vec<T> and other sequence types that implement IntoIterator: Option<T> implements it too! There is also std::iter::Once<T>. So if a user of our API wants to enforce that all "multi"-part messages actually contain exactly one part, they can use this API with T being, say, std::iter::Once<zmq::Message> (or even just [zmq::Message; 1] in Rust 2021 edition).
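
To see how these bounds behave without pulling in ZeroMQ, here is a small sketch. `Message` is a stand-in for zmq::Message and `collect_parts` is a hypothetical helper. Both exist only for this illustration, but the `where` clause is the same one used on ZeroMQSource<T>.

```rust
use std::iter;

/// Stand-in for `zmq::Message`, so the sketch builds without ZeroMQ.
#[derive(Debug, PartialEq)]
struct Message(Vec<u8>);

impl From<&str> for Message {
    fn from(s: &str) -> Self {
        Message(s.as_bytes().to_vec())
    }
}

/// Accepts any sequence of things convertible into `Message`, using the
/// same bounds as `ZeroMQSource<T>`.
fn collect_parts<T>(parts: T) -> Vec<Message>
where
    T: IntoIterator,
    T::Item: Into<Message>,
{
    parts.into_iter().map(Into::into).collect()
}

fn main() {
    // A genuine multipart message: topic first, then payload.
    println!("{}", collect_parts(vec!["topic", "payload"]).len());
    // `Once` always yields exactly one part.
    println!("{}", collect_parts(iter::once("only")).len());
    // So does a one-element array.
    println!("{}", collect_parts(["only"]).len());
    // `Option` yields one part for `Some`...
    println!("{}", collect_parts(Some("only")).len());
    // ...but zero parts for `None`.
    println!("{}", collect_parts(None::<&str>).len());
}
```

Note the last case: Option<T> satisfies the bounds but also admits an empty message, so Once<T> or a one-element array is the tighter choice if you want exactly one part.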

Associated types

The EventSource trait has four associated types:

  • Event - when an event is generated that our caller cares about (ie. not some internal thing), this is the data we provide to their callback. This will be another sequence of messages, but because we're constructing it we can be more opinionated about the type and use the return type of zmq::Socket::recv_multipart() which is Vec<Vec<u8>>.

  • Metadata - this is a more persistent kind of data, perhaps the underlying file descriptor or socket, or maybe some stateful object that the callback can manipulate. It is passed by exclusive reference to the Metadata type. In our case we don't use this, so it's ().

  • Ret - this is the return type of the callback that's called on an event. Usually this will be a Result of some sort; in our case it's std::io::Result<()> just to signal whether some underlying operation failed or not.

  • Error - this is the error type returned by process_events() (not the user callback!). Having this as an associated type allows you to have more control over error propagation in nested event sources. We will use Anyhow, which is like a more fully-featured Box<dyn Error>. It allows you to add context to any other error with a context() method.

So together these are:

impl<T> calloop::EventSource for ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    type Event = Vec<Vec<u8>>;
    type Metadata = ();
    type Ret = io::Result<()>;
    type Error = anyhow::Error;
    // ...
}

I have saved one surprise for later to emphasise some important principles, but for now, let's move on to defining some methods!

Creating our source, part II: setup methods

Now that we've figured out the types we need, we can get to work writing some methods. We'll need to implement the methods defined in the calloop::EventSource trait, and a constructor function to create the source.

Our constructor

Creating our source is fairly straightforward. We can let the caller set up the zsocket the way they need, and take ownership of it when it's initialised. Our caller needs not only the source itself, but the sending end of the MPSC channel so they can send messages, so we need to return that too.

A common pattern in Calloop's own constructor functions is to return a tuple containing (a) the source and (b) a type to use the source. So that's what we'll do:

// Converts a `zmq::Socket` into a `ZeroMQSource` plus the sending end of an
// MPSC channel to enqueue outgoing messages.
pub fn from_socket(socket: zmq::Socket) -> io::Result<(Self, calloop::channel::Sender<T>)> {
    let (mpsc_sender, mpsc_receiver) = calloop::channel::channel();
    let (wake_ping_sender, wake_ping_receiver) = calloop::ping::make_ping()?;

    let fd = socket.get_fd()?;

    let socket_source =
        calloop::generic::Generic::from_fd(fd, calloop::Interest::READ, calloop::Mode::Edge);

    Ok((
        Self {
            socket,
            socket_source,
            mpsc_receiver,
            wake_ping_receiver,
            wake_ping_sender,
        },
        mpsc_sender,
    ))
}
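
The "return the source plus a handle to drive it" pattern is not specific to Calloop. Here is a minimal std-only sketch of the same shape, using std::sync::mpsc in place of calloop::channel. `QueueSource` and `drain` are made-up names for this illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical source type that only owns the receiving end of a channel.
struct QueueSource<T> {
    receiver: Receiver<T>,
}

impl<T> QueueSource<T> {
    /// Returns (a) the source and (b) the handle callers use to feed it.
    fn new() -> (Self, Sender<T>) {
        let (sender, receiver) = channel();
        (QueueSource { receiver }, sender)
    }

    /// Collects everything currently queued, without blocking.
    fn drain(&self) -> Vec<T> {
        self.receiver.try_iter().collect()
    }
}

fn main() {
    let (source, sender) = QueueSource::new();
    sender.send("first").unwrap();
    sender.send("second").unwrap();
    println!("{:?}", source.drain());
}
```

Returning the sender from the constructor means the source never has to expose its internal channel, and callers cannot end up with a sender that belongs to a different source.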

Trait methods: registering sources

Calloop's event sources have a kind of life cycle, starting with registration. When you add an event source to the event loop, under the hood the source will register itself with the loop. Under certain circumstances a source will need to re-register itself. And finally there is the unregister action when an event source is removed from the loop. These are expressed via the calloop::EventSource methods:

  • fn register(&mut self, poll: &mut calloop::Poll, token_factory: &mut calloop::TokenFactory) -> calloop::Result<()>
  • fn reregister(&mut self, poll: &mut calloop::Poll, token_factory: &mut calloop::TokenFactory) -> calloop::Result<()>
  • fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()>

The first two methods take a token factory, which is a way for Calloop to keep track of why your source was woken up. When we get to actually processing events, you'll see how this works. But for now, all you need to do is recursively pass the token factory into whatever sources your own event source is composed of. This includes other composed sources, which will pass the token factory into their sources, and so on.

In practice this looks like:

fn register(
    &mut self,
    poll: &mut calloop::Poll,
    token_factory: &mut calloop::TokenFactory
) -> calloop::Result<()>
{
    self.socket_source.register(poll, token_factory)?;
    self.mpsc_receiver.register(poll, token_factory)?;
    self.wake_ping_receiver.register(poll, token_factory)?;
    self.wake_ping_sender.ping();

    Ok(())
}

fn reregister(
    &mut self,
    poll: &mut calloop::Poll,
    token_factory: &mut calloop::TokenFactory
) -> calloop::Result<()>
{
    self.socket_source.reregister(poll, token_factory)?;
    self.mpsc_receiver.reregister(poll, token_factory)?;
    self.wake_ping_receiver.reregister(poll, token_factory)?;

    self.wake_ping_sender.ping();

    Ok(())
}


fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
    self.socket_source.unregister(poll)?;
    self.mpsc_receiver.unregister(poll)?;
    self.wake_ping_receiver.unregister(poll)?;
    Ok(())
}

Note the self.wake_ping_sender.ping() call in the first two functions! This is how we manually prompt the event loop to wake up and run our source on the next iteration, to properly account for the zsocket's edge-triggering.

Our drop handler

ZeroMQ sockets have their own internal queues and state, and therefore need a bit of care when shutting down. Depending on zsocket type and settings, when the ZeroMQ context is dropped, it could block waiting for certain operations to complete. We can write a drop handler to avoid this, but again note that it's only one of many ways to handle zsocket shutdown.

impl<T> Drop for ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    fn drop(&mut self) {
        // This is one way to stop socket code (especially PUSH sockets) hanging
        // at the end of any blocking functions.
        //
        // - https://stackoverflow.com/a/38338578/188535
        // - http://api.zeromq.org/4-0:zmq-ctx-term
        self.socket.set_linger(0).ok();
        self.socket.set_rcvtimeo(0).ok();
        self.socket.set_sndtimeo(0).ok();

        // Double result because (a) possible failure on call and (b) possible
        // failure decoding.
        if let Ok(Ok(last_endpoint)) = self.socket.get_last_endpoint() {
            self.socket.disconnect(&last_endpoint).ok();
        }
    }
}

Creating our source, part III: processing events (almost)

Finally, the real functionality we care about! Processing events! This is also a method in the calloop::EventSource trait:

fn process_events<F>(
    &mut self,
    readiness: calloop::Readiness,
    token: calloop::Token,
    mut callback: F,
) -> Result<calloop::PostAction, Self::Error>
where
    F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,

What a mouthful! But when you break it down, it's not so complicated:

  • We take our own state, of course, as &mut self.

  • We take a Readiness value - this is mainly useful for "real" file descriptors, and tells you whether the event source was woken up for a read or write event. We ignore it though, because our internal sources are always only readable (remember that even if the zsocket is writeable, the FD it exposes is only ever readable).

  • We take a token. This gives us a way to process events that arise from our internal sources. In general, composed sources should not actually need to use this directly; sub-sources will check their own tokens against this and run if necessary.

  • We take a callback. We call this callback with any "real" events that our caller will care about; in our case, that means messages we receive on the zsocket. It is closely related to the EventSource trait's associated types. Note that the callback our caller supplies when adding our source to the loop actually takes an extra argument, which is some data that we won't know about in our source. Calloop's internals take care of combining our arguments here with this extra data.

  • Finally we return a PostAction, which tells the loop whether it needs to change the state of our event source, perhaps as a result of actions we took. For example, you might require that your source be removed from the loop (with PostAction::Remove) if it only has a certain thing to do. Ordinarily though, you'd return PostAction::Continue for your source to keep waiting for events.

Note that these PostAction values correspond to various methods on the LoopHandle type (eg. PostAction::Disable does the same as LoopHandle::disable()). Whether you control your event source by returning a PostAction or using the LoopHandle methods depends on whether it makes more sense for these actions to be taken from within your event source or by something else in your code.
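
As a rough picture of how a loop might act on that return value, here is a toy dispatcher. The `PostAction` enum below is a local stand-in with just the two variants discussed above, and `run` is a hypothetical function; Calloop's real loop is considerably more involved.

```rust
/// Local stand-in for the idea of a post-action (not Calloop's type).
#[derive(Debug, PartialEq, Clone, Copy)]
enum PostAction {
    Continue,
    Remove,
}

/// Calls `source` once per simulated wakeup, and stops calling it as soon
/// as it asks to be removed. Returns how many wakeups were handled.
fn run<F>(wakeups: u32, mut source: F) -> u32
where
    F: FnMut(u32) -> PostAction,
{
    let mut handled = 0;
    for n in 0..wakeups {
        handled += 1;
        if source(n) == PostAction::Remove {
            break;
        }
    }
    handled
}

fn main() {
    // A source that has "a certain thing to do": three events, then done.
    let handled = run(10, |n| {
        if n == 2 {
            PostAction::Remove
        } else {
            PostAction::Continue
        }
    });
    println!("{}", handled);
}
```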

Implementing process_events() for a type that contains various Calloop sources composed together, like we have, is done recursively by calling our internal sources' process_events() method. The token that Calloop gives us is how each event source determines whether it was responsible for the wakeup and has events to process.

If we were woken up because of the ping source, then the ping source's process_events() will see that the token matches its own, and call the callback (possibly multiple times). If we were woken up because a message was sent through the MPSC channel, then the channel's process_events() will match on the token instead and call the callback for every message waiting. The zsocket is a little different, and we'll go over that in detail.
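
The token mechanism can be modelled in a few lines. Everything in the sketch below (`Token`, `TokenFactory`, `Leaf`, `Composite`) is a simplified stand-in written for this illustration, not Calloop's actual types. It shows only the two ideas above: registration hands each sub-source its own token, and processing passes the wakeup token down so each sub-source can decide whether it is responsible.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
struct Token(usize);

/// Hands out a fresh token on every call.
struct TokenFactory {
    next: usize,
}

impl TokenFactory {
    fn token(&mut self) -> Token {
        let t = Token(self.next);
        self.next += 1;
        t
    }
}

/// A leaf source: remembers its token and only acts when it matches.
struct Leaf {
    name: &'static str,
    token: Option<Token>,
}

impl Leaf {
    fn new(name: &'static str) -> Self {
        Leaf { name, token: None }
    }

    fn register(&mut self, factory: &mut TokenFactory) {
        self.token = Some(factory.token());
    }

    fn process_events(&self, token: Token, fired: &mut Vec<&'static str>) {
        if self.token == Some(token) {
            fired.push(self.name);
        }
    }
}

/// A composed source: forwards the factory and the token to its parts.
struct Composite {
    socket: Leaf,
    channel: Leaf,
    ping: Leaf,
}

impl Composite {
    fn new() -> Self {
        Composite {
            socket: Leaf::new("socket"),
            channel: Leaf::new("channel"),
            ping: Leaf::new("ping"),
        }
    }

    fn register(&mut self, factory: &mut TokenFactory) {
        self.socket.register(factory);
        self.channel.register(factory);
        self.ping.register(factory);
    }

    /// Returns the names of the sub-sources that claimed the wakeup.
    fn process_events(&self, token: Token) -> Vec<&'static str> {
        let mut fired = Vec::new();
        self.ping.process_events(token, &mut fired);
        self.channel.process_events(token, &mut fired);
        self.socket.process_events(token, &mut fired);
        fired
    }
}

fn main() {
    let mut factory = TokenFactory { next: 0 };
    let mut source = Composite::new();
    source.register(&mut factory);

    // Pretend the loop woke us up with the channel's token.
    let woken_by = source.channel.token.unwrap();
    println!("{:?}", source.process_events(woken_by));
}
```

The composed source never inspects the token itself. It just offers it to every part in turn, and at most one of them claims it.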

For error handling we're using Anyhow, hence the context() calls on each fallible operation. These just add a message to any error that might appear in a traceback.

So a first draft of our code might look like:

fn process_events<F>(
    &mut self,
    readiness: calloop::Readiness,
    token: calloop::Token,
    mut callback: F,
) -> Result<calloop::PostAction, Self::Error>
where
    F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
    // Runs if we were woken up on startup/registration.
    self.wake_ping_receiver
        .process_events(readiness, token, |_, _| {})
        .context("Failed after registration")?;

    // Runs if we received a message over the MPSC channel.
    self.mpsc_receiver
        .process_events(readiness, token, |evt, _| {
            // 'evt' could be a message or a "sending end closed"
            // notification. We don't care about the latter.
            if let calloop::channel::Event::Msg(msg) = evt {
                self.socket
                    .send_multipart(msg, 0)
                    .context("Failed to send message")?;
            }
        })?;

    // Runs if the zsocket became read/write-able.
    self.socket
        .process_events(readiness, token, |_, _| {
            let events =
                self.socket
                    .get_events()
                    .context("Failed to read ZeroMQ events")?;
        
            if events.contains(zmq::POLLOUT) {
                // Wait, what do we do here?
            }

            if events.contains(zmq::POLLIN) {
                let messages =
                    self.socket
                        .recv_multipart(0)
                        .context("Failed to receive message")?;

                callback(messages, &mut ())
                    .context("Error in event callback")?;
            }
        })?;

    Ok(calloop::PostAction::Continue)
}

We process the events from whichever source woke up our composed source, and if we woke up because the zsocket became readable, we call the callback with the message we received. Finally we return PostAction::Continue to remain in the event loop.

Don't worry about getting this to compile: it is a good start, but it's wrong in a few ways.

Firstly, we've gone to all the trouble of using a ping to wake up the source, and then we just... drain its internal events and return. Which achieves nothing.

Secondly, we don't seem to know what to do when our zsocket becomes writeable (the actual zsocket, not the "interface" file descriptor).

Thirdly, we commit one of the worst sins you can commit in an event-loop-based system. Can you see it? It's this part:

self.mpsc_receiver
    .process_events(readiness, token, |evt, _| {
        if let calloop::channel::Event::Msg(msg) = evt {
            // This call can block the whole event loop.
            self.socket
                .send_multipart(msg, 0)
                .context("Failed to send message")?;
        }
    })?;

We block the event loop! In the middle of processing events from the MPSC channel, we call zmq::Socket::send_multipart() which could, under certain circumstances, block! We shouldn't do that.

Let's deal with this badness first then. We want to decouple "receiving messages over the MPSC channel" from "sending messages on the zsocket". There are different ways to do this, but they boil down to: buffer messages or drop messages (or maybe a combination of both). We'll use the first approach, with an internal FIFO queue. When we receive messages, we push them onto the back of the queue. When the zsocket is writeable, we pop messages from the front of the queue.

The standard library has collections::VecDeque<T> which provides efficient double-ended queuing, so let's use that. This is some extra internal state, so we need to add it to our type, which becomes:

pub struct ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    // Calloop components.
    socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
    mpsc_receiver: calloop::channel::Channel<T>,
    wake_ping_receiver: calloop::ping::PingSource,

    /// Sending end of the ping source.
    wake_ping_sender: calloop::ping::Ping,

    /// FIFO queue for the messages to be published.
    outbox: std::collections::VecDeque<T>,
}

Our MPSC receiving code becomes:

let outbox = &mut self.outbox;

self.mpsc_receiver
    .process_events(readiness, token, |evt, _| {
        if let calloop::channel::Event::Msg(msg) = evt {
            outbox.push_back(msg);
        }
    })?;

And our "zsocket is writeable" code becomes:

self.socket
    .process_events(readiness, token, |_, _| {
        // Ask ZeroMQ which events are actually pending on the zsocket.
        let events = self
            .socket
            .get_events()
            .context("Failed to read ZeroMQ events")?;

        if events.contains(zmq::POLLOUT) {
            // Writeable: send the oldest buffered message, if there is one.
            if let Some(parts) = self.outbox.pop_front() {
                self.socket
                    .send_multipart(parts, 0)
                    .context("Failed to send message")?;
            }
        }

        if events.contains(zmq::POLLIN) {
            // Readable: receive a message and hand it to the caller.
            let messages =
                self.socket
                    .recv_multipart(0)
                    .context("Failed to receive message")?;
            callback(messages, &mut ())
                .context("Error in event callback")?;
        }
    })?;

So we've not only solved problem #3, we've also figured out #2, which suggests we're on the right track. But we still have (at least) that first issue to sort out.

Creating our source, part IV: processing events (really)

We have three events that could wake up our event source: the ping, the channel, and the zsocket itself becoming ready to use. All three of these reasons potentially mean doing something on the zsocket: if the ping fired, we need to check for any pending events. If the channel received a message, we want to check if the zsocket is already writeable and send it. If the zsocket becomes readable or writeable, we want to read from or write to it. In other words... We want to run it every time!

Also notice that in the zsocket process_events() call, we don't use any of the arguments, including the event itself. That file descriptor is merely a signalling mechanism! Sending and receiving messages is what will actually clear any pending events on it, and reset it to a state where it will wake the event loop later. So we can drop that call and run the body of its callback unconditionally:

let events = self
    .socket
    .file
    .get_events()
    .context("Failed to read ZeroMQ events")?;

if events.contains(zmq::POLLOUT) {
    if let Some(parts) = self.outbox.pop_front() {
        self.socket
            .file
            .send_multipart(parts, 0)
            .context("Failed to send message")?;
    }
}

if events.contains(zmq::POLLIN) {
    let messages =
        self.socket
            .file
            .recv_multipart(0)
            .context("Failed to receive message")?;
    callback(messages, &mut ())
        .context("Error in event callback")?;
}

So the second draft of our process_events() function is now:

fn process_events<F>(
    &mut self,
    readiness: calloop::Readiness,
    token: calloop::Token,
    mut callback: F,
) -> Result<calloop::PostAction, Self::Error>
where
    F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
    // Runs if we were woken up on startup/registration.
    self.wake_ping_receiver
        .process_events(readiness, token, |_, _| {})?;

    // Runs if we were woken up because a message was sent on the channel.
    let outbox = &mut self.outbox;

    self.mpsc_receiver
        .process_events(readiness, token, |evt, _| {
            if let calloop::channel::Event::Msg(msg) = evt {
                outbox.push_back(msg);
            }
        })?;

    // Always process any pending zsocket events.

    let events = self
        .socket
        .file
        .get_events()
        .context("Failed to read ZeroMQ events")?;

    if events.contains(zmq::POLLOUT) {
        if let Some(parts) = self.outbox.pop_front() {
            self.socket
                .file
                .send_multipart(parts, 0)
                .context("Failed to send message")?;
        }
    }

    if events.contains(zmq::POLLIN) {
        let messages =
            self.socket
                .file
                .recv_multipart(0)
                .context("Failed to receive message")?;
        callback(messages, &mut ())
            .context("Error in event callback")?;
    }

    Ok(calloop::PostAction::Continue)
}

There is one more issue to take care of, and it's got nothing to do with Calloop. We still haven't fully dealt with ZeroMQ's edge-triggered nature.

Consider this situation:

  • We create a REQ zsocket. These are intended to be used in strict send/receive/send/receive/etc. sequence.
  • We wrap it in our ZeroMQSource and add that to our loop.
  • We send a message.

If we do this, it's possible we'll never actually receive any replies that are sent to our zsocket! Why? Because:

  • we read the events on the socket into events
  • then we send a message on the socket
  • another process sends a reply so quickly, it arrives more or less immediately
  • then we use the same events to check if the socket is readable
  • then we exit

The zsocket will change from writeable to readable before we leave process_events(). So the "interface" file descriptor will become readable again. But because it is edge triggered, it will not wake up our event source after we leave process_events(). So our source will not wake up again (at least, not due to the self.socket event source).

For this specific example, it will suffice to re-read the zsocket events in between the if statements. Then when we get to the second events check, it will indeed contain zmq::POLLIN and receive the pending message. But this is not good enough for the general case! If we replace REQ with REP above, we'll get the opposite problem: our first check (for POLLOUT) will be false. Our second check (POLLIN) will be true. We'll receive a message, leave process_events(), and never wake up again.
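
Here's a toy model of the REQ scenario, in case it's easier to follow as code. It is not ZeroMQ: `MockReq` and `Events` are hypothetical stand-ins, and the "peer" is assumed to reply the instant we send. It compares checking one stale snapshot of the events against re-reading them between the two checks.

```rust
/// Hypothetical snapshot of a socket's pending events.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Events {
    readable: bool,
    writeable: bool,
}

/// Hypothetical stand-in for a REQ zsocket whose peer replies instantly.
struct MockReq {
    reply_pending: bool,
}

impl MockReq {
    fn new() -> Self {
        Self { reply_pending: false }
    }

    /// A REQ socket alternates: writeable until we send, readable until we
    /// receive the reply.
    fn events(&self) -> Events {
        Events {
            readable: self.reply_pending,
            writeable: !self.reply_pending,
        }
    }

    fn send(&mut self) {
        // The reply arrives "more or less immediately".
        self.reply_pending = true;
    }

    fn recv(&mut self) {
        self.reply_pending = false;
    }
}

/// Uses one snapshot of the events for both checks. Returns true if a reply
/// was received.
fn single_snapshot(sock: &mut MockReq) -> bool {
    let events = sock.events();
    if events.writeable {
        sock.send();
    }
    if events.readable {
        sock.recv();
        return true;
    }
    false
}

/// Re-reads the events between the two checks.
fn reread_between(sock: &mut MockReq) -> bool {
    if sock.events().writeable {
        sock.send();
    }
    if sock.events().readable {
        sock.recv();
        return true;
    }
    false
}

fn main() {
    let mut stale = MockReq::new();
    let got_reply = single_snapshot(&mut stale);
    // The reply is sitting on the socket, but we never looked for it.
    println!("single snapshot: got_reply={}, still readable={}",
             got_reply, stale.events().readable);

    let mut fresh = MockReq::new();
    println!("re-read between checks: got_reply={}", reread_between(&mut fresh));
}
```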

The full solution is to recognise that any user action on a ZeroMQ socket can cause the pending events to change, or just to remain active, without re-triggering the "interface" file descriptor. So we need to (a) read and act on the zsocket events repeatedly and (b) keep track of when we have or haven't performed an action on the zsocket. Here's one way to do it:

loop {
    let events = self
        .socket
        .file
        .get_events()
        .context("Failed to read ZeroMQ events")?;

    let mut used_socket = false;

    if events.contains(zmq::POLLOUT) {
        if let Some(parts) = self.outbox.pop_front() {
            self.socket
                .file
                .send_multipart(parts, 0)
                .context("Failed to send message")?;
            used_socket = true;
        }
    }

    if events.contains(zmq::POLLIN) {
        let messages =
            self.socket
                .file
                .recv_multipart(0)
                .context("Failed to receive message")?;
        used_socket = true;

        callback(messages, &mut ())
            .context("Error in event callback")?;
    }

    if !used_socket {
        break;
    }
}

Now we have a flag that we set if, and only if, we call a send or receive method on the zsocket. If that flag is set at the end of the loop, we go around again.
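
To see the loop terminate on its own, here's a small simulation with a pretend socket. Everything in it is an assumption made for illustration: `MockSocket` is always writeable, and its imaginary peer echoes every message straight back. The `pump()` function has the same shape as the loop above and returns how many passes it made.

```rust
use std::collections::VecDeque;

/// Hypothetical socket: always writeable, and the peer echoes every message
/// back immediately.
struct MockSocket {
    incoming: VecDeque<String>,
}

impl MockSocket {
    fn new() -> Self {
        Self { incoming: VecDeque::new() }
    }

    /// Returns (readable, writeable).
    fn events(&self) -> (bool, bool) {
        (!self.incoming.is_empty(), true)
    }

    fn send(&mut self, msg: String) {
        self.incoming.push_back(format!("echo: {}", msg));
    }

    fn recv(&mut self) -> String {
        self.incoming.pop_front().expect("recv() called while not readable")
    }
}

/// Same structure as the loop above: keep going until a pass neither sends
/// nor receives. Returns the number of passes made.
fn pump(
    sock: &mut MockSocket,
    outbox: &mut VecDeque<String>,
    received: &mut Vec<String>,
) -> usize {
    let mut passes = 0;
    loop {
        passes += 1;
        let (readable, writeable) = sock.events();
        let mut used_socket = false;

        if writeable {
            if let Some(msg) = outbox.pop_front() {
                sock.send(msg);
                used_socket = true;
            }
        }

        if readable {
            received.push(sock.recv());
            used_socket = true;
        }

        if !used_socket {
            break;
        }
    }
    passes
}

fn main() {
    let mut sock = MockSocket::new();
    let mut outbox = VecDeque::from(vec!["a".to_string(), "b".to_string()]);
    let mut received = Vec::new();

    let passes = pump(&mut sock, &mut outbox, &mut received);
    println!("{} passes, received {:?}", passes, received);
}
```

Note that the final pass does no work at all. That "wasted" pass is what tells us the socket has nothing left for us, so it's safe to go back to waiting on the file descriptor.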

Greediness

Remember my disclaimer at the start of the chapter, about this code being "greedy"? This is what I mean. This loop will run until the entire message queue is empty, so if it has a lot of messages in it, any other sources in our event loop will not be run until this loop is finished.

An alternative approach is to use more state to determine whether we want to run again on the next loop iteration (perhaps using the ping source), so that Calloop can run any other sources in between individual messages being received.
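
One possible shape for that alternative is a per-wake-up budget. This is only a sketch under my own assumptions, not something the code in this chapter does: `send_with_budget()` is a made-up helper, and the `send` closure stands in for the real zsocket call. It handles at most `budget` messages and reports whether any are left over, which is the point where a real source might ping itself to get scheduled again.

```rust
use std::collections::VecDeque;

/// Sends at most `budget` messages from `outbox` using `send`. Returns true
/// if messages remain, meaning the caller should arrange another wake-up.
fn send_with_budget<T>(
    outbox: &mut VecDeque<T>,
    budget: usize,
    mut send: impl FnMut(T),
) -> bool {
    for _ in 0..budget {
        match outbox.pop_front() {
            Some(msg) => send(msg),
            None => break,
        }
    }
    !outbox.is_empty()
}

fn main() {
    let mut outbox: VecDeque<u32> = (1..=5).collect();
    let mut sent = Vec::new();
    let mut wakeups = 0;

    // Each iteration stands in for one trip around the event loop, during
    // which other sources would get their turn.
    loop {
        wakeups += 1;
        let more = send_with_budget(&mut outbox, 2, |msg| sent.push(msg));
        // Assumption: a real source would request another wake-up here (for
        // example via its ping sender) instead of looping directly.
        if !more {
            break;
        }
    }

    println!("sent {:?} over {} wake-ups", sent, wakeups);
}
```

The trade-off is latency for fairness: a long queue takes more trips around the loop to drain, but no single source can starve the others.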

The full ZeroMQ event source code

This is the full source code for a Calloop event source based on a ZeroMQ socket. You might find it useful as a kind of reference. Please read the disclaimer at the start of this chapter if you skipped straight here!

//! A Calloop event source implementation for ZeroMQ sockets.

use std::{collections, io};

use anyhow::Context;

/// A Calloop event source that contains a ZeroMQ socket (of any kind) and a
/// Calloop MPSC channel for sending over it.
///
/// The basic interface is:
/// - create a zmq::Socket for your ZeroMQ socket
/// - use `ZeroMQSource::from_socket()` to turn it into a Calloop event source
///   (plus the sending end of the channel)
/// - queue messages to be sent by sending them on the sending end of the MPSC
///   channel
/// - add the event source to the Calloop event loop with a callback to handle
///   reading
/// - the sending end of the MPSC channel can be cloned and sent across threads
///   if necessary
///
/// This type is parameterised by `T`:
///
///     T where T: IntoIterator, T::Item: Into<zmq::Message>
///
/// This means that `T` is anything that can be converted to an iterator, and
/// the items in the iterator are anything that can be converted to a
/// `zmq::Message`. So eg. a `Vec<String>` would work.
///
/// The callback is called whenever the underlying socket becomes readable. It
/// is called with a vec of byte sequences (`Vec<Vec<u8>>`) and the event loop
/// data set by the user.
///
/// Note about why the read data is a vec of multipart message parts: we don't
/// know what kind of socket this is, or what will be sent, so the most general
/// thing we can do is receive the entirety of a multipart message and call the
/// user callback with the whole set. Usually the number of parts in a multipart
/// message will be one, but the code will work just the same when it's not.
///
/// This event source also allows you to use different event sources to publish
/// messages over the same writeable ZeroMQ socket (usually PUB or PUSH).
/// Messages should be sent over the Calloop MPSC channel sending end. This end
/// can be cloned and used by multiple senders.

pub struct ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    // Calloop components.
    /// Event source for ZeroMQ socket.
    socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,

    /// Event source for channel.
    mpsc_receiver: calloop::channel::Channel<T>,

    /// Because the ZeroMQ socket is edge triggered, we need a way to "wake" the
    /// event source upon (re-)registration. We do this with a separate
    /// `calloop::ping::Ping` source.
    wake_ping_receiver: calloop::ping::PingSource,

    /// Sending end of the ping source.
    wake_ping_sender: calloop::ping::Ping,

    // Outgoing message state.
    /// FIFO queue for the messages to be published.
    outbox: collections::VecDeque<T>,
}

impl<T> ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    /// Converts a `zmq::Socket` into a `ZeroMQSource` plus the sending end of an
    /// MPSC channel to enqueue outgoing messages.
    pub fn from_socket(socket: zmq::Socket) -> io::Result<(Self, calloop::channel::Sender<T>)> {
        let (mpsc_sender, mpsc_receiver) = calloop::channel::channel();
        let (wake_ping_sender, wake_ping_receiver) = calloop::ping::make_ping()?;

        let socket = calloop::generic::Generic::new(
            unsafe { calloop::generic::FdWrapper::new(socket) },
            calloop::Interest::READ,
            calloop::Mode::Edge,
        );

        Ok((
            Self {
                socket,
                mpsc_receiver,
                wake_ping_receiver,
                wake_ping_sender,
                outbox: collections::VecDeque::new(),
            },
            mpsc_sender,
        ))
    }
}

/// This event source runs for three events:
///
/// 1. The event source was registered. It is forced to run so that any pending
///    events on the socket are processed.
///
/// 2. A message was sent over the MPSC channel. In this case we put it in the
///    internal queue.
///
/// 3. The ZeroMQ socket is readable. For this, we read off a complete multipart
///    message and call the user callback with it.
///
/// The callback provided to `process_events()` may be called multiple times
/// within a single call to `process_events()`.
impl<T> calloop::EventSource for ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    type Event = Vec<Vec<u8>>;
    type Metadata = ();
    type Ret = io::Result<()>;
    type Error = anyhow::Error;

    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        mut callback: F,
    ) -> Result<calloop::PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Runs if we were woken up on startup/registration.
        self.wake_ping_receiver
            .process_events(readiness, token, |_, _| {})
            .context("Failed after registration")?;

        // Runs if we were woken up because a message was sent on the channel.
        let outbox = &mut self.outbox;

        self.mpsc_receiver
            .process_events(readiness, token, |evt, _| {
                if let calloop::channel::Event::Msg(msg) = evt {
                    outbox.push_back(msg);
                }
            })
            .context("Failed to process outgoing messages")?;

        // The ZeroMQ file descriptor is edge triggered. This means that if (a)
        // messages are added to the queue before registration, or (b) the
        // socket became writeable before messages were enqueued, we will need
        // to run the loop below. Hence, it always runs if this event source
        // fires. Calling process_events() on the socket source itself would
        // tell us nothing more, so we skip it.

        loop {
            // According to the docs, the edge-triggered FD will not change
            // state if a socket goes directly from being readable to being
            // writeable (or vice-versa) without there being an in-between point
            // where there are no events. This can happen as a result of sending
            // or receiving on the socket while processing such an event. The
            // "used_socket" flag below tracks whether we perform an operation
            // on the socket that warrants reading the events again.
            let events = self
                .socket
                .file
                .get_events()
                .context("Failed to read ZeroMQ events")?;

            let mut used_socket = false;

            if events.contains(zmq::POLLOUT) {
                if let Some(parts) = self.outbox.pop_front() {
                    self.socket
                        .file
                        .send_multipart(parts, 0)
                        .context("Failed to send message")?;
                    used_socket = true;
                }
            }

            if events.contains(zmq::POLLIN) {
                // Batch up multipart messages. ZeroMQ guarantees atomic message
                // sending, which includes all parts of a multipart message.
                let messages = self
                    .socket
                    .file
                    .recv_multipart(0)
                    .context("Failed to receive message")?;
                used_socket = true;

                // Add context to any error returned by the callback and
                // propagate it to the caller.
                callback(messages, &mut ()).context("Error in event callback")?;
            }

            if !used_socket {
                break;
            }
        }

        Ok(calloop::PostAction::Continue)
    }

    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        self.socket.register(poll, token_factory)?;
        self.mpsc_receiver.register(poll, token_factory)?;
        self.wake_ping_receiver.register(poll, token_factory)?;

        self.wake_ping_sender.ping();

        Ok(())
    }

    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        self.socket.reregister(poll, token_factory)?;
        self.mpsc_receiver.reregister(poll, token_factory)?;
        self.wake_ping_receiver.reregister(poll, token_factory)?;

        self.wake_ping_sender.ping();

        Ok(())
    }

    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
        self.socket.unregister(poll)?;
        self.mpsc_receiver.unregister(poll)?;
        self.wake_ping_receiver.unregister(poll)?;
        Ok(())
    }
}

impl<T> Drop for ZeroMQSource<T>
where
    T: IntoIterator,
    T::Item: Into<zmq::Message>,
{
    fn drop(&mut self) {
        // This is one way to stop socket code (especially PUSH sockets) hanging
        // at the end of any blocking functions.
        //
        // - https://stackoverflow.com/a/38338578/188535
        // - http://api.zeromq.org/4-0:zmq-ctx-term
        self.socket.file.set_linger(0).ok();
        self.socket.file.set_rcvtimeo(0).ok();
        self.socket.file.set_sndtimeo(0).ok();

        // Double result because (a) possible failure on call and (b) possible
        // failure decoding.
        if let Ok(Ok(last_endpoint)) = self.socket.file.get_last_endpoint() {
            self.socket.file.disconnect(&last_endpoint).ok();
        }
    }
}

pub fn main() {}

Dependencies are:

  • calloop (whatever version this document was built from)
  • zmq 0.9
  • anyhow 1.0

[dependencies]
calloop = { path = '../..' }
zmq = "0.9"
anyhow = "1.0"