Run async code

Enable the executor feature!

To use calloop::futures you need to enable the executor feature in your Cargo.toml like so:

[dependencies.calloop]
features = [ "executor" ]
version = ...

Let's say you have some async code that looks like this:

sender.send("Hello,").await.ok();
receiver.next().await.map(|m| println!("Received: {}", m));
sender.send("world!").await.ok();
receiver.next().await.map(|m| println!("Received: {}", m));
"So long!"

...and a corresponding block that receives from and sends to this one. I will call one of these blocks "friendly" and the other one "aloof".

To run async code in Calloop, you use the components in calloop::futures. First, obtain both an executor and a scheduler with calloop::futures::executor():

use calloop::EventLoop;

// futures = "0.3"
use futures::sink::SinkExt;
use futures::stream::StreamExt;

/// Runs two cooperating async blocks ("friendly" and "aloof") on a Calloop
/// event loop and returns once both of them have finished.
///
/// # Errors
///
/// Returns an error if the executor or the event loop cannot be created, if
/// the executor cannot be inserted into the loop, or if running the loop fails.
///
/// # Panics
///
/// Panics if either async block cannot be scheduled.
fn main() -> std::io::Result<()> {
    // Number of async blocks scheduled below. The loop is stopped once this
    // many blocks have completed.
    const TASK_COUNT: usize = 2;

    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    // `run()` is called without a timeout, so it only returns once the loop
    // is told to stop. Without this signal the final "Event loop ended."
    // message would never be printed.
    let signal = event_loop.get_signal();
    let mut finished = 0;

    handle
        .insert_source(exec, move |evt, _metadata, _shared| {
            // Print the value of the async block, i.e. the return value.
            println!("Async block ended with: {}", evt);

            // Stop the loop once every scheduled block has finished.
            finished += 1;
            if finished == TASK_COUNT {
                signal.stop();
            }
        })
        .map_err(|e| e.error)?;

    // Let's create two channels for our async blocks below. The blocks will
    // exchange messages via these channels.
    let (mut sender_friendly, mut receiver_friendly) = futures::channel::mpsc::unbounded();
    let (mut sender_aloof, mut receiver_aloof) = futures::channel::mpsc::unbounded();

    // Our toy async code.
    let async_friendly_task = async move {
        sender_friendly.send("Hello,").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        sender_friendly.send("world!").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        "Bye!"
    };

    let async_aloof_task = async move {
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("Oh,").await.ok();
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("it's you.").await.ok();
        "Regards."
    };

    // Schedule both async blocks to be run in the event loop.
    sched.schedule(async_friendly_task).unwrap();
    sched.schedule(async_aloof_task).unwrap();

    // Run the event loop. It returns after the callback above has seen both
    // blocks finish and has signalled the loop to stop.
    println!("Starting event loop.");
    event_loop.run(None, &mut (), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

The executor, the part that executes the future, goes in the event loop:

use calloop::EventLoop;

// futures = "0.3"
use futures::sink::SinkExt;
use futures::stream::StreamExt;

/// Runs two cooperating async blocks ("friendly" and "aloof") on a Calloop
/// event loop and returns once both of them have finished.
///
/// # Errors
///
/// Returns an error if the executor or the event loop cannot be created, if
/// the executor cannot be inserted into the loop, or if running the loop fails.
///
/// # Panics
///
/// Panics if either async block cannot be scheduled.
fn main() -> std::io::Result<()> {
    // Number of async blocks scheduled below. The loop is stopped once this
    // many blocks have completed.
    const TASK_COUNT: usize = 2;

    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    // `run()` is called without a timeout, so it only returns once the loop
    // is told to stop. Without this signal the final "Event loop ended."
    // message would never be printed.
    let signal = event_loop.get_signal();
    let mut finished = 0;

    handle
        .insert_source(exec, move |evt, _metadata, _shared| {
            // Print the value of the async block, i.e. the return value.
            println!("Async block ended with: {}", evt);

            // Stop the loop once every scheduled block has finished.
            finished += 1;
            if finished == TASK_COUNT {
                signal.stop();
            }
        })
        .map_err(|e| e.error)?;

    // Let's create two channels for our async blocks below. The blocks will
    // exchange messages via these channels.
    let (mut sender_friendly, mut receiver_friendly) = futures::channel::mpsc::unbounded();
    let (mut sender_aloof, mut receiver_aloof) = futures::channel::mpsc::unbounded();

    // Our toy async code.
    let async_friendly_task = async move {
        sender_friendly.send("Hello,").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        sender_friendly.send("world!").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        "Bye!"
    };

    let async_aloof_task = async move {
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("Oh,").await.ok();
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("it's you.").await.ok();
        "Regards."
    };

    // Schedule both async blocks to be run in the event loop.
    sched.schedule(async_friendly_task).unwrap();
    sched.schedule(async_aloof_task).unwrap();

    // Run the event loop. It returns after the callback above has seen both
    // blocks finish and has signalled the loop to stop.
    println!("Starting event loop.");
    event_loop.run(None, &mut (), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

Now let's write our async code in full:

use calloop::EventLoop;

// futures = "0.3"
use futures::sink::SinkExt;
use futures::stream::StreamExt;

/// Runs two cooperating async blocks ("friendly" and "aloof") on a Calloop
/// event loop and returns once both of them have finished.
///
/// # Errors
///
/// Returns an error if the executor or the event loop cannot be created, if
/// the executor cannot be inserted into the loop, or if running the loop fails.
///
/// # Panics
///
/// Panics if either async block cannot be scheduled.
fn main() -> std::io::Result<()> {
    // Number of async blocks scheduled below. The loop is stopped once this
    // many blocks have completed.
    const TASK_COUNT: usize = 2;

    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    // `run()` is called without a timeout, so it only returns once the loop
    // is told to stop. Without this signal the final "Event loop ended."
    // message would never be printed.
    let signal = event_loop.get_signal();
    let mut finished = 0;

    handle
        .insert_source(exec, move |evt, _metadata, _shared| {
            // Print the value of the async block, i.e. the return value.
            println!("Async block ended with: {}", evt);

            // Stop the loop once every scheduled block has finished.
            finished += 1;
            if finished == TASK_COUNT {
                signal.stop();
            }
        })
        .map_err(|e| e.error)?;

    // Let's create two channels for our async blocks below. The blocks will
    // exchange messages via these channels.
    let (mut sender_friendly, mut receiver_friendly) = futures::channel::mpsc::unbounded();
    let (mut sender_aloof, mut receiver_aloof) = futures::channel::mpsc::unbounded();

    // Our toy async code.
    let async_friendly_task = async move {
        sender_friendly.send("Hello,").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        sender_friendly.send("world!").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        "Bye!"
    };

    let async_aloof_task = async move {
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("Oh,").await.ok();
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("it's you.").await.ok();
        "Regards."
    };

    // Schedule both async blocks to be run in the event loop.
    sched.schedule(async_friendly_task).unwrap();
    sched.schedule(async_aloof_task).unwrap();

    // Run the event loop. It returns after the callback above has seen both
    // blocks finish and has signalled the loop to stop.
    println!("Starting event loop.");
    event_loop.run(None, &mut (), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

Like any block in Rust, the value of your async block is its last expression, i.e. it is effectively "returned" from the block. That value is then provided to your executor's callback as the first argument (the "event"). You'll see this in the output with the Async block ended with: ... lines.

Finally, we run the loop:

use calloop::EventLoop;

// futures = "0.3"
use futures::sink::SinkExt;
use futures::stream::StreamExt;

/// Runs two cooperating async blocks ("friendly" and "aloof") on a Calloop
/// event loop and returns once both of them have finished.
///
/// # Errors
///
/// Returns an error if the executor or the event loop cannot be created, if
/// the executor cannot be inserted into the loop, or if running the loop fails.
///
/// # Panics
///
/// Panics if either async block cannot be scheduled.
fn main() -> std::io::Result<()> {
    // Number of async blocks scheduled below. The loop is stopped once this
    // many blocks have completed.
    const TASK_COUNT: usize = 2;

    let (exec, sched) = calloop::futures::executor()?;

    let mut event_loop = EventLoop::try_new()?;
    let handle = event_loop.handle();

    // `run()` is called without a timeout, so it only returns once the loop
    // is told to stop. Without this signal the final "Event loop ended."
    // message would never be printed.
    let signal = event_loop.get_signal();
    let mut finished = 0;

    handle
        .insert_source(exec, move |evt, _metadata, _shared| {
            // Print the value of the async block, i.e. the return value.
            println!("Async block ended with: {}", evt);

            // Stop the loop once every scheduled block has finished.
            finished += 1;
            if finished == TASK_COUNT {
                signal.stop();
            }
        })
        .map_err(|e| e.error)?;

    // Let's create two channels for our async blocks below. The blocks will
    // exchange messages via these channels.
    let (mut sender_friendly, mut receiver_friendly) = futures::channel::mpsc::unbounded();
    let (mut sender_aloof, mut receiver_aloof) = futures::channel::mpsc::unbounded();

    // Our toy async code.
    let async_friendly_task = async move {
        sender_friendly.send("Hello,").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        sender_friendly.send("world!").await.ok();
        if let Some(msg) = receiver_aloof.next().await {
            println!("Aloof said: {}", msg);
        }
        "Bye!"
    };

    let async_aloof_task = async move {
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("Oh,").await.ok();
        if let Some(msg) = receiver_friendly.next().await {
            println!("Friendly said: {}", msg);
        }
        sender_aloof.send("it's you.").await.ok();
        "Regards."
    };

    // Schedule both async blocks to be run in the event loop.
    sched.schedule(async_friendly_task).unwrap();
    sched.schedule(async_aloof_task).unwrap();

    // Run the event loop. It returns after the callback above has seen both
    // blocks finish and has signalled the loop to stop.
    println!("Starting event loop.");
    event_loop.run(None, &mut (), |_| {})?;
    println!("Event loop ended.");

    Ok(())
}

And our output looks like:

Starting event loop.
Friendly said: Hello,
Aloof said: Oh,
Friendly said: world!
Async block ended with: Regards.
Aloof said: it's you.
Async block ended with: Bye!
Event loop ended.

Note that for the sake of keeping this example short, I've written the async code before running the loop. But async code can be scheduled from callbacks, or other sources within the loop too.

Note about threads

One of Calloop's strengths is that it is completely single-threaded as written. However, many async crates are implemented using threads, e.g. async-std and async-process. This is not an inherent problem! Calloop will work perfectly well with such implementations in general. However, if you have selected Calloop because of your own constraints around threading, be aware of this.