
A tour of Rust Futures, part 1

September 28, 2020 - rust futures

There's been a lot of discussion lately about the future of Rust's async runtimes: will we eventually have a single, standard runtime? Will we create some kind of global runtime registrar? How do you combine libraries that depend on different runtimes? It's my hope that these questions become not answered, but obsoleted entirely. One of the beautiful things about the design of Rust's futures is that they're totally agnostic to the underlying runtime. This means that, in principle, once you have a future, you have a handle to a unit of work that can run regardless of the underlying runtime.

To demonstrate this, over the next few articles, I'm going to create a very tiny async runtime. In doing so, I'll walk through the interface of Future. Once our runtime is working, I'll create some basic async utilities and run them in our runtime. My hope is to show that Rust's modular approach to async makes it much less complicated than it often seems at first, and to show the advantages of moving towards a runtime-agnostic futures ecosystem.

The Future trait

To begin, let's go over the Future trait. It's actually very simple; it's just a single method and an associated type. This trait (and the contract it defines) is essentially the entire async interface for Rust; everything else is built on top of it.

trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

To some developers, especially those coming from other async ecosystems like JavaScript, Python, or C#, this interface looks a bit scary. The whole point of an async abstraction is supposed to be getting away from a poll-driven model and instead letting the runtime notify us when things are ready, such as through callbacks or await. However, this interface has more in common with those than it appears at first.

The contract

A future is an object that refers to some unit of work that will be run asynchronously. When this work is complete, its result is returned as an Output. This work is driven in the foreground by the poll method. This means that, unlike other async models where the actual logic is submitted to a global work queue, in Rust the work is simply performed directly by the owner of the future, in poll, like a normal function call.

The key to making this asynchronous is the Context parameter and the Poll<T> return value. Poll is an enum with two variants: Ready(T) and Pending. If, during a call to poll, the future completes its work, then it simply returns Ready(result). On the other hand, if it can't make progress yet (the point where synchronous code would block), it returns Pending. However, and this is the most important part, before it returns Pending, it must somehow arrange for the caller to be notified when it is ready to be polled again. This notification is done through a cloneable Waker struct supplied by the Context parameter; the Waker has a wake method which, when called, notifies the caller of poll that it should call poll again.
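To make the contract concrete, here is a minimal sketch of a hand-written future that is polled by hand. The names YieldOnce, CountingWaker, and poll_by_hand are made up for illustration, and the waker is built with the standard library's std::task::Wake trait purely to keep the sketch dependency-free. The future returns Pending exactly once, and it wakes the supplied Waker before doing so.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: `Pending` on the first poll, `Ready(42)` afterwards.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            Poll::Ready(42)
        } else {
            self.yielded = true;
            // The contract: arrange for a wake-up *before* returning Pending.
            // Here we are immediately ready again, so we wake right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical waker that only counts how often it has been woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `YieldOnce` twice and reports: was the first poll pending, how many
/// wake-ups were recorded after it, and what the second poll returned.
fn poll_by_hand() -> (bool, usize, Poll<u32>) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);

    // `YieldOnce` is `Unpin`, so the safe `Pin::new` is enough here.
    let mut future = YieldOnce { yielded: false };
    let mut future = Pin::new(&mut future);

    let first_was_pending = future.as_mut().poll(&mut cx).is_pending();
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = future.as_mut().poll(&mut cx);
    (first_was_pending, wakes, second)
}
```

Calling poll_by_hand() plays the role of the caller of poll: the first call comes back Pending with one recorded wake-up, and the second call returns the finished value.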

This point is easy to gloss over, so it's worth reiterating: it is the future's responsibility to arrange for the executor to be notified (via waker.wake()) when the future is ready to be polled again. Maybe there's an I/O runtime set up to wake wakers after an epoll call returns, or there's a thread waiting on a condition variable. What matters is that, somehow, the future arranges for this to happen when it thinks it can make more progress.

But what does waking it actually do? Well, the provider of the Context—the executor that's actually calling poll at a base level—has independently arranged to call poll again at some point after the Waker is triggered.
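As one illustration of a future arranging its own wake-up, here is a rough sketch of a timer whose notification comes from a helper thread. Everything named here (Delay, Shared, ChannelWaker, poll_delay_by_hand) is hypothetical, and spawning a thread per timer is only for demonstration. The code that polls by hand stands in for the executor: it polls once, sleeps until the Waker fires, and polls again.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the future and its helper thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical timer future that completes after `duration` has elapsed.
struct Delay {
    shared: Arc<Mutex<Shared>>,
}

impl Delay {
    fn new(duration: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let thread_shared = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared.lock().unwrap();
            state.done = true;
            // Wake whoever polled us last, if anyone has polled yet.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Delay { shared }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            Poll::Ready(())
        } else {
            // Store the waker so the helper thread can notify the caller.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Hypothetical waker that reports each wake-up through a channel.
struct ChannelWaker(SyncSender<()>);

impl Wake for ChannelWaker {
    fn wake(self: Arc<Self>) {
        let _ = self.0.try_send(());
    }
}

/// Returns (first poll was pending, second poll was ready).
fn poll_delay_by_hand() -> (bool, bool) {
    let (tx, rx) = sync_channel(1);
    let waker = Waker::from(Arc::new(ChannelWaker(tx)));
    let mut cx = Context::from_waker(&waker);

    let mut delay = Delay::new(Duration::from_millis(100));
    let mut delay = Pin::new(&mut delay);

    let first_pending = delay.as_mut().poll(&mut cx).is_pending();
    if first_pending {
        // Sleep until the helper thread calls `wake`.
        rx.recv().unwrap();
    }
    let second_ready = delay.as_mut().poll(&mut cx).is_ready();
    (first_pending, second_ready)
}
```

Note that the future never learns who is polling it; it only holds on to the Waker it was handed and calls it when the timer fires.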

A simple executor

To see it in action, let's write a simple executor that runs a single future to completion. It won't do any kind of i/o handling, work queues, threads, or any of the other fancy features associated with production runtimes; it will simply run a single future. Note that this means that the function will block—when called, it won't return until the future completes. In this way, it serves as a bridge between synchronous code and async. We'll start with a simple signature:

fn run_future<F: Future>(future: F) -> F::Output {
    //...
}

In order for this function to run a future, it needs to repeatedly call poll, and return the result when poll returns Ready. Recall that, when a future is polled and would block, it must arrange for the Waker to be signalled when the future can make more progress. Because the only thing we're doing is executing a single future (we're not creating our own task queue or event loop), we know that if poll returns Pending without the Waker having been signalled already, the signal must come from another thread. Our procedure can therefore be simple: every time the future returns Pending, if wake hasn't already been called, put the thread to sleep until wake is called by another thread.

We therefore need some mechanism by which a thread can be put to sleep, awoken, and check for a signal. We're going to use a simple Condition Variable and Mutex over a boolean flag, and share access to that state through shared pointers:

use std::sync::{Condvar, Mutex};

#[derive(Debug, Default)]
struct Notifier {
    was_notified: Mutex<bool>,
    cv: Condvar,
}

There are two functional halves to this state we need to create: the notifier side, which we will use to create a Waker, and the waiter side, which will allow run_future to sleep until notified by that Waker. We'll start with the notifier side. In order to facilitate the creation of a Waker without going through the complex and unsafe process of doing it manually, we're going to use the cooked-waker helper crate.

use std::mem;
use cooked_waker::WakeRef;

impl WakeRef for Notifier {
    fn wake_by_ref(&self) {
        // Lock the mutex and set was_notified to true. Additionally, if it
        // wasn't already true, notify our condition variable.
        let was_notified = {
            let mut lock = self.was_notified.lock().unwrap();
            mem::replace(&mut *lock, true)
        };

        if !was_notified {
            self.cv.notify_one();
        }
    }
}

The waiter side is even simpler: wait on the condition variable until the was_notified flag is true:

impl Notifier {
    fn wait(&self) {
        let mut was_notified = self.was_notified.lock().unwrap();

        while !*was_notified {
            was_notified = self.cv.wait(was_notified).unwrap();
        }

        *was_notified = false;
    }
}
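The two halves can be tried out in isolation. The sketch below is a standard-library-only variant: it assumes std::task::Wake in place of cooked-waker's WakeRef (a swap made only so the snippet has no dependencies), and the notify and handshake names are invented for the demonstration. It checks two things: a wake that arrives before wait is not lost, and a wake from another thread ends the sleep.

```rust
use std::mem;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Wake, Waker};
use std::thread;
use std::time::Duration;

#[derive(Debug, Default)]
struct Notifier {
    was_notified: Mutex<bool>,
    cv: Condvar,
}

impl Notifier {
    /// Notifier side: set the flag, and signal the condvar if it was unset.
    fn notify(&self) {
        let was_notified = {
            let mut lock = self.was_notified.lock().unwrap();
            mem::replace(&mut *lock, true)
        };
        if !was_notified {
            self.cv.notify_one();
        }
    }

    /// Waiter side: sleep until the flag is set, then reset it.
    fn wait(&self) {
        let mut was_notified = self.was_notified.lock().unwrap();
        while !*was_notified {
            was_notified = self.cv.wait(was_notified).unwrap();
        }
        *was_notified = false;
    }
}

// Assumption: `std::task::Wake` stands in for `cooked_waker::WakeRef` here.
impl Wake for Notifier {
    fn wake(self: Arc<Self>) {
        self.notify();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.notify();
    }
}

/// Returns true if both waits completed and the flag was reset afterwards.
fn handshake() -> bool {
    let notifier = Arc::new(Notifier::default());
    let waker = Waker::from(Arc::clone(&notifier));

    // A wake that happens *before* `wait` is remembered by the flag, so
    // this `wait` returns immediately instead of sleeping forever.
    waker.wake_by_ref();
    notifier.wait();

    // A wake from another thread ends a `wait` that is already sleeping.
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        waker.wake();
    });
    notifier.wait();
    handle.join().unwrap();

    let still_set = *notifier.was_notified.lock().unwrap();
    !still_set
}
```

The boolean flag is what makes the early wake safe: a condition variable on its own does not remember notifications sent while nobody is waiting.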

These two halves are actually everything we need to implement our run_future executor! We will be creating shared ownership of the Notifier, via Arc, which cooked-waker can automatically turn into a Waker struct, since Notifier implements WakeRef.

use std::future::Future;
use std::task::Context;
use std::sync::Arc;
use std::pin::Pin;

use cooked_waker::IntoWaker;

fn run_future<F: Future>(mut future: F) -> F::Output {
    // In order to actually poll the future, it must be "pinned". Pinning is,
    // unfortunately, a moderately complex topic; in this case, all it means
    // is that, after we've "pinned" the future, we promise to never move it.
    // Because the only thing we're doing is polling it, and then dropping it
    // at the end of this function, this promise is upheld.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };

    // The notifier is where this function will listen for notifications
    let notifier = Arc::new(Notifier::default());

    // We're going to use a weak pointer to underpin our `Waker`, since once
    // the notifier is dropped, there's no need to send notifications at all.
    let waker = Arc::downgrade(&notifier).into_waker();
    let mut context = Context::from_waker(&waker);

    // Now we can poll the future! Repeatedly call `poll` until it's done.
    loop {
        match future.as_mut().poll(&mut context) {
            std::task::Poll::Ready(result) => break result,

            // If the future is pending, it has (as we discussed earlier)
            // arranged for `waker.wake()` to be called when it's ready to be
            // polled again, so we sleep until that happens.
            std::task::Poll::Pending => notifier.wait(),
        }
    }
}

That's the entire thing! There are certainly optimization opportunities here; for instance, we could use an AtomicBool instead of Mutex<bool>, and we could track the number of existing wakers such that if all of them are dropped, we know it's not possible to make further progress. However, this executor will correctly, and with reasonable performance, run any future we throw at it.
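As a rough sketch of the AtomicBool idea, the variant below replaces the mutex and condition variable with an atomic flag plus thread parking. It is only one possible shape, under several assumptions: it uses std::task::Wake rather than cooked-waker, it pins with Box::pin (safe, at the cost of an allocation), and the names ThreadNotifier, run_future_atomic, and demo_cross_thread_wake are invented here. The demo also relies on std::future::poll_fn, which needs Rust 1.64 or later.

```rust
use std::future::{poll_fn, Future};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical notifier: an atomic flag plus the thread to unpark.
struct ThreadNotifier {
    was_notified: AtomicBool,
    thread: Thread,
}

impl ThreadNotifier {
    fn notify(&self) {
        // `swap` returns the previous value, so we only unpark on the
        // false -> true transition.
        if !self.was_notified.swap(true, Ordering::SeqCst) {
            self.thread.unpark();
        }
    }
}

impl Wake for ThreadNotifier {
    fn wake(self: Arc<Self>) {
        self.notify();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.notify();
    }
}

/// Runs a single future to completion on the current thread.
fn run_future_atomic<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);

    let notifier = Arc::new(ThreadNotifier {
        was_notified: AtomicBool::new(false),
        thread: thread::current(),
    });
    let waker = Waker::from(Arc::clone(&notifier));
    let mut cx = Context::from_waker(&waker);

    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
        // `park` may return spuriously, so keep parking until the flag has
        // actually been set. The `swap` also resets it for the next round.
        while !notifier.was_notified.swap(false, Ordering::SeqCst) {
            thread::park();
        }
    }
}

/// Drives a future whose wake-up arrives from another thread.
fn demo_cross_thread_wake() -> u32 {
    let mut polled = false;
    run_future_atomic(poll_fn(move |cx: &mut Context<'_>| {
        if polled {
            Poll::Ready(7)
        } else {
            polled = true;
            let waker = cx.waker().clone();
            thread::spawn(move || waker.wake());
            Poll::Pending
        }
    }))
}
```

Whether this is actually faster than the mutex version would need measuring; the point is only that the notification state fits in a single atomic flag.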

Seeing it in action

To close out, let's take a look at this executor running an async block. In the next part, we'll take a deeper look at actually creating and resolving futures, so for now we'll just use simple async blocks that resolve immediately.

fn main() {
    let result = run_future(async {
        1 + 2
    });

    assert_eq!(result, 3);
}

Notice how we can pass the async block directly as the argument to run_future. This is because, in the same way that closures create an anonymous type that implements Fn, async blocks create an anonymous type that implements Future.
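Because an async block is just a value of some anonymous Future type, creating it does not run any of its body; the work happens in poll, as described in the contract earlier. The sketch below shows this by polling an async block by hand. NoopWaker, add_async, and lazy_demo are hypothetical names, and the do-nothing waker is enough only because this block never returns Pending.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A function returning an anonymous future, much like an `async fn` would.
fn add_async(a: i32, b: i32) -> impl Future<Output = i32> {
    async move { a + b }
}

/// Returns (body ran before polling, result of the poll, body ran after).
fn lazy_demo() -> (bool, Poll<i32>, bool) {
    let ran = Cell::new(false);

    // Creating the future does not execute any of its body.
    let mut future = Box::pin(async {
        ran.set(true);
        add_async(1, 2).await
    });
    let ran_before_poll = ran.get();

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // The body runs here, inside `poll`, like a normal function call.
    let result = future.as_mut().poll(&mut cx);
    let ran_after_poll = ran.get();

    (ran_before_poll, result, ran_after_poll)
}
```

Since nothing in this block ever waits, the very first poll returns Ready.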

Wrapping up

In this part, we addressed the very bottom of the async stack: the executor. This forms the primary bridge between ordinary, sequential Rust code and its concurrent async counterpart. This means that we can run any ordinary Rust Future. In future parts, we'll dive into more detail about how the future itself works, and show how several futures can be composed together into one, allowing our simple executor to run arbitrarily complex async workloads.