< 返回版块

黑豆腐 发表于 2020-09-11 08:43

Tags:rust

这一期的内容会轻松一些,讲讲crossbeam中的channel。可是有人就要问了在标准库里面已经有了std::sync::mpsc,为什么crossbeam又要搞出一套channel呢?首先我们来看看标准库中的channel有哪些不足吧

标准库中channel的不足

  1. Receiver不能被clone,是MPSC的channel。理想状况我们希望能有MPMC的channel
  2. Sender和Receiver不是Sync
  3. 在Go语言中,channel一般和select语句一起使用,但是标准库中的channel并不支持select
  4. 有限容量(Bounded)的channel内部实现就是一个简单的Mutex<VecDeque<T>>,性能比Go语言的channel还差
  5. 有Sender(=Unbouded)和SyncSender(=Bounded)的区分,用起来不统一。

crossbeam中加强版的channel

首先,无论容量是否有限,Sender类型统一成一种,这样用起来就很方便。其次对有限容量的channel进行了重写(还记得上一期我们讲的Deque其实就是为了消除Mutex<VecDeque<T>>产生的瓶颈么,这里也类似。对于1-3点:(在此之前我们先简单讲下如何创建crossbeam的channel)

创建channel

有限容量

use crossbeam_channel::bounded;

// 创建一个容量是5的channel
let (s, r) = bounded(5);

// 5条消息之内都不会阻塞
for i in 0..5 {
    s.send(i).unwrap();
}

// 超过5条就会阻塞了
// s.send(5).unwrap();

无限容量

use crossbeam_channel::unbounded;

// 创建一个无限容量的channel
let (s, r) = unbounded();

// 不会阻塞
for i in 0..1000 {
    s.send(i).unwrap();
}

·

1 支持MPMC

现在终于不用笨拙地给Receiver端加锁了~

use std::thread;
use crossbeam_channel::bounded;

let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());

// 起一个线程先接受一个消息然后发出一个消息
thread::spawn(move || {
    r2.recv().unwrap();
    s2.send(2).unwrap();
});

// 发送一个消息然后接受一个消息
s1.send(1).unwrap();
r1.recv().unwrap();

2 Sender和Receiver是Sync

所以现在可以把引用在线程间传递了

use std::thread;
use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;

let (s, r) = bounded(0);

scope(|scope| {
    // 起一个线程先接受一个消息然后发出一个消息
    scope.spawn(|_| {
        r.recv().unwrap();
        s.send(2).unwrap();
    });

    // 发送一个消息然后接受一个消息
    s.send(1).unwrap();
    r.recv().unwrap();
}).unwrap();

3 支持select

提供了类似Go语言功能的select宏,支持使用default分支处理超时等逻辑

use std::thread;
use std::time::Duration;
use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = unbounded();

thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());

select! {
    recv(r1) -> msg => assert_eq!(msg, Ok(10)),
    recv(r2) -> msg => assert_eq!(msg, Ok(20)),
    default(Duration::from_secs(1)) => println!("timed out"),
}

并且其实select内部不仅仅支持接受消息,也支持发送消息。同时还有更高级的动态select支持~

小结

我们看到,crossbeam的channel优雅的解决了标准库中上述的5个问题,看来没事可以多用用了~下一期我们会讲一下crossbeam-util和crossbeam-queue,敬请期待。

评论区

写评论
derust 2020-09-13 00:01

支持!~

stjepang大神基于crossbeam又弄了一个(async-channel)[https://github.com/stjepang/async-channel]

以及tokio的channel和 futures库的channel等,这些如果能做一个对比总结就更好了

chenwei767 2020-09-11 18:12

赞!

whfuyn 2020-09-11 10:00

赞!

1 共 3 条评论, 1 页