- Prerequisites
- std::future::Future
- Converting new style Futures to old style Futures
- Converting old style Futures into new style Futures
- Fun with new style Futures
- Writing new style streams to work with old-style AsyncReaders
- Writing new style sinks to work with old-style AsyncWriters
- Spawning new style futures to run concurrently
- Conclusion
Following on from my last post, I thought I would look at async/await support in Rust.
The async/await support coming to Rust brings with it a much more ergonomic way to work with asynchronous computations. In this post I'll introduce std::future::Future
, and run through how to make use of them, and how to interoperate with the current ecosystem which is built around version 0.1 of the futures
package.
I'll refer to Futures from the 0.1 futures
package as "0.1 Futures" or "old style Futures", and the Future
trait exposed in the nightly standard library that is the backbone of the async/await syntax as "std Futures" or "new style Futures" (or something along those lines!).
All of the example code used below can be found here
Prerequisites
To use the new async/await syntax, you'll need to be using a relatively recent Rust nightly (as of November 26th 2018). An appropriate Cargo.toml
looks like this:
[package]
name = "My Package"
version = "0.1.0"
authors = ["You"]
edition = "2018"
[dependencies]
futures = "0.1.25"
# enable the async-await stuff using a feature flag:
tokio = { version = "0.1.13", features = ["async-await-preview"] }
# Only needs to be explicitly imported if you want to make use of
# the machinery to convert promises back and forth between 0.1 and 0.3
# (which is built into the provided await! macro only otherwise):
tokio-async-await = "0.1.4"
For most normal things you'll just need to use tokio
with the async-await-preview
feature flag, though the tokio-async-await
package (parts of which are reexported by tokio
with said feature enabled) provides a couple of useful bits on its own, including compatibility shims to convert between old and new style Futures.
In the root of your crate you'll need to opt in to some features as well to get the new syntax. The first ones are a must, but I use the others in my example code for a couple of things:
// enable the await! macro, async support, and the new std::Futures api.
#![feature(await_macro, async_await, futures_api)]
// only needed if we want to manually write a method to go forward from 0.1 to 0.3 future,
// or manually implement a std future (it provides Pin and Unpin):
#![feature(pin)]
// only needed to manually implement a std future:
#![feature(arbitrary_self_types)]
The await_macro
feature defines an await!
macro that is the main way to wait for Futures to resolve. Tokio exports it's own shim on top of this which works in the same way but is also compatible with 0.1 Futures. This isn't necessary if you avoid ever needing to call await!
on an old Future, but is useful for interop. We can bring it into scope across our entire crate with:
#[macro_use]
extern crate tokio;
Or bring it into scope in only the modules we want using the standard import syntax in each module we want it:
use tokio::await;
We'll also want to import the tokio::prelude
, which is beefed up with some additional async/await helper functions when the tokio-async-await
feature flag is used:
use tokio::prelude::*;
In total, we gain access to these additional methods (I hope I haven't missed any):
Stream.next()
Sink.send_async(value)
AsyncRead.read_async(buf)
AsyncRead.read_exact_async(buf)
AsyncWrite.write_async(buf)
AsyncWrite.write_all_async(buf)
AsyncWrite.flush_async()
tokio::run_async(new_future)
tokio::spawn_async(new_future)
I'll cover almost all of these in the following examples.
std::future::Future
With that out of the way, we can make new style futures by using the async
keyword.
This function returns a Future<Output=&'static str>
when called:
async fn hello_world() -> &'static str {
"Hello World"
}
And this block returns the same:
let hello_world_fut = async { "Hello World" };
Worth noting straight away is that new Futures only have a single Output
, whereas old style Futures have separate Item
and Error
types for handling possible failure. If you want to handle errors with new style Futures, you just return a Result
as that output, which plays nice with things like ?
and so on.
Inside an async
function, closure or block, you can use the await!
macro to wait for other futures to resolve. The next example counts to three, one per second:
// use Delay from the tokio::timer module to sleep the task:
async fn sleep(n: u64) {
use tokio::timer::Delay;
use std::time::{Duration, Instant};
await!(Delay::new(Instant::now() + Duration::from_secs(n))).unwrap();
};
// sleep a second before each line is printed:
tokio::run_async(async {
await!(sleep(1));
println!("One");
await!(sleep(1));
println!("Two");
await!(sleep(1));
println!("Three");
});
await
is non-blocking, so the Runtime is happy to go and work on progressing other Futures (if there are any) while waiting on any given await
call.
I'll show more examples as we go, but hopefully you have already been given a feel for how ergonomically superior async/await syntax is to chaining futures together.
Converting new style Futures to old style Futures
To make use of new style Futures alongside the various combinators and such exposed on old style Futures, you'll need to convert them. Although not explicitly exposed, you can make use of the machinery in the tokio-async-await
crate to make quick work of it (This is correct as of tokio-async-await
0.1.4 but may change at any time):
use std::future::Future as NewFuture;
use futures::Future as OldFuture;
// converts from a new style Future to an old style one:
fn backward<I,E>(f: impl NewFuture<Output=Result<I,E>>) -> impl OldFuture<Item=I, Error=E> {
use tokio_async_await::compat::backward;
backward::Compat::new(f)
};
The only caveat here is that the new style future needs to output a Result
, so that we can map to the Item
and Error
associated types needed for old style futures. With this in hand, we can use a new Future (made by an async thing) as if it was an old one:
// Map our hello_world() future to return a Result<&str,()> rather
// than just &'str, so that we can convert it to an old style one:
let hello_world_result = async {
let s = await!(hello_world());
Ok::<_,()>(s)
};
// use the above function to convert back:
let hello_world_old = backward(hello_world_result);
// We can then run it like any old style future, allowing to to use any
// of the machinery currently available for 0.1 style futures:
tokio::run(
hello_world_old.map(|val| println!("Running as 0.1 future: {}", val))
);
The main use case for this that I can see is making use of combinators like select
and join
from the land of old Futures, rather than having to reimplement them to work alongside new Futures.
Converting old style Futures into new style Futures
The easiest way to convert an old style Future into a new one is simply by using the await!
macro that Tokio gives us (rather than std::await
), since it will convert old style Futures for us. We've already seen this above when using the tokio::timer::Delay
future in a new style async
block.
If we want, we can use the same approach to write ourselves a function to manually convert them for us:
use std::future::Future as NewFuture;
use futures::Future as OldFuture;
// converts from an old style Future to a new style one:
fn forward<I,E>(f: impl OldFuture<Item=I, Error=E> + Unpin) -> impl NewFuture<Output=Result<I,E>> {
use tokio_async_await::compat::forward::IntoAwaitable;
f.into_awaitable()
}
We can test that it works by using the std::await
macro on a converted Future instead of the Tokio version:
tokio::run_async(async {
// Create some old style Future:
let old_future = futures::future::ok::<_,()>("Awaiting a manually converted 0.1 future!");
// Convert to a new style one:
let new_future = forward(old_future);
// `await` the result and print it:
println!("{}", std::await!(new_future).unwrap());
});
I'm not really sure if this is super useful, but it's nice to know that you can easily go back and forth between new and old style Futures.
Fun with new style Futures
Manually implementing a new style Future on top of Tokio poll_x methods
In my last post, I manually implemented an old style Future
which reads one byte at a time from some AsyncRead
type. Let's do the same again here, but with new style Futures.
Implementing a new style Future isn't significantly more difficult; the main challenges are understanding Pin
, and converting between the Async
type Tokio uses and the std::task::Poll
type that new Futures hand back.
// expose the std types we need to work with:
use std::task::{ Poll as StdPoll, LocalWaker};
use std::future::Future as StdFuture;
use std::pin::{ Pin, Unpin };
struct ByteFuture<R>(R);
impl <R: AsyncRead + Unpin> StdFuture for ByteFuture<R> {
// std futures have just one output, but it can be a Result to signal errors:
type Output = Result<u8, tokio::io::Error>;
// poll takes a Pin thing now. By requiring that Self is Unpin (which it is
// so long as the reader thing it contains is), we are allowed mutable access
// to it (which we need to poll the AsyncReader):
fn poll(mut self: Pin<&mut Self>, _lw: &LocalWaker) -> StdPoll<Self::Output> {
let mut buf = [0;1];
// we need to convert from the Async type returned from the poll_x method
// to the return type expected from the std Future (it's either Ready
// or Pending now, since there is no separate Error item any more):
match self.0.poll_read(&mut buf) {
Ok(Async::Ready(_num_bytes_read)) => StdPoll::Ready(Ok(buf[0])),
Ok(Async::NotReady) => StdPoll::Pending,
Err(e) => StdPoll::Ready(Err(e))
}
}
}
let byte_future = ByteFuture(tokio::io::stdin());
byte_future
is now a new style Future that yields one byte of input from stdin
.
Pin
is a type that is used to guarantee that we can't move Self
. This is necessary because new style futures can contain self referential variables (which allows references working across await
points). If Self
could be moved, those references would be invalidated.
Unpin
is a marker trait that means it is safe to move a thing. If this is true, it is safe to mutably access the thing (which lets you move it, eg with mem::replace
). We need mutable access, so we demand that the AsyncRead
type we accept is also Unpin
. Almost everything implements Unpin
. I imagine that async
blocks with references that are used across await
points would not be Unpin
, since they are no longer safe to move around.
We can achieve exactly the same and avoid implementing our own Future
by making use of the read_async
method provided on AsyncRead
:
let byte_future2 = async {
let mut buf = [0;1];
let mut stdin = tokio::io::stdin();
await!(stdin.read_async(&mut buf))?;
Ok::<u8,tokio::io::Error>(buf[0])
};
This returns a Future<Output=Result<u8,tokio::io::Error>>
just like our Future implementation above. We also use ?
to bail out early if read_async
returns with an error. It's great to see that async/await plays well with existing operators and control flow like this.
Writing new style streams to work with old-style AsyncReaders
It's probably worth noting that there is (currently at least) no such thing as a new style Stream or Sink. Instead, helper methods are added to each which return new style Futures that we can await
in order to send and receive input from them.
The simplest way to stream bytes from an AsyncRead
type is probably to just use the .read_async
method again, but in a while loop (this would be easy to add better buffering to if we wanted):
let stream_bytes = async {
let mut buf = [0;1];
let mut stdin = tokio::io::stdin();
// While read_async returns a number of bytes
// read and not an error:
while let Ok(n) = await!(stdin.read_async(&mut buf)) {
// bail if we've read everything:
if n == 0 { break };
// stop if the byte we read is a newline:
if buf[0] == b'\n' { break };
// print each byte we read up to a newline/error:
println!("Streamed: {}", buf[0] as char);
}
};
This Future will stream bytes one at a time from stdin
until it hits a newline, or stdin is closed, printing each byte as a char
each time one comes through.
If we already have a Stream
, we can use the next()
method which has been added to it to pluck items out of it in a standard while loop:
let mut byte_stream = stream::iter_ok::<_,()>("from an old style stream\n".chars());
let stream_bytes2 = async move {
// Some(Output), where Output is Result<char, ()>. We bail
// if the stream ends (None) or the result indicates an error:
while let Some(Ok(c)) = await!(byte_stream.next()) {
println!("Streamed 2: {}", c);
}
};
Using while
loops in this way replaces using stream combinators like for_each
, as well as error handling combinators like map_err
; we can just use normal Rust syntax to deal with everything. Lovely stuff!
Writing new style sinks to work with old-style AsyncWriters
If you don't yet have a Sink
, Tokio adds write_async
, write_all_async
and flush_async
methods to AsyncWriter
types, so it becomes easy to put bytes into them without any extra machinery:
let sink_message = async {
let message = "writing to stdout, one byte at a time\n";
let mut stdout = tokio::io::stdout();
for byte in message.bytes() {
let buf = &[byte];
if let Err(e) = await!(stdout.write_all_async(buf)) {
println!("Error writing out: {:?}", e);
}
}
await!(stdout.flush_async()).unwrap();
}
This time, we loop over some bytes we want to output, and send each one to stdout
using write_all_async
. We could probably improve the performance here by writing more bytes at a time. We need to remember to flush_async
at the end to ensure bytes have been written out completely.
If you already have a Sink
, you get a new send_async
method on it, which makes it easy to send things into it without the messy ownership transferring stuff you have to do with send
. In this example I forward bytes from a Stream to a Sink:
// Create a quick stream of bytes:
let mut byte_stream = stream::iter_ok::<_,()>("From a Stream to a Sink\n".bytes());
// Use `FramedWrite` to build ourselves a one-byte-at-a-time Sink to stdout:
let mut byte_sink = FramedWrite::new(tokio::io::stdout(), BytesCodec::new())
.with(|byte| {
// convert an incoming byte into the Bytes type expected by the codec:
Ok::<_, tokio::io::Error>([byte][..].into())
});
// Running this Future will lead to bytes being forwarded from Stream to Sink:
let forward_bytes = async move {
while let Some(Ok(byte)) = await!(byte_stream.next()) {
if let Err(e) = await!(byte_sink.send_async(byte)) {
println!("Error: {:?}", e);
}
}
}
Spawning new style futures to run concurrently
Just like tokio::spawn
did for old style Futures, tokio::spawn_async
allows spawning into the background of new style futures, allowing multiple jobs to run concurrently. Remember, we can always convert our new style Futures back into old style ones if we want to use other methods to spawn and run Futures.
Here, we concurrently write the same text to stdout
multiple times, leaving a good chance (but not a guarantee) that it'll be garbled together:
tokio::run_async(async {
async fn write_to_stdout() {
let mut stdout = tokio::io::stdout();
let message = "Concurrently Writing This Message";
for byte in message.bytes() {
let buf = &[byte];
await!(stdout.write_all_async(buf)).unwrap();
await!(stdout.flush_async()).unwrap();
}
}
// execute this future-returning fn multiple times
// concurrently:
tokio::spawn_async(write_to_stdout());
tokio::spawn_async(write_to_stdout());
tokio::spawn_async(write_to_stdout());
tokio::spawn_async(write_to_stdout());
tokio::spawn_async(write_to_stdout());
tokio::spawn_async(write_to_stdout());
});
Conclusion
I'm really very excited about async/await syntax; it makes writing and composing Futures much, much nicer. This post has discussed how to make use of all of this new stuff alongside the current Futures 0.1 ecosystem, so that you can start playing with and benefitting from it straight away.
You can find all of the example code here.