[Rust-async-book]--5--The Stream

2020-03-09  本文已影响0人  buddyCoder

The Stream trait is similar to Future but can yield multiple values before completing, similar to the Iterator trait from the standard library:
Stream trait 类似于 Future,但在完成之前可以产生多个值,类似于标准库中的迭代器特征:

trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

One common example of a Stream is the Receiver for the channel type from the futures crate. It will yield Some(val) every time a value is sent from the Sender end, and will yield None once the Sender has been dropped and all pending messages have been received:
Stream 的一个常见示例是futures crate中channel 类型的接收器。每次从发送方端发送值时,它都会产生Some(val),并且在发送方被删除并且接收到所有挂起的消息后,它将不产生任何值:

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

Iteration and Concurrency

Similar to synchronous Iterators, there are many different ways to iterate over and process the values in a Stream. There are combinator-style methods such as map, filter, and fold, and their early-exit-on-error cousins try_map, try_filter, and try_fold.
与同步迭代器类似,有许多不同的方法来迭代和处理Stream中的值。有一些组合样式的方法,如map、filter和fold,它们的早期退出错误类似于try_map、try_filter和try_fold。

Unfortunately, for loops are not usable with Streams, but for imperative-style code, while let and the next/try_next functions can be used:
不幸的是,for循环不能用于流,而是用于命令式代码,而let和next/try_next函数可以用于:

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

However, if we're just processing one element at a time, we're potentially leaving behind opportunity for concurrency, which is, after all, why we're writing async code in the first place. To process multiple items from a stream concurrently, use the for_each_concurrent and try_for_each_concurrent methods:
然而,如果我们一次只处理一个元素,那么我们就有可能留下并发的机会,这毕竟是我们首先编写异步代码的原因。要同时处理流中的多个项,请使用for-each-concurrent和try-for-each-concurrent方法:

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

上一篇 下一篇

猜你喜欢

热点阅读