A tour of Rust Futures, part 1
September 28, 2020

There's been a lot of discussion lately about the future of Rust's async runtimes: will we eventually have a single, standard runtime? Will we create some kind of global runtime registrar? How do you combine libraries that depend on different runtimes? It's my hope that these questions won't so much be answered as made obsolete entirely. One of the beautiful things about the design of Rust's futures is that they're totally agnostic to the underlying runtime. This means that, in principle, once you have a future, you have a handle to a unit of work that can run regardless of the underlying runtime.
To demonstrate this, over the next few articles, I'm going to create a very tiny async runtime. In doing so, I'll walk through the interface of Future. Once our runtime is working, I'll create some basic async utilities and run them in our runtime. My hope is to show that Rust's modular approach to async makes it much less complicated than it often seems at first, and to show the advantages of moving towards a runtime-agnostic futures ecosystem.
The Future trait
To begin, let's go over the Future trait. It's actually very simple; it has just a single method and one associated type. This trait (and the contract it defines) is essentially the entire async interface for Rust; everything else is built on top of it.
trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
To some developers, especially those coming from other async ecosystems like JavaScript, Python, or C#, this interface looks a bit scary. The whole point of an async abstraction is supposed to be to get away from a poll-driven model and instead let the runtime notify us when things are ready, such as through callbacks or await. However, this interface has more in common with those models than it appears at first.
The contract
A future is an object that refers to some unit of work that will be run asynchronously. When this work is complete, its result is returned as an Output. The work is driven in the foreground by the poll method. This means that, unlike other async models where the actual logic is submitted to a global work queue, in Rust the work is simply performed directly by the owner of the future, in poll, like a normal function call.
The key to making this asynchronous is the Context parameter and the Poll<T> return value. Poll is an enum of two states: Ready(T) and Pending. If, during a call to poll, the future completes its work, it simply returns Ready(result). On the other hand, if it needs to block for any reason, it returns Pending. However (and this is the most important part) before it returns Pending, it must somehow arrange for the Context parameter to be notified when it is ready to be polled again. This notification is done through a cloneable Waker struct supplied by the Context parameter; the Waker has a wake method which, when called, notifies the caller of poll that it should call poll again.
This point is easy to miss, so it's worth reiterating: it is the future's responsibility to arrange for the executor to be notified (via waker.wake()) when the future is ready to be polled again. Maybe there's an i/o runtime that wakes wakers after an epoll returns, or there's a thread waiting on a condition variable. What matters is that, somehow, the future arranges for this to happen when it thinks it can make more progress.
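To make this concrete, here's a minimal sketch of a future that upholds this contract in the simplest possible way (YieldOnce is a name I've made up for illustration): it returns Pending exactly once, but wakes the waker immediately before doing so, so the executor knows to poll it again.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A future that returns Pending exactly once. Before doing so, it
/// immediately wakes the waker, upholding the contract: the executor
/// is guaranteed to learn that it should poll again.
#[derive(Default)]
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Arrange to be polled again. Here we wake immediately; a
            // real future would instead hand the waker to an i/o reactor
            // or another thread to be invoked later.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}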
But what does waking it actually do? Well, the provider of the Context (the executor that's actually calling poll at the base level) has independently arranged to call poll again at some point after the Waker is triggered.
A simple executor
To see it in action, let's write a simple executor that runs a single future to completion. It won't do any kind of i/o handling, work queues, threads, or any of the other fancy features associated with production runtimes; it will simply run a single future. Note that this means the function will block: when called, it won't return until the future completes. In this way, it serves as a bridge between synchronous code and async. We'll start with a simple signature:
fn run_future<F: Future>(future: F) -> F::Output {
    // ...
}
In order for this function to run a future, it needs to repeatedly call poll, and return the result when poll returns Ready. Recall that, when a future is polled and would block, it must arrange for the Waker to be signalled when the future can make more progress. Because the only thing we're doing is executing a single future (we're not creating our own task queue or event loop) we know that this signal must come from another thread if poll returns Pending without the Waker having been signalled already. Our procedure can therefore be simple: every time the future returns Pending, if wake hasn't already been called, put the thread to sleep until wake is called by another thread.
We therefore need some mechanism by which a thread can check for a signal, go to sleep, and be awoken. We're going to use a simple condition variable and a mutex over a boolean flag, and share access to that state through shared pointers:
use std::sync::{Condvar, Mutex};

#[derive(Debug, Default)]
struct Notifier {
    was_notified: Mutex<bool>,
    cv: Condvar,
}
There are two functional halves to this state: the notifier side, which we will use to create a Waker, and the waiter side, which will allow run_future to sleep until notified by that Waker. We'll start with the notifier side. In order to facilitate the creation of a Waker without going through the complex and unsafe process of doing it manually, we're going to use the cooked-waker helper crate.
use std::mem;

use cooked_waker::WakeRef;

impl WakeRef for Notifier {
    fn wake_by_ref(&self) {
        // Lock the mutex and set was_notified to true. Additionally, if it
        // wasn't already true, notify our condition variable.
        let was_notified = {
            let mut lock = self.was_notified.lock().unwrap();
            mem::replace(&mut *lock, true)
        };

        if !was_notified {
            self.cv.notify_one()
        }
    }
}
The waiter side is even simpler: wait on the condition variable until the was_notified flag is true:
impl Notifier {
    fn wait(&self) {
        let mut was_notified = self.was_notified.lock().unwrap();

        while !*was_notified {
            was_notified = self.cv.wait(was_notified).unwrap();
        }

        *was_notified = false;
    }
}
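As a quick sanity check (a sketch of my own, not part of the runtime itself), we can exercise the Notifier directly across two threads: one thread blocks in wait, and another calls wake_by_ref to release it.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let notifier = Arc::new(Notifier::default());
    let remote = Arc::clone(&notifier);

    // Simulate a wake arriving from somewhere else after a short delay.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        remote.wake_by_ref();
    });

    // Blocks until wake_by_ref is called on the other thread.
    notifier.wait();
    println!("notified!");
}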
This is actually everything we need to implement our run_future executor! We'll create shared ownership of the Notifier via Arc, which cooked-waker can automatically turn into a Waker struct, since Notifier implements WakeRef.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use cooked_waker::IntoWaker;

fn run_future<F: Future>(mut future: F) -> F::Output {
    // In order to actually poll the future, it must be "pinned". Pinning is,
    // unfortunately, a moderately complex topic; in this case, all it means
    // is that, after we've "pinned" the future, we promise to never move it.
    // Because the only thing we're doing is polling it, and then dropping it
    // at the end of this function, this promise is upheld.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };

    // The notifier is where this function will listen for notifications
    let notifier = Arc::new(Notifier::default());

    // We're going to use a weak pointer to underpin our `Waker`, since once
    // the notifier is dropped, there's no need to send notifications at all.
    let waker = Arc::downgrade(&notifier).into_waker();
    let mut context = Context::from_waker(&waker);

    // Now we can poll the future! Repeatedly call `poll` until it's done.
    loop {
        match future.as_mut().poll(&mut context) {
            Poll::Ready(result) => break result,

            // If the future is pending, it has (as we discussed earlier)
            // arranged for `waker.wake()` to be called when it's ready to be
            // polled again.
            Poll::Pending => notifier.wait(),
        }
    }
}
That's the entire thing! There are certainly optimization opportunities here; for instance, we could use an AtomicBool instead of a Mutex<bool>, and we could track the number of live wakers so that, if all of them are dropped, we know it's not possible to make further progress. However, this executor will correctly (and reasonably performantly) run any future we throw at it.
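As a sketch of what that first optimization might look like (my own interpretation, using thread parking rather than a condition variable; ParkNotifier is a made-up name), the mutex and condition variable can be replaced with an atomic flag plus std::thread's park and unpark:

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, Thread};

struct ParkNotifier {
    was_notified: AtomicBool,
    // The thread that will be parked in `wait`.
    thread: Thread,
}

impl ParkNotifier {
    fn new() -> Self {
        ParkNotifier {
            was_notified: AtomicBool::new(false),
            thread: thread::current(),
        }
    }

    fn wake(&self) {
        // Publish the notification; only unpark if we're the first to do so.
        if !self.was_notified.swap(true, Ordering::Release) {
            self.thread.unpark();
        }
    }

    fn wait(&self) {
        // Consume the notification if present; otherwise park. The loop
        // guards against spurious wakeups from `park`.
        while !self.was_notified.swap(false, Ordering::Acquire) {
            thread::park();
        }
    }
}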
Seeing it in action
To close out, let's take a look at this executor running an async block. In the next part, we'll take a deeper look at actually creating and resolving futures, so for now we'll just use simple async blocks that resolve immediately.
fn main() {
    let result = run_future(async {
        1 + 2
    });

    assert_eq!(result, 3);
}
Notice how we can use the async block directly there as the argument to run_future. This is because, in the same way that closures create an anonymous Fn type, async blocks create an anonymous Future type.
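One consequence of this, worth seeing once: like a closure, an async block is an inert value, and its body doesn't run at all until something polls it.

fn main() {
    // Nothing is printed at this point; the block is just a value.
    let future = async {
        println!("running!");
        40 + 2
    };

    // Only now, when run_future polls it, does the body execute.
    let result = run_future(future);
    assert_eq!(result, 42);
}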
Wrapping up
In this part, we addressed the very bottom of the async stack: the executor. This forms the primary bridge between ordinary, sequential Rust code and its concurrent async counterpart, and it means that we can run any ordinary Rust Future. In future parts, we'll dive into more detail about how the future itself works, and show how several futures can be composed together into one, allowing our simple executor to run arbitrarily complex async workloads.