Posted by lithbitren on 2023-01-21 03:15
Tags: tokio, channel, mutex, crossbeam, async-channel, kanal, flume, postage
Last time I mentioned that tokio::sync::Mutex performs poorly. The tokio docs recommend two workarounds. First, if the lock isn't held across long-running async work, use the standard library's std::sync::Mutex (the docs also recommend the third-party parking_lot, but in my tests neither its mutex nor its rwlock beat the standard library's locks, so I won't recommend it). Second, replace the lock with an async channel such as tokio::sync::mpsc::unbounded_channel; even the tutorial's mini-redis is implemented on top of channels. So I decided to benchmark channel performance.
The benchmark is the simplest possible ticket dispenser. The dispenser acts as the producer, with a number of consumer coroutines; one million tickets are issued in total. Each consumer coroutine signals the producer with its own index, then awaits the returned ticket on its dispatcher channel and pushes the value into its entry in the consumers array. That's the overall idea.
For comparison I also measured: distributing a million tickets into N arrays on a single thread; an atomic counter across threads; a mutex across threads; a million tickets through the standard library's std::sync::mpsc::channel; and the most popular third-party lock-free channel library, crossbeam_channel.
On the tokio runtime I measured the async mutex tokio::sync::Mutex, the unbounded async channel tokio::sync::mpsc::unbounded_channel, and the third-party crates async_channel and postage. async_channel is the async solution recommended in crossbeam_channel's GitHub issues; both of those crates support cloning the Receiver, which neither the std nor the tokio channel does.
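To make that cloning difference concrete, here's a minimal std-only sketch, separate from the benchmark: std's Sender implements Clone so producers can fan in, but its Receiver does not, whereas the mpmc crates mentioned above ship cloneable receivers.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    // Sender implements Clone: many producers fan into one receiver.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();
    drop(tx); // drop the original so the channel closes once all clones are gone
    for h in handles {
        h.join().unwrap();
    }
    // Receiver is NOT Clone: `rx.clone()` would not compile here.
    // crossbeam_channel / async_channel / flume / kanal receivers do implement Clone (mpmc).
    let mut received: Vec<i32> = rx.iter().collect();
    received.sort();
    assert_eq!(received, vec![0, 1, 2, 3]);
    println!("received: {:?}", received);
}
```

With a cloneable receiver, the dispatcher array in the benchmark below would be unnecessary: every consumer could pull from one shared channel.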
Usable third-party crates are scarce for both sync and async channels. On the sync side there are also special-purpose crates like the delay queue delay_queue and the blocking queue blocking_queue, but they're mostly built on the standard library's sync locks and end up performing about the same as std::sync::mpsc::channel; most of the other async channel crates wouldn't even run, so they didn't make the cut.
At a forum member's suggestion I added kanal and flume; both crates include sync and async channels, adding four more groups to the test.
Below is the ticket-dispenser implementation based on tokio's official channel.
fn tokio_unbounded_channel() {
    print!("\ntokio-unbounded-channel: ");
    // Run the test N_LOOP times
    for _ in 1..=N_LOOP {
        // Producer channel: tokio's unbounded async channel
        let (producer_sender, mut producer_receiver) = tokio::sync::mpsc::unbounded_channel();
        // Dispatcher slots start as None so each coroutine can create its own channel after spawning
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        // Consumer arrays, wrapped in async mutexes so they can be shared into the coroutines;
        // each coroutine locks its own entry exactly once, so this doesn't affect the channel measurement
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        // Async barrier: the producer only starts receiving after every dispatcher
        // channel is initialized, keeping setup out of the measurement
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        // Start timing
        let start = Instant::now();
        // Third-party scoped-task crate; the API is nearly identical to std's thread::scope,
        // which avoids Arc reference counting and explicit joins
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                // Pass the shared variables into the coroutine scope by reference
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    // Create this coroutine's dispatcher channel
                    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
                    // Creating it inside the coroutine avoids moving the receiver across scopes;
                    // senders generally support Clone, so only the sender is shared out
                    dispatcher.lock().await[i] = Some(tx);
                    // Wait until every sender is initialized
                    barrier.wait().await;
                    // Take the consumer array out of its lock up front so pushes aren't slowed down
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        // Send this coroutine's index to the producer
                        producer_sender.send(i).unwrap();
                        // Await the ticket coming back through the dispatcher channel
                        let j = rx.recv().await.unwrap();
                        // Push the ticket into the consumer array
                        consumer.push(j);
                    }
                });
            }
            // Pass the shared variables into the producer coroutine's scope by reference
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                // Wait until all dispatcher channels are initialized
                barrier.wait().await;
                // Take the dispatcher senders out of the lock so sends aren't slowed down
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                // The million-ticket issuing loop
                for j in 0..N_TIMES * N_THREADS {
                    // Receive an index from one of the coroutines
                    let i = producer_receiver.recv().await.unwrap();
                    // Send the next ticket down that coroutine's dispatcher channel
                    dispatcher[i].send(j).unwrap();
                }
            });
        });
        // Print total elapsed time
        print!("{:?}, ", start.elapsed());
    }
}
The official docs say the lock performs poorly, but to be rigorous I tested that claim too. Here the producer is simply a usize wrapped in the async mutex:
fn tokio_mutex() {
    print!("\ntokio-mutex: ");
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        // Initialize the lock and the ticket counter
        let producer = tokio::sync::Mutex::new(0usize);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(async move {
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        // Lock directly to take the next ticket
                        let mut j = producer.lock().await;
                        // Push the ticket into the consumer array
                        consumer.push(*j);
                        // Increment the producer's counter
                        *j += 1;
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}
With 5 threads and 5 rounds each, the output might look like this: every array receives 5 tickets, and no ticket appears in more than one array:
43.4µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 9, 13, 17, 21] }, Mutex { data: [8, 12, 16, 20, 24] }, Mutex { data: [6, 10, 14, 18, 22] }, Mutex { data: [7, 11, 15, 19, 23] }]
40.5µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 13, 16, 19, 22] }, Mutex { data: [11, 14, 17, 20, 23] }, Mutex { data: [12, 15, 18, 21, 24] }]
33.6µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [16, 18, 20, 22, 24] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 17, 19, 21, 23] }]
32.9µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [10, 11, 12, 14, 17] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [13, 16, 19, 21, 23] }, Mutex { data: [15, 18, 20, 22, 24] }]
34.7µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [20, 21, 22, 23, 24] }, Mutex { data: [15, 16, 17, 18, 19] }]
27.2µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }, Mutex { data: [20, 21, 22, 23, 24] }]
33.2µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [20, 21, 22, 23, 24] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }]
29.1µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }, Mutex { data: [20, 21, 22, 23, 24] }]
31.3µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [20, 21, 22, 23, 24] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }]
35µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [10, 13, 16, 19, 22] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [11, 14, 17, 20, 23] }, Mutex { data: [12, 15, 18, 21, 24] }]
Of course a 5x5 dispenser can't measure performance, so I benchmarked a million tickets split across 10 threads/coroutines, and again across 1000 threads/coroutines.
Test environment: Win10, 2.6 GHz, 12 threads. Results will differ in other environments and are for reference only. Final results:
threads: 10, times: 100000
single-thread: 3.783ms, 2.3065ms, 2.362ms, 3.0346ms, 3.5482ms, 3.5821ms, 2.2885ms, 2.3595ms, 2.7192ms, 3.4395ms,
threads-atomic: 21.9723ms, 21.3465ms, 21.7387ms, 21.6502ms, 21.1958ms, 21.3998ms, 21.7855ms, 21.3095ms, 22.2298ms, 21.448ms,
threads-mutex: 26.2929ms, 25.6328ms, 26.5038ms, 24.0021ms, 27.0577ms, 25.5051ms, 26.3975ms, 25.0175ms, 25.0625ms, 25.2786ms,
threads-channel: 1.7890931s, 1.7893984s, 1.7701385s, 1.7255495s, 1.745036s, 1.7695388s, 1.7463475s, 1.7560569s, 1.7514867s, 1.7705453s,
threads-crossbeam-channel: 162.0278ms, 158.3723ms, 160.3896ms, 160.8348ms, 159.1323ms, 156.0105ms, 160.1932ms, 158.0119ms, 159.8726ms, 160.1443ms,
threads-kanal: 577.0325ms, 458.3801ms, 496.832ms, 517.777ms, 602.367ms, 522.8839ms, 565.2879ms, 544.6193ms, 607.2045ms, 588.5532ms,
threads-flume: 1.7274724s, 1.7240062s, 1.731051s, 1.7344545s, 1.7197228s, 1.7115723s, 1.7206029s, 1.7409012s, 1.7325423s, 1.7337557s,
tokio-mutex: 186.5466ms, 186.4727ms, 183.7647ms, 184.0309ms, 186.6594ms, 184.6223ms, 182.0394ms, 184.6092ms, 184.4775ms, 182.8283ms,
tokio-unbounded-channel: 340.7769ms, 304.7117ms, 330.3464ms, 329.0507ms, 320.9479ms, 331.2624ms, 341.5339ms, 328.4041ms, 315.6056ms, 331.1609ms,
tokio-async-channel: 328.5224ms, 346.4572ms, 349.2641ms, 344.0127ms, 324.8363ms, 325.5096ms, 326.4684ms, 325.3062ms, 324.0677ms, 317.2376ms,
tokio-kanal: 332.9933ms, 363.6793ms, 370.0358ms, 436.6208ms, 374.7506ms, 378.9373ms, 482.2834ms, 362.9407ms, 388.2605ms, 341.7146ms,
tokio-flume: 456.109ms, 463.8518ms, 495.6742ms, 493.0587ms, 462.5921ms, 460.3794ms, 460.6897ms, 461.7761ms, 464.3175ms, 467.5147ms,
tokio-postage: 410.2627ms, 401.0765ms, 412.4691ms, 397.3949ms, 402.8839ms, 401.4267ms, 401.0195ms, 398.3384ms, 416.1288ms, 408.2543ms,
threads: 1000, times: 1000
single-thread: 5.2886ms, 4.8897ms, 4.6462ms, 4.7382ms, 4.9838ms, 4.7364ms, 5.0485ms, 4.4898ms, 5.0647ms, 4.6328ms,
threads-atomic: 48.0055ms, 44.2245ms, 42.62ms, 42.9086ms, 44.3698ms, 43.9917ms, 42.6267ms, 42.6518ms, 42.3383ms, 42.3764ms,
threads-mutex: 42.8413ms, 42.0336ms, 41.6811ms, 41.5364ms, 41.5922ms, 41.4252ms, 41.1163ms, 41.7143ms, 41.62ms, 41.9739ms,
threads-channel: 1.8085408s, 1.8154578s, 1.8032787s, 1.7955681s, 1.8008499s, 1.8432902s, 1.8030498s, 1.8072515s, 1.7974962s, 1.7995223s,
threads-crossbeam-channel: 226.0173ms, 222.2247ms, 220.5695ms, 221.6121ms, 222.8465ms, 228.0142ms, 214.4613ms, 212.3422ms, 216.4174ms, 217.5602ms,
threads-kanal: 481.7414ms, 474.3993ms, 456.9561ms, 475.5404ms, 447.9582ms, 455.4468ms, 501.7496ms, 470.8391ms, 463.0727ms, 456.7112ms,
threads-flume: 1.8066982s, 1.7691016s, 1.7934957s, 1.7645953s, 1.7546542s, 1.7968676s, 1.8112811s, 1.8589003s, 1.8273865s, 1.83964s,
tokio-mutex: 194.3765ms, 192.9023ms, 186.9128ms, 186.9857ms, 187.0546ms, 187.7246ms, 189.9724ms, 189.3485ms, 187.0087ms, 199.4855ms,
tokio-unbounded-channel: 315.4228ms, 308.7256ms, 291.061ms, 302.7888ms, 295.4177ms, 296.0638ms, 299.564ms, 300.1645ms, 300.3739ms, 291.7153ms,
tokio-async-channel: 271.9142ms, 249.7759ms, 253.7366ms, 244.3125ms, 256.9634ms, 248.1089ms, 257.8614ms, 263.5845ms, 256.5529ms, 256.8154ms,
tokio-kanal: 366.6078ms, 357.2386ms, 391.1622ms, 385.9052ms, 321.6122ms, 349.8728ms, 352.8915ms, 436.9492ms, 452.8487ms, 545.8487ms,
tokio-flume: 400.0873ms, 415.8159ms, 397.7332ms, 405.4837ms, 419.9753ms, 415.9008ms, 390.4128ms, 421.0755ms, 391.907ms, 405.7023ms,
tokio-postage: 359.1844ms, 366.0432ms, 354.1134ms, 348.1011ms, 348.2243ms, 367.2243ms, 353.5227ms, 356.832ms, 362.9809ms, 368.1148ms,
From the results: a single thread counting and pushing into arrays takes roughly 2-6 ms for a million operations, while a multi-threaded atomic counter or mutex takes roughly 20-40 ms, with the burden growing as threads are added.
The blocking thread-based channels perform poorly, averaging over 1600 ms, so I wouldn't recommend the standard library channel for large-scale dispatch across threads.
crossbeam-channel's lock-free channel, by contrast, does very well: under 220 ms for a million operations across threads, and it's also more pleasant to use than the standard library channel.
On the tokio runtime, neither 10 nor 1000 coroutines affects performance much.
tokio's async Mutex lands around 180 ms, and the async unbounded_channel around 300-330 ms. Although the official docs warn about async mutex performance, it holds up reasonably well against channels; it is nearly twice as fast as the officially recommended channel approach, and it's perfectly usable for transaction-like operations that need both async waiting and locking.
async_channel performs slightly better than tokio::sync::mpsc::unbounded_channel, especially with many coroutines, and its API is nicer; worth using day to day.
postage is functionally similar to async_channel but performs worse, possibly because it lacks an unbounded channel and has to be initialized with a large-capacity bounded one.
The late additions kanal and flume are unremarkable. On the sync side flume matches the standard library's performance, so it apparently isn't lock-free; on the async side postage, kanal, and flume are all much of a muchness, and none beats async-channel overall.
The full source is pasted right here in the post. Since one environment only runs a handful of test groups, I didn't abstract out the common parts (too much mental effort); the code structure is largely the same across tests.
[dependencies]
tokio = {version = "1.24.1", features = ["full"]}
futures = "0.3.25"
tokio-scoped = "0.2.0"
crossbeam-channel = "0.5.6"
postage = "0.5.0"
async-channel = "1.8.0"
kanal = "0.1.0-pre8"
flume = "0.10.14"
use std::thread;
use std::sync::{Barrier, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use postage::{self, prelude::Stream, sink::Sink};

const N_THREADS: usize = 10;
const N_TIMES: usize = 100000;
const N_LOOP: usize = 10;

#[tokio::main(worker_threads = 10)]
async fn main() {
    print!("threads: {}, times: {}\n\n", N_THREADS, N_TIMES);
    single_thread();
    threads_atomic();
    threads_mutex();
    threads_channel();
    threads_crossbeam_channel();
    threads_kanal();
    threads_flume();
    tokio_mutex();
    tokio_unbounded_channel();
    tokio_async_channel();
    tokio_kanal();
    tokio_flume();
    tokio_postage();
}
fn single_thread() {
    print!("single-thread: ");
    for _ in 1..=N_LOOP {
        let start = Instant::now();
        let mut consumers = (0..N_THREADS).map(|_| Vec::new()).collect::<Vec<_>>();
        for i in 0..N_THREADS {
            for j in i * N_TIMES .. (i + 1) * N_TIMES {
                consumers[i].push(j);
            }
        }
        print!("{:?}, ", start.elapsed());
    }
}
fn threads_atomic() {
    print!("\nthreads-atomic: ");
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let producer = AtomicUsize::new(0);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(move || {
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        let j = producer.fetch_add(1, Ordering::Relaxed);
                        consumer.push(j);
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_mutex() {
    print!("\nthreads-mutex: ");
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let producer = Mutex::new(0usize);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(move || {
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        let mut j = producer.lock().unwrap();
                        consumer.push(*j);
                        *j += 1;
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}
fn threads_channel() {
    print!("\nthreads-channel: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = std::sync::mpsc::channel();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = std::sync::mpsc::channel();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_crossbeam_channel() {
    print!("\nthreads-crossbeam-channel: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = crossbeam_channel::unbounded();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = crossbeam_channel::unbounded();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}
fn threads_kanal() {
    print!("\nthreads-kanal: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = kanal::unbounded();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = kanal::unbounded();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_flume() {
    print!("\nthreads-flume: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = flume::unbounded();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = flume::unbounded();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}
fn tokio_mutex() {
    print!("\ntokio-mutex: ");
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let producer = tokio::sync::Mutex::new(0usize);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(async move {
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        let mut j = producer.lock().await;
                        consumer.push(*j);
                        *j += 1;
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
        // println!("consumers: {:?}", consumers);
    }
}

fn tokio_unbounded_channel() {
    print!("\ntokio-unbounded-channel: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, mut producer_receiver) = tokio::sync::mpsc::unbounded_channel();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).unwrap();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
        // println!("consumers: {:?}", consumers);
    }
}
fn tokio_async_channel() {
    print!("\ntokio-async-channel: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = async_channel::unbounded();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, rx) = async_channel::unbounded();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).await.ok();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn tokio_kanal() {
    print!("\ntokio-kanal: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = kanal::unbounded_async();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, rx) = kanal::unbounded_async();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).await.ok();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}
fn tokio_flume() {
    print!("\ntokio-flume: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = flume::unbounded();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, rx) = flume::unbounded();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send_async(i).await.ok();
                        let j = rx.recv_async().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv_async().await.unwrap();
                    dispatcher[i].send_async(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn tokio_postage() {
    print!("\ntokio-postage: ");
    for _ in 1..=N_LOOP {
        let (producer_sender, mut producer_receiver) = postage::mpsc::channel::<usize>(N_TIMES);
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let mut producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, mut rx) = postage::mpsc::channel::<usize>(N_TIMES);
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).await.ok();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;
                let mut dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}
Comments
Huh? I just upgraded Rust to rustc 1.69.0-nightly (d7948c843 2023-01-26) and the results haven't changed. Does crossbeam landing in std only apply on stable?
Makes sense. I've always felt crossbeam has seeped deeply into Rust: constructs like scope were seemingly popularized by crossbeam first and only entered the standard library last year. These tests were run on rustc 1.68.0-nightly (270c94e48 2022-12-28)
--
👇
eweca-d: In 1.67, crossbeam entered the std library
Measured it. Added four groups for kanal's and flume's sync and async channels: flume's sync channel performs badly, and both crates' async channels are middling, worse than async-channel, which crossbeam-channel recommends.
--
👇
BobAnkh: Nice measurements. I've seen someone benchmark this before, for your reference: https://github.com/fereidani/rust-channel-benchmarks. Worth borrowing from; you could also measure kanal and flume here, I'm curious too.
Heh, it is pretty rough, but over Spring Festival I couldn't even be bothered to turn the computer on; I'll add flume some other day.
I couldn't find crossbeam's benchmarks. flume's benchmark suite looks fairly complete, but I didn't find any high-concurrency code; it's mostly small thread counts exercising different features.
My test was inspired by tokio's mini-redis: swap the dispenser's core for a hashmap and you get a concurrent container. I probably won't have time for flume's full cross-language style of benchmarking.
--
👇
包布丁: How could a channel benchmark leave out flume? Also, this benchmark methodology is a bit crude; have a look at how flume and crossbeam measure performance.
Also, a word on why this article includes the multi-threaded channel comparisons at all.
Before the coroutine era arrived in various languages, and before the era of redis-style single-threaded multiplexed IO, how did we build distributed data containers and synchronize them?
As far as I know, most people started with their language's sync locks, upgraded to rwlocks, and later adopted channels for ordered operations; languages without channels simulated one with a condition-notify lock plus a double-ended queue.
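That condvar-plus-deque trick can be sketched in a few lines of std-only Rust (a toy illustration, not production code; the type and method names are my own):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A toy unbounded channel built from Mutex<VecDeque> + Condvar,
// the pattern languages without channels fall back on.
struct ToyChannel<T> {
    queue: Mutex<VecDeque<T>>,
    ready: Condvar,
}

impl<T> ToyChannel<T> {
    fn new() -> Arc<Self> {
        Arc::new(ToyChannel { queue: Mutex::new(VecDeque::new()), ready: Condvar::new() })
    }
    fn send(&self, value: T) {
        self.queue.lock().unwrap().push_back(value);
        self.ready.notify_one(); // wake one blocked receiver
    }
    fn recv(&self) -> T {
        let mut q = self.queue.lock().unwrap();
        // Loop to guard against spurious wakeups
        while q.is_empty() {
            q = self.ready.wait(q).unwrap();
        }
        q.pop_front().unwrap()
    }
}

fn main() {
    let ch = ToyChannel::new();
    let sender = Arc::clone(&ch);
    let producer = thread::spawn(move || {
        for i in 0..5 {
            sender.send(i);
        }
    });
    let mut out = Vec::new();
    for _ in 0..5 {
        out.push(ch.recv());
    }
    producer.join().unwrap();
    assert_eq!(out, vec![0, 1, 2, 3, 4]); // FIFO order from a single producer
    println!("{:?}", out);
}
```

Every send and recv here takes the same mutex, which is exactly why such simulated channels inherit lock performance rather than beating it.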
But locks stayed mainstream, and as this article measures, even Rust's thread locks are nearly two orders of magnitude faster than thread channels, never mind other GC'd or dynamic languages.
So why do channels get recommended once you enter the coroutine era? Because async locks got slower while async channels got faster, putting the two in the same order of magnitude; still, contrary to what some articles claim, channels are not faster than locks on an async runtime.
That doesn't discredit async channels, though: they're more broadly applicable than locks, since almost any single-threaded container can be moved behind one without worrying about other concurrency concerns.
For example, lock-free concurrent hashmaps abound, but concurrent LRU/LFU eviction structures are rare, and so are concurrent Aho-Corasick automata for banned-word filtering. Of course those cases can also be handled with a lock; for the AC automaton I use arc_swap's load as a read lock, which is faster than the standard library's locks and better suited to read-mostly data, with store swapping in the whole rebuilt automaton when the word list updates, since unlike a hashmap it doesn't need per-key mutation.
--
👇
danclive: I've read several of your posts and can't help chiming in. Different locks target different scenarios; some suit light load, some heavy. "Mutex" fits most cases; "RwLock" fits reads far outnumbering writes; spinlocks fit very low contention, where optimistic spinning avoids a thread switch. Different "Mutex" implementations also embody their own trade-offs. std::thread::park, std::hint::spin_loop, and std::thread::yield_now are all explained in the docs, and you just compare their performance with a for loop?
Hardly anyone, for the standard library or third-party crates, has actually published concrete numbers for each lock, so I measured them.
The official mutex is what gets used most anyway; park/spin_loop/yield_now are rarely touched by ordinary users, and the standard docs describe scenarios but seldom performance.
For instance park and spin_loop, intuitive as they are, I really can't find a scenario where they beat a barrier or a condition variable.
You say RwLock suits reads that far outnumber writes, but how far is far: 10:1, 100:1, more? My measurements put the crossover around 10:1 reads to writes; below that, the rwlock is far worse than a mutex. How would anyone know without measuring? Can reading the docs really tell you everything?
Likewise, as the opening of this article notes, the tokio docs' claims about locks don't fully match my measurements. That may depend on versions and environments, but again, you only find out by testing; what's the point of reciting the docs verbatim?
As for testing with a for loop: the loop is just a tool for amplifying the locking burden. Essentially I'm measuring how much time each lock consumes under a million concurrent operations, which gives a rough feel for system throughput and load. If you have a better methodology, I'd welcome the discussion.
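The read/write-ratio crossover mentioned above can be probed with a rough std-only sketch like the following (the constants and the 8-thread setup are my own choices, and absolute numbers will vary by machine; the point is only to compare the two locks at different ratios):

```rust
use std::sync::{Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};

const N_THREADS: usize = 8;
const N_OPS: usize = 100_000;

// Time N_OPS operations per thread at a given read:write ratio,
// first under a Mutex, then under an RwLock.
fn bench(reads_per_write: usize) -> (Duration, Duration) {
    let mutexed = Mutex::new(0usize);
    let start = Instant::now();
    thread::scope(|s| {
        for _ in 0..N_THREADS {
            s.spawn(|| {
                for i in 0..N_OPS {
                    if i % (reads_per_write + 1) == 0 {
                        *mutexed.lock().unwrap() += 1; // write
                    } else {
                        let _ = *mutexed.lock().unwrap(); // read
                    }
                }
            });
        }
    });
    let mutex_time = start.elapsed();

    let rw = RwLock::new(0usize);
    let start = Instant::now();
    thread::scope(|s| {
        for _ in 0..N_THREADS {
            s.spawn(|| {
                for i in 0..N_OPS {
                    if i % (reads_per_write + 1) == 0 {
                        *rw.write().unwrap() += 1; // exclusive write
                    } else {
                        let _ = *rw.read().unwrap(); // shared read
                    }
                }
            });
        }
    });
    (mutex_time, start.elapsed())
}

fn main() {
    for ratio in [1, 10, 100] {
        let (m, r) = bench(ratio);
        println!("reads:writes {}:1  mutex {:?}  rwlock {:?}", ratio, m, r);
    }
}
```

On a machine like the one in the article you'd expect the rwlock to pull ahead somewhere around the 10:1 mark, but the exact crossover is environment-dependent, which is the whole argument for measuring rather than reciting docs.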
So diligent, finished writing at 3:15 a.m. Marking this!