Async IO types

This section is about adapting blocking IO types for use with async Rust code, and powering that async code with Calloop. If you just want to add blocking IO types to your event loop and use Calloop's callback/composition-based design, you only need to wrap your blocking IO type in a generic event source.

You may find that you need to write ordinary Rust async code around blocking IO types. Calloop provides the ability to wrap blocking types — anything that implements the AsFd trait — in its own async type. This can be polled in any executor you may have chosen for your async code, but if you're using Calloop you'll probably be using Calloop's executor.

Enable the futures-io feature!

To use calloop::io you need to enable the futures-io feature in your Cargo.toml like so:

[dependencies.calloop]
features = [ "futures-io" ]
version = ...

Realistically you will probably also want to use this with async code, so you should also enable the executor feature too.

Just like in the async example, we will use the components in calloop::futures. First, obtain both an executor and a scheduler with calloop::futures::executor():

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

The executor goes in the event loop:

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

For our blocking IO types, let's use an unnamed pair of Unix domain stream sockets. To convert them to async types, we simply call calloop::LoopHandle::adapt_io():

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

Note that most of the useful async functionality for the returned type is expressed through various traits in futures::io. So we need to explicitly use these:

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

We can now write async code around these types. Here's the receiving code:

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

And here's the sending code. The receiving and sending code can be created and added to the executor in either order.

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

All that's left is to run the loop:

use calloop::EventLoop;

use futures::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> std::io::Result<()> {
    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    handle
        .insert_source(exec, |evt, _metadata, _shared| {
            // Print the value of the async block ie. the return value.
            println!("Async block ended with: {}", evt);
        })
        .map_err(|e| e.error)?;

    let (sender, receiver) = std::os::unix::net::UnixStream::pair().unwrap();
    let mut sender = handle.adapt_io(sender).unwrap();
    let mut receiver = handle.adapt_io(receiver).unwrap();

    let async_receive = async move {
        let mut buf = [0u8; 12];
        // Here's our async-ified Unix domain socket.
        receiver.read_exact(&mut buf).await.unwrap();
        std::str::from_utf8(&buf).unwrap().to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_receive).unwrap();

    let async_send = async move {
        // Here's our async-ified Unix domain socket.
        sender.write_all(b"Hello, world!").await.unwrap();
        "Sent data...".to_owned()
    };

    // Schedule the async block to be run in the event loop.
    sched.schedule(async_send).unwrap();

    // Run the event loop.
    println!("Starting event loop. Use Ctrl-C to exit.");
    event_loop.run(None, &mut event_loop.get_signal(), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

And the output we get is:

Starting event loop. Use Ctrl-C to exit.
Async block ended with: Sent data...
Async block ended with: Hello, world
^C