Rust for cpp devs - channel

2021-05-22  本文已影响0人  找不到工作

与 golang 一样,Rust 也实现了 channel 用于线程间的通信。如同 golang 的口号一样:

Do not communicate by sharing memory; instead, share memory by communicating

channel 代表了 communicate 的思想。而传统的 mutex 代表了 share memory 的思想。

其根本区别在于,communicate 是 single ownership 的,当发送一个值到 channel 后,我们不再关心这个值,相当于将所有权移交给了接收线程。而 share memory 则是 multiple ownership 的,多个线程都会访问共享的变量,因此维护起来比 communicate 方式复杂得多。

Rust 中的 channel 本质上是多个生产者、一个消费者 (multiple producer single consumer) 的队列实现的,mpsc 提供了两类 channel,分别是:

channel 具有发送者和接受者,当任意一方被 drop,channel 就关闭了。

一个简单的例子如下:

// multiple producer single consumer
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let greating = String::from("hi");
        tx.send(greating).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("received {}", received);
}

我们也可以发送多条消息:

// multiple producer single consumer
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for received in rx {
        println!("received {}", received);
    }
}

可以看到,每隔 1s 打印出一个单词。注意,接收时我们不再调用 recv() 方法,而是将 rx 作为一个迭代器。当 channel 关闭时,迭代终止。

多个生产者

channel 支持多个生产者模式,而且,tx 是一个可以被 clone 的对象。我们可以如下实现:

// multiple producer single consumer
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });


    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("received {}", received);
    }
}

注意两个任务使用的发送者不一样:

    let tx1 = tx.clone();
上一篇下一篇

猜你喜欢

热点阅读