首页投稿(暂停使用,暂停投稿)今日看点

学习 Rust Futures - Executor and T

2017-10-05  本文已影响1383人  siddontang

在最开始学习 Rust futures 的时候,executor 和 task 是两个让我比较困惑的概念,这两个东西到底是啥,它们到底是如何使用的,我当时完全不清楚。等后来做完一些项目,才慢慢理解了。所以觉得有必要好好的记录一下。

介绍

Executor 可以认为是一个用来执行 future 的地方,我们可以在当前线程里面执行 future,也可以将 future 扔到一个 thread pool 里面去执行,也可以在 event loop 里面(例如 tokio-core)里面去执行。

而 Task 则可以认为是一种正在或者将会被执行的 future。通常,我们会将多个 future 组合成一个大的工作单元,然后会在 executor 上面 spawn 一个对应的 task。Executor 会负责当通知到来的时候,去 poll future,直到 future 全被执行结束。

整个流程可以简化为:

  1. 当一个 future 不是 ready 的时候,我们使用 task::current() 函数得到一个 task handle,并 block 住当前的 future。
  2. 将 task handle 加入到一个感兴趣的事件队列里面,如果相关事件触发了,则通过 task.notify() 通知对应的 executor。
  3. Executor 继续 poll future。

上面可能比较抽象,我们可以通过几个例子更深刻的了解相关的机制。

Future wait

当我们创建了一个 future 之后,可以使用 wait 函数,block 住当前的线程,强制等到 future 被执行,然后才会继续进行后面的操作。

Future wait 函数的实现如下:

fn wait(self) -> result::Result<Self::Item, Self::Error>
    where Self: Sized
{
    ::executor::spawn(self).wait_future()
}

可以看到,我们使用 executor::spawn 了一个 Spawn 对象,Spawn 对象表示的是一个 fused future 和 task, 这就意味着我们不能再将 future 跟其他的 future 去组合了,只能执行了。

wait_future 函数里面,我们会 block 住当前的线程,直到 Spawn 内部的 future 执行完毕,代码如下:

pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
    let unpark = Arc::new(ThreadUnpark::new(thread::current()));

    loop {
        match self.poll_future_notify(&unpark, 0)? {
            Async::NotReady => unpark.park(),
            Async::Ready(e) => return Ok(e),
        }
    }
}

首先我们会创建一个 ThreadUnpark 的 Notify 对象,然后传给 Spawn 的 poll_future_notify 去使用。当 future 变成 ready 的时候,我们会去调用 Notify 的 notify 函数去通知相关的 executor 继续去 poll 这个 future。

在 ThreadUnpark 里面,notify 实现如下:

impl Notify for ThreadUnpark {
    fn notify(&self, _unpark_id: usize) {
        self.ready.store(true, Ordering::SeqCst);
        self.thread.unpark()
    }
}

notify 函数里面,我们直接会调用 thread 的 unpark 函数,用来唤醒当前被 block 的线程。

Spawn 的 poll_future_notify 会尝试 poll 内部的 future,这个函数会接受一个 NotifyHandle 参数,后续任何的 task::current() 操作返回的 task handle 都会带上这个 NotifyHandle,这样我们通过 task.notify() 就能告诉 executor future 已经 ready 了。

如果 poll_future_notify 返回 NotReady,我们就需要靠 Notify 来通知了。在上面的例子中,返回 NotReady 之后,我们直接调用了 pack 函数,定义如下:

fn park(&self) {
    if !self.ready.swap(false, Ordering::SeqCst) {
        thread::park();
    }
}

park 里面,我们直接调用了 thread 的 park 函数,block 住了当前线程,这样当 future 已经 ready 之后,我们会调用 thread 的 unpark 函数唤醒被 block 的线程。

gRPC

上面是一个简单使用操作系统 thread 的 park/unpark 函数来处说明 Executor 和 Task 的例子,在 rust gRPC 里面,我们为了跟 gRPC 的 event loop 整合,也实现了相关的操作。

这里先介绍一下 gRPC 的相关概念,在 gRPC 里面,所有的事件都是通过 CompletionQueue ( 后面以 CQ 代替) 来驱动的,我们会不停的循环调用 CQ 的 next 函数,当有事件产生的时候,next 就会返回对应的事件,然后我们会通过这个事件里面的 tag 找到对应的上下文继续处理。

通常我们都是在 for 循环里面调用的 next 函数,其它线程如果想跟 CQ 发送消息,就需要通过 gRPC 里面的 alarm 机制,我们会先通过 grpc_alarm_create 创建一个 alarm,然后调用 grpc_alarm_cancel 就可以直接去通知到 CQ 了。当 next 返回对应的 alarm event 之后,我们就可以执行这个 alarm 相关的逻辑了。

当 CQ 线程调用到对应的 gRPC method 之后,我们可能需要在其他线程去处理相关的操作,这时候,就可以通过 executor::spawn 来生成一个 Spawn,代码如下:

pub struct Executor<'a> {
    cq: &'a CompletionQueue,
}

impl<'a> Executor<'a> {
    pub fn spawn<F>(&self, f: F)
    where
        F: Future<Item = (), Error = ()> + Send + 'static,
    {
        let s = executor::spawn(Box::new(f) as BoxFuture<_, _>);
        let notify = Arc::new(SpawnNotify::new(s, self.cq.clone()));
        poll(notify, false)
    }
}

SpawnNotify 对应的就是一个 Notify 对象,SpawnNotify 会创建一个 SpawnHandle,在对应的 notify 函数里面,我们会调用 SpawnHandle 的 notify 函数,这个函数里面就会创建一个 alarm 并通知 CQ。

    pub fn notify(&mut self, tag: Box<CallTag>) {
        self.alarm.take();
        let mut alarm = Alarm::new(&self.cq, tag);
        alarm.alarm();
        // We need to keep the alarm until tag is resolved.
        self.alarm = Some(alarm);
    }

当 CQ 的 next 返回了对应的 alarm 事件之后,我们会调用到 SpawnNotify 的 resolve 函数:

    pub fn resolve(self, success: bool) {
        // it should always be canceled for now.
        assert!(!success);
        poll(Arc::new(self.clone()), true);
    }

最后我们在关注下 poll 函数,无论是 Executor 的 spawn 还是SpawnNotify 的 resolve 里面,我们最后都会使用。poll 会调用 Spawn 的 poll_future_notify 函数:

fn poll(notify: Arc<SpawnNotify>, woken: bool) {
    let mut handle = notify.handle.lock();
    ......
    match handle.f.as_mut().unwrap().poll_future_notify(&notify, 0) {
        Err(_) | Ok(Async::Ready(_)) => {
            ......
            return;
        }
        _ => {}
    }
}

poll_future_notify 如果返回 NotReady,这里我们并不需要做特殊的处理,因为 CQ 会不停的调用 next,如果没有任何事件产生,next 自动回 block 住当前的 CQ 的进程,如果 future 变成了 ready,我们就可以告诉 CQ,CQ 自然会在 next 里面得到对应的事件,然后我们就能继续去执行这个 future 了。

小结

个人认为,task 应该算是 Rust future 里面最难理解的一个概念,但其实把它整个流程理顺了,就会觉得设计的非常巧妙,而且高效。大家也可以参考 future 自己的 CpuPool 以及 tokio-core 相关的代码,来加深对 task 的理解。

上一篇下一篇

猜你喜欢

热点阅读