Rust修行之Future篇-part3

2020-02-28  本文已影响0人  黑天鹅学院

本文翻译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 3 - The reactor

介绍

在这一篇文章中,我们将深入reactor内部的实现原理。在前面的文章中,我们多次使用reactor来执行Future,但是我们并不关心内部的实现。

Reactor与Loop

简单的说,reactor就是一个Loop,为了解释这个概念,可以参考一个老套的例子。

你向心仪的女孩发了封邮件,邀请她陪你一起看电影,发完邮件后,你肯定会十分忐忑,会不停的查邮箱,一遍又一遍的查,直到得到回复。

rust的reactor的运行原理类似于这个过程。将Future提交给reactor之后,reactor将不断的检查该Future,直到这个Future运行结束,或者是出现错误。reactor执行Future是通过调用poll函数来完成的,每一个Future都需要实现该函数,具体来说,就是返回一个Poll<T,E>结构体。事实上,reactor并不会无止境的调用poll函数,我们用一个例子来进行分析。

从零开始实现Future

为了认识reactor,我们从零实现一个Future,也就是手动实现Futuretrait。我们实现的Future功能很简单,在超时之后返回。

我们的定义WaitForIt如下:


#[derive(Debug)]

struct WaitForIt {

    message: String,

    until: DateTime<Utc>,

    polls: u64,

}

这个结构体保存有超时时间,一个用户自定义的字符串,以及已经被调用的次数。我们实现的new函数如下:


impl WaitForIt {

    pub fn new(message: String, delay: Duration) -> WaitForIt {

        WaitForIt {

            polls: 0,

            message: message,

            until: Utc::now() + delay,

        }

    }

}

这个new函数将初始化一个WaitForIt实例。

现在,我们开始实现Futuretrait,要做的事情也就是实现poll函数。


impl Future for WaitForIt {

    type Item = String;

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        let now = Utc::now();

        if self.until < now {

            Ok(Async::Ready(

                format!("{} after {} polls!", self.message, self.polls),

            ))

        } else {

            self.polls += 1;

            println!("not ready yet --> {:?}", self);

            Ok(Async::NotReady)

        }

    }

}

来看这几行:


type Item = String;

type Error = Box<Error>;

在rust里面,这样的类型被叫做关联类型, 意思就是,Future在将来完成时返回的值(或者错误)。在我们的例子中,WaitForIt最终返回一个String,或者是Box<Error>

看poll的函数的定义:


fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

在这个定义中,Self::ItemSelf::Error是两个关联类型的占位符,函数的定义与下面等价:


fn poll(&mut self) -> Poll<String, Box<Error>>

逻辑部分如下:


let now = Utc::now();

if self.until < now {

  // Tell reactor we are ready!

} else {

  // Tell reactor we are not ready! Come back later!

}

在poll函数中,我们怎么告诉reactor当前Future的执行状态呢,换句话说,reactor怎么直到这个Future已经完成了呢?方法很简单,我们通过Ok枚举携带Async::NotReady表征Future未完成,通过Ok枚举携带Async::Ready表征Future已完成。

poll函数改造如下:


impl Future for WaitForIt {

    type Item = String;

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        let now = Utc::now();

        if self.until < now {

            Ok(Async::Ready(

                format!("{} after {} polls!", self.message, self.polls),

            ))

        } else {

            self.polls += 1;

            println!("not ready yet --> {:?}", self);

            Ok(Async::NotReady)

        }

    }

}

我们在main函数中创建reactor来执行Future。


fn main() {

    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

    println!("wfi_1 == {:?}", wfi_1);

    let ret = reactor.run(wfi_1).unwrap();

    println!("ret == {:?}", ret);

}

我们预期Future在1秒钟之后完成,运行结果如下:


Running `target/debug/tst_fut_create`

wfi_1 == WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 0 }

not ready yet --> WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 1 }

运行结果貌似不符合预期,仅运行了一次就停住了,但是并没有产生额外的CPU消耗,这是为什么呢?

fcpu

在rust中,一个Future poll函数提交给reactor后,视为这个Future停放在了这个reactor中,而reactor并不会再次调用这个poll函数,除非显式的告知需要被再次调用。在我们的例子中,reactor会立即调用WaitForIt中的poll函数,但是返回值是Async::NotReady,所以这个poll函数将会被停放在这个reactor中。如果没有相应的机制告诉reactor解除停放,那么poll函数将永远不会被再次调用。在这个过程中,reactor处于空闲状态,并不会额外消耗CPU。由于没有去直接查询运行是否完成的状态,所以这种方式的效率很高。在上面的邮件例子中,我们可以让邮箱在收到回复之后通知我们,这样就没有必要不断的去查邮箱了。

另一个更有意义的例子是网络收包过程,在不确定报文什么时候到达的情况下,我们可以阻塞线程等待报文到来,也可以在等待的过程中做其他的事情。

解除停放

有些时候,我们需要解除poll函数的停放,应该怎样修改WaitForIt的实现呢?有很多外部事件可以用来解除停放,比如键盘事件或者网络报文到达,在我们的例子中,我们需要手动触发。


futures::task::current().notify();

Future修改如下:


impl Future for WaitForIt {

    type Item = String;

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        let now = Utc::now();

        if self.until < now {

            Ok(Async::Ready(

                format!("{} after {} polls!", self.message, self.polls),

            ))

        } else {

            self.polls += 1;

            println!("not ready yet --> {:?}", self);

            futures::task::current().notify();

            Ok(Async::NotReady)

        }

    }

}

现在可以可以看到,Future不会停下来。

frun

代码运行结束后,poll函数在1秒内被调用了超过50k次。这是严重的资源浪费,所以,应该仅在事件明确发生的时候,才应该解除停放。

fcpu1

到目前为止,我们的loop是单线程的,如果有需要,可以使用多线程来运行Future。

Joining

reactor有一个有用的特性是可以并行运行多个Future,通过这种方式,我们可以更加高效的利用单线程loop,如果一个Future被停放了,另一个Future将获得执行机会。

我们复用WaitForIt,定义两个Future,然后并发的执行这两个Future:


let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

println!("wfi_1 == {:?}", wfi_1);

let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));

println!("wfi_2 == {:?}", wfi_2);

我们通过futures::future::join_all来并发执行Future,join_all的输入是一个Future迭代器,我们先创建一个vector:


let v = vec![wfi_1, wfi_2];

然后创建联合:


let sel = join_all(v);

完整的代码如下:


fn main() {

    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

    println!("wfi_1 == {:?}", wfi_1);

    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));

    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = join_all(v);

    let ret = reactor.run(sel).unwrap();

    println!("ret == {:?}", ret);

}

运行结果如下:

frun1

关键点是两个请求是交错的,第一个Future被调用后,第二个Future被调用,然后是第一个,接着是第二个,依此类推,直到两个Future最终完成。

Select

futuretrait有很多辅助性的函数,除了join_all之外,还有一个是selectselect函数运行两个Future,返回第一个完成的Future。这种方法在实现超时时很有用,我们举一个例子:


fn main() {

    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

    println!("wfi_1 == {:?}", wfi_1);

    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(2));

    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = select_all(v);

    let ret = reactor.run(sel).unwrap();

    println!("ret == {:?}", ret);

}

尾声

在下一篇文章中,我们将介绍更加有实际意义的future,不消耗额外的CPU资源,同时更加符合reactor的要求。

上一篇下一篇

猜你喜欢

热点阅读