Using future Streams in Rust with Results

Using future Streams in Rust with Results

Have you ever wanted to do something in parallel in Rust? In a recent project, I wanted to fetch a list of things from a network server then for each thing make another request to the server. Well some Googling brought up this reddit comment. This is a story of the journey from that comment to the diff and the problems I hit along the way.

I’m going to talk about using the futures crate, specifically streams which represent a series of values asynchronously. Things went well until Result came into the picture

Syncronous maps and Result

Let’s say we have a fallible (I’m not going to explain Result here) function

fn double_sync(x: usize) -> Result<usize, String> {
    if x == 3 {
        Err("I don't like three".to_owned())
    } else {
        Ok(x * 2)
    }
}

and you want to apply it to a range of usize’s, you can use iterators and map then collect them into a Vec<Result<T, E>>. Let’s try

let results: Vec<_> = (0..5).map(double_sync).collect();
println!("{results:?}");

gives

[Ok(0), Ok(2), Ok(4), Err("I don't like three"), Ok(8)]

A few things to explain:

  • collect needs a hint of what collection to build. You can do what I’ve done or use the turbofish syntax .collect::<Vec<_>. The same thing happens in either case, it’s just where you want to put things
    • One of my favorite crates, Itertools has collect_vec which is collect for Vectors (for the impatient, look at the next method)
  • The _ tells rust, “Hey whatever map returns, I want a collection of that thing” and in our case it’s a Result<usize, String>
  • 0..5 is a Range which does not include 5.

So this is good. I get all the results back, but I have a mix of Oks and Errs. What if I wanted to turn things from a Vec<Result<T, E>> into a Result<Vec<T>, E>. So either Ok([ 0, 2, 4]) or Err("blah"). Well because Result implements iter, collect can magic things around for us. And if you’re reading this I decided not to come back and explain said magic. Oh too bad.

let results: Result<Vec<_>, _> = (0..5).map(double_sync).collect();
println!("{results:?}");
Err("I don't like three")

hmm… so all the Oks are discarded, but you can keep the errors if you want to. Fine what about all successful.

let results: Result<Vec<_>, _> = (4..=6).map(double_sync).collect();
println!("{results:?}");
Ok([8, 10, 12])

What about async?

I’m going to start of with a function that can’t fail

async fn double(x: usize) -> usize {
	x * 2
}

How do I map that across my input asynchronously. Well you can turn the iterator into a Stream with ‘futures::stream::iter’.

Something that future-Ox has just told me will be important, a Stream is just like an iterator but instead of next() it has a poll_next function that returns a futures::task::Poll enum that the async runtime can come back to. Read Understanding Rust futures by going way too deep for why this matters.

Let’s do this

let results: Vec<usize> = futures::stream::iter(0..5).map(double_infallible).collect();

Go! Rust! Go… err what

error[E0599]: `futures::stream::Iter<std::ops::Range<{integer}>>` is not an iterator
  --> src/main.rs:33:10
   |
33 |         .map(double_infallible)
   |          ^^^ `futures::stream::Iter<std::ops::Range<{integer}>>` is not an iterator
   |
  ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/iter.rs:9:1
   |
9  | pub struct Iter<I> {
   | ------------------ doesn't satisfy `_: Iterator`
   |
   = note: the following trait bounds were not satisfied:
           `futures::stream::Iter<std::ops::Range<{integer}>>: Iterator`
           which is required by `&mut futures::stream::Iter<std::ops::Range<{integer}>>: Iterator`
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope; perhaps add a `use` for it:
   |
4  | use futures::StreamExt;

futures:stream::Iter is not an iterator. Wat! Oh wait hold on, we need to bring in the Extension Trait StreamExt which actually has the map function. And rustc kindle tells us to use futures::StreamExt. Here we go again

error[E0308]: mismatched types
  --> src/main.rs:32:31
   |
32 |       let results: Vec<usize> = futures::stream::iter(0..5)
   |  __________________----------___^
   | |                  |
   | |                  expected due to this
33 | |         .map(double_infallible)
34 | |         .collect();
   | |__________________^ expected struct `Vec`, found struct `Collect`
   |
   = note: expected struct `Vec<usize>`
              found struct `Collect<futures::stream::Map<futures::stream::Iter<std::ops::Range<usize>>, fn(usize) -> impl futures::Future<Output = usize> {double_infallible}>, _>`

What now? So, collect returns a Collect<St, C>. Let’s look at the source for Collect. We have

let results: Vec<usize> = futures::stream::iter(0..5)
        .map(double_infallible)
        .collect()
        .await;

Holy crap!

error[E0277]: the trait bound `Vec<usize>: Extend<impl futures::Future<Output = usize>>` is not satisfied
  --> src/main.rs:35:9
   |
35 |         .await;
   |         ^^^^^^ the trait `Extend<impl futures::Future<Output = usize>>` is not implemented for `Vec<usize>`
   |
   = help: the following other types implement trait `Extend<A>`:
             <Vec<T, A> as Extend<&'a T>>
             <Vec<T, A> as Extend<T>>
   = note: required for `Collect<futures::stream::Map<futures::stream::Iter<std::ops::Range<usize>>, ...>, ...>` to implement `futures::Future`
   = note: the full type name has been written to '/playground/target/debug/deps/playground-2814c18f3846befb.long-type-4383195030087971247.txt'

error[E0277]: the trait bound `Vec<usize>: Extend<impl futures::Future<Output = usize>>` is not satisfied
   --> src/main.rs:32:31
    |
32  |       let results: Vec<usize> = futures::stream::iter(0..5)
    |  _______________________________^
33  | |         .map(double_infallible)
    | |_______________________________^ the trait `Extend<impl futures::Future<Output = usize>>` is not implemented for `Vec<usize>`
34  |           .collect()
    |            ------- required by a bound introduced by this call
    |
    = help: the following other types implement trait `Extend<A>`:
              <Vec<T, A> as Extend<&'a T>>
              <Vec<T, A> as Extend<T>>
note: required by a bound in `futures::StreamExt::collect`
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:516:29
    |
516 |     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
    |                             ^^^^^^^^^^^^^^^^^^ required by this bound in `futures::StreamExt::collect`

For more information about this error, try `rustc --explain E0277`.

(ノಠ益ಠ)ノ彡┻━┻

Pull apart the Stream

Okay let’s go back. What does the map actually give us. Excuse me rustc how do you barf on this

let results: i32 = futures::stream::iter(0..5).map(double_infallible);

And without long module names

note: expected type `i32`
            found struct `Map<Iter<Range<usize>>, fn(usize) -> impl Future<Output = usize> {double_infallible}>`

Aha, so in Map where the Stream is a FusedStream and the function takes a usize and returns a Future. Ahhh right, so when we collect this, we’ll actuallly get collection of Future not a collection of usize. So back to my collect, let’s try with some inference

let results: Vec<_> = futures::stream::iter(0..5)
        .map(double_infallible)
        .collect()
        .await;

Yay! It compiles. Now from what we looked at with Map I betcha it’s a Vec of Future that produces a usize. Let’s println!("{results:?}")

error[E0277]: `impl futures::Future<Output = usize>` doesn't implement `Debug`
  --> src/main.rs:43:16
   |
43 |     println!("{results:?}");
   |                ^^^^^^^ `impl futures::Future<Output = usize>` cannot be formatted using `{:?}` because it doesn't implement `Debug`
   |
   = help: the trait `Debug` is not implemented for `impl futures::Future<Output = usize>`
   = help: the trait `Debug` is implemented for `Vec<T, A>`
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

Where the error is happening is unclear (to me), but I think that’s right. What does print_type_of say?

alloc::vec::Vec<playground::double_infallible::>

Heey, there we go. So how do I execute those futures? Let’s do it the ourselves

let mut values: Vec<usize> = Vec::with_capacity(results.len());
for f in results {
    values.push(f.await);
}
println!("{values:?}");

Whoop!!!!

[0, 2, 4, 6, 8]

┳━┳ ヽ(ಠل͜ಠ)ノ But surely, something can do this for us. Why yes, indeed

buffered and buffer_unordered

Both of these have a very important

The returned stream will be a stream of each future’s output

in the docs. Remember when I said a Stream has a poll_next function on it. Well looking at the implementation of Stream for Buffered, you can see a queue and a loop polling each item until they’re ready. BufferUnordered is similar but doesn’t care about the order.

Right now we know how to get the futures module to poll out Futures, let’s put it all together.

let results: Vec<_> = futures::stream::iter(0..5)
    .map(double_infallible)
    .buffered(10)
    .collect()
    .await;
println!("{results:?}");

Boom!

[0, 2, 4, 6, 8]

Bring back the Result

Okay, now back at the beginning, with Iterators, I wanted a Result<Vec<usize>, _>. As a reminder

let results: Result<Vec<_>, _> = (0..5).map(double_sync).collect();
println!("{results:?}");

So let’s stick async at the front of double_sync and rename it

async fn double(x: usize) -> Result<usize, String> {
     if x == 3 {
        Err("I don't like three".to_owned())
    } else {
        Ok(x * 2)
    }
}

And plug this into the stream

let results: Vec<_> = futures::stream::iter(0..5)
	.map(double)
	.buffered(10)
	.collect()
	.await;
println!("{results:?}");
[Ok(0), Ok(2), Ok(4), Err("I don't like three"), Ok(8)]

We’ve seen that before. Let’s make the type Result<Vec<_>, _> and then we can all go home

let results: Result<Vec<_>, _> = futures::stream::iter(0..5)
	.map(double)
	.buffered(10)
	.collect()
	.await;
println!("{results:?}");

Go, rustc, go!

error[E0277]: the trait bound `Result<Vec<_>, _>: Default` is not satisfied
  --> src/main.rs:75:9
   |
75 |         .await;
   |         ^^^^^^ the trait `Default` is not implemented for `Result<Vec<_>, _>`
   |
   = note: required for `Collect<Buffered<futures::stream::Map<futures::stream::Iter<std::ops::Range<usize>>, ...>>, ...>` to implement `futures::Future`

And another 63 lines of errors. Where’s that table, I’m going to flip it again 😡. Okay. Breathe. Look at the docs. Hey, what’s that in the futures::stream module

TryStreamExt

Adapters specific to Result-returning streams

Oh you sound ideal. And you have a try_collect which

This combinator will collect all successful results of this stream If an error happens then … the error will be returned.

Let’s “try” it (and don’t forget to use futures::TryStreamExt;)

let results: Result<Vec<_>, _> = futures::stream::iter(0..5)
	.map(double)
	.buffered(10)
	.try_collect()
	.await;
println!("{results:?}");
Err("I don't like three")

OH YEAH!!!! Change the range to 4..10 and we get

Ok([8, 10, 12, 14, 16, 18])

Right, I’m done, I’m going to bed. I hope this is both helpful and correct. Please let me know if it isn’t.

Published: February 04 2023

  • category:
  • tags:
blog comments powered by Disqus