
lithbitren posted on 2023-01-21 03:15

Tags:tokio,channel,mutex,crossbeam,async-channel,kanal,flume,postage

Last time I mentioned that tokio::sync::Mutex performs poorly. The official tokio docs recommend two workarounds. First, if the lock is never held across long-running async work, you can use the standard library's std::sync::Mutex (the tokio docs also recommend the third-party parking_lot crate, but in my tests neither its mutex nor its read-write lock beat the standard library's locks, so I won't recommend it). Second, you can replace the lock with an async channel such as tokio::sync::mpsc::unbounded_channel — even the mini-redis tutorial is built around the channel approach. So I decided to benchmark channel performance.
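As a minimal sketch of the two recommendations (not from the benchmark below; the names here are hypothetical), the lock variant holds the guard only across synchronous work, while the channel variant gives one task ownership of the state:

// Recommendation 1: std::sync::Mutex is fine in async code as long as the
// guard never lives across an .await (hypothetical shared counter).
fn incr(counter: &std::sync::Mutex<u64>) -> u64 {
    let mut n = counter.lock().unwrap(); // lock, mutate, drop immediately
    *n += 1;
    *n
}

// Recommendation 2: one task owns the state; everyone else talks to it
// through a channel and gets the answer back on a oneshot.
async fn channel_counter() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<tokio::sync::oneshot::Sender<u64>>();
    tokio::spawn(async move {
        let mut n = 0u64;
        while let Some(reply) = rx.recv().await {
            n += 1;
            let _ = reply.send(n); // hand the fresh number back
        }
    });
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    tx.send(reply_tx).unwrap();
    let _n = reply_rx.await.unwrap();
}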

The test is the simplest possible ticket dispenser. The dispenser acts as the producer, and several coroutines are spawned as consumers; one million tickets are issued in total. Each consumer coroutine signals the producer (sending the index of that coroutine/consumer), then waits synchronously on its dispatcher channel for the returned value and pushes it into its consumer array. That's the general idea.

For comparison, I also measured: the time to distribute a million sequence numbers into N arrays on a single thread; the issue time of an atomic counter across threads; the distribution time of a mutex across threads; a million issues through the standard library channel std::sync::mpsc::channel; and the performance of the most popular third-party lock-free channel crate, crossbeam_channel.

And, under the tokio runtime: the issue time of the async mutex tokio::sync::Mutex, of the unbounded async channel tokio::sync::mpsc::unbounded_channel, and of the third-party crates async_channel and postage. async_channel is the async solution recommended in crossbeam_channel's GitHub issues, and both crates support cloning the Receiver, which neither the std nor the tokio channels allow.
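Receiver cloning is what turns these into MPMC queues. A quick sketch (illustrative only, not part of the benchmark) of fanning work out to several consumers over one async_channel:

// Because async_channel's Receiver is Clone, several workers can pull from
// the same queue; std and tokio mpsc receivers can't be cloned like this.
async fn fan_out() {
    let (tx, rx) = async_channel::unbounded::<usize>();
    for id in 0..4 {
        let rx = rx.clone(); // each worker holds its own handle to the same queue
        tokio::spawn(async move {
            while let Ok(job) = rx.recv().await {
                println!("worker {id} got job {job}");
            }
        });
    }
    for job in 0..16 {
        tx.send(job).await.unwrap();
    }
    tx.close(); // recv() now returns Err once drained, so the workers exit
}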

Whether sync or async, there aren't many usable third-party channel crates. On the sync side there are also feature-oriented crates such as delay_queue (a delay queue) and blockingqueue (a blocking queue), but these are mostly built on the standard library's sync locks, so their performance ends up essentially identical to std::sync::mpsc::channel. Most of the other async channel crates wouldn't even run, so they were left out.

At forum members' suggestion I added kanal and flume; both crates include sync and async channels, adding 4 sample groups to the test.

Below is the ticket dispenser implemented on tokio's official channel.

fn tokio_unbounded_channel() {
    print!("\ntokio-unbounded-channel: ");
    
    // Run the test N_LOOP times
    for _ in 1..=N_LOOP {
        // Create the producer channel, using tokio's unbounded async channel
        let (producer_sender, mut producer_receiver) = tokio::sync::mpsc::unbounded_channel();
        // Dispatcher slots; the per-coroutine channels are only created once the
        // coroutines are running, so start with None
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        // Consumer arrays, wrapped in async mutexes so they can be shared into the
        // coroutines; each is locked once inside its coroutine, which doesn't affect
        // the channel timing
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        // Async barrier: the producer only starts receiving once every coroutine has
        // set up its dispatcher channel, keeping setup out of the measurement
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        // Start the clock
        let start = Instant::now();
        // A third-party scoping crate, used much like std's thread::scope; it avoids
        // Arc reference counting across tasks and explicit joins
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                // Pass the shared variables into the coroutine scope by reference
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    // Create this coroutine's dispatcher channel
                    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
                    // Creating it inside the child coroutine avoids moving the receiver
                    // across scopes; senders generally support Clone, so only the sender
                    // is handed out
                    dispatcher.lock().await[i] = Some(tx);
                    // Wait until every sender is registered
                    barrier.wait().await;
                    // Take the consumer array out of its lock up front so pushes aren't slowed
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        // Send this coroutine's index to the producer
                        producer_sender.send(i).unwrap();
                        // Await the number returned through the dispatcher channel
                        let j = rx.recv().await.unwrap();
                        // Push the number into the consumer array
                        consumer.push(j);
                    }
                });
            }
            // Pass the shared variables into the producer coroutine's scope by reference
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                // Wait until all the dispatcher channels are created
                barrier.wait().await;
                // Take the dispatcher channels out of the lock so sending isn't slowed
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                // The million-issue loop
                for j in 0..N_TIMES * N_THREADS {
                    // Receive an index from one of the coroutines
                    let i = producer_receiver.recv().await.unwrap();
                    // Send the next number back on that coroutine's dispatcher channel
                    dispatcher[i].send(j).unwrap();
                }
            });
        });
        // Print the total elapsed time
        print!("{:?}, ", start.elapsed());
    }
}

The official docs say the lock performs poorly, but to be rigorous I tested that claim too. Here the producer is simply a usize wrapped in an async mutex:

fn tokio_mutex() {
    print!("\ntokio-mutex: ");
    
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        // Initialize the lock and the starting number
        let producer = tokio::sync::Mutex::new(0usize);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(async move {
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        // Lock directly to take a number
                        let mut j = producer.lock().await;
                        // Push the number into the consumer array
                        consumer.push(*j);
                        // Increment the producer's counter
                        *j += 1;
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

With 5 threads and 5 rounds each, the final output might look like this: every array gets 5 numbers, and no number appears in more than one array:

43.4µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 9, 13, 17, 21] }, Mutex { data: [8, 12, 16, 20, 24] }, Mutex { data: [6, 10, 14, 18, 22] }, Mutex { data: [7, 11, 15, 19, 23] }]
40.5µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 13, 16, 19, 22] }, Mutex { data: [11, 14, 17, 20, 23] }, Mutex { data: [12, 15, 18, 21, 24] }]     
33.6µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [16, 18, 20, 22, 24] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 17, 19, 21, 23] }]     
32.9µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [10, 11, 12, 14, 17] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [13, 16, 19, 21, 23] }, Mutex { data: [15, 18, 20, 22, 24] }]     
34.7µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [20, 21, 22, 23, 24] }, Mutex { data: [15, 16, 17, 18, 19] }]     
27.2µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }, Mutex { data: [20, 21, 22, 23, 24] }]     
33.2µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [20, 21, 22, 23, 24] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }]     
29.1µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }, Mutex { data: [20, 21, 22, 23, 24] }]     
31.3µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [20, 21, 22, 23, 24] }, Mutex { data: [10, 11, 12, 13, 14] }, Mutex { data: [15, 16, 17, 18, 19] }]     
35µs, consumers: [Mutex { data: [0, 1, 2, 3, 4] }, Mutex { data: [10, 13, 16, 19, 22] }, Mutex { data: [5, 6, 7, 8, 9] }, Mutex { data: [11, 14, 17, 20, 23] }, Mutex { data: [12, 15, 18, 21, 24] }]

Of course, a 5×5 dispenser tells us nothing about performance, so instead we measure a million issues split across 10 threads/coroutines, and again across 1000 threads/coroutines.

Test environment: Windows 10, 2.6 GHz, 12 threads. Results may differ in other environments and are for reference only. Final results:

threads: 10, times: 100000

single-thread: 3.783ms, 2.3065ms, 2.362ms, 3.0346ms, 3.5482ms, 3.5821ms, 2.2885ms, 2.3595ms, 2.7192ms, 3.4395ms, 
threads-atomic: 21.9723ms, 21.3465ms, 21.7387ms, 21.6502ms, 21.1958ms, 21.3998ms, 21.7855ms, 21.3095ms, 22.2298ms, 21.448ms,
threads-mutex: 26.2929ms, 25.6328ms, 26.5038ms, 24.0021ms, 27.0577ms, 25.5051ms, 26.3975ms, 25.0175ms, 25.0625ms, 25.2786ms,
threads-channel: 1.7890931s, 1.7893984s, 1.7701385s, 1.7255495s, 1.745036s, 1.7695388s, 1.7463475s, 1.7560569s, 1.7514867s, 1.7705453s,
threads-crossbeam-channel: 162.0278ms, 158.3723ms, 160.3896ms, 160.8348ms, 159.1323ms, 156.0105ms, 160.1932ms, 158.0119ms, 159.8726ms, 160.1443ms,
threads-kanal: 577.0325ms, 458.3801ms, 496.832ms, 517.777ms, 602.367ms, 522.8839ms, 565.2879ms, 544.6193ms, 607.2045ms, 588.5532ms, 
threads-flume: 1.7274724s, 1.7240062s, 1.731051s, 1.7344545s, 1.7197228s, 1.7115723s, 1.7206029s, 1.7409012s, 1.7325423s, 1.7337557s, 
tokio-mutex: 186.5466ms, 186.4727ms, 183.7647ms, 184.0309ms, 186.6594ms, 184.6223ms, 182.0394ms, 184.6092ms, 184.4775ms, 182.8283ms, 
tokio-unbounded-channel: 340.7769ms, 304.7117ms, 330.3464ms, 329.0507ms, 320.9479ms, 331.2624ms, 341.5339ms, 328.4041ms, 315.6056ms, 331.1609ms, 
tokio-async-channel: 328.5224ms, 346.4572ms, 349.2641ms, 344.0127ms, 324.8363ms, 325.5096ms, 326.4684ms, 325.3062ms, 324.0677ms, 317.2376ms, 
tokio-kanal: 332.9933ms, 363.6793ms, 370.0358ms, 436.6208ms, 374.7506ms, 378.9373ms, 482.2834ms, 362.9407ms, 388.2605ms, 341.7146ms, 
tokio-flume: 456.109ms, 463.8518ms, 495.6742ms, 493.0587ms, 462.5921ms, 460.3794ms, 460.6897ms, 461.7761ms, 464.3175ms, 467.5147ms, 
tokio-postage: 410.2627ms, 401.0765ms, 412.4691ms, 397.3949ms, 402.8839ms, 401.4267ms, 401.0195ms, 398.3384ms, 416.1288ms, 408.2543ms, 
threads: 1000, times: 1000

single-thread: 5.2886ms, 4.8897ms, 4.6462ms, 4.7382ms, 4.9838ms, 4.7364ms, 5.0485ms, 4.4898ms, 5.0647ms, 4.6328ms, 
threads-atomic: 48.0055ms, 44.2245ms, 42.62ms, 42.9086ms, 44.3698ms, 43.9917ms, 42.6267ms, 42.6518ms, 42.3383ms, 42.3764ms, 
threads-mutex: 42.8413ms, 42.0336ms, 41.6811ms, 41.5364ms, 41.5922ms, 41.4252ms, 41.1163ms, 41.7143ms, 41.62ms, 41.9739ms, 
threads-channel: 1.8085408s, 1.8154578s, 1.8032787s, 1.7955681s, 1.8008499s, 1.8432902s, 1.8030498s, 1.8072515s, 1.7974962s, 1.7995223s, 
threads-crossbeam-channel: 226.0173ms, 222.2247ms, 220.5695ms, 221.6121ms, 222.8465ms, 228.0142ms, 214.4613ms, 212.3422ms, 216.4174ms, 217.5602ms, 
threads-kanal: 481.7414ms, 474.3993ms, 456.9561ms, 475.5404ms, 447.9582ms, 455.4468ms, 501.7496ms, 470.8391ms, 463.0727ms, 456.7112ms, 
threads-flume: 1.8066982s, 1.7691016s, 1.7934957s, 1.7645953s, 1.7546542s, 1.7968676s, 1.8112811s, 1.8589003s, 1.8273865s, 1.83964s, 
tokio-mutex: 194.3765ms, 192.9023ms, 186.9128ms, 186.9857ms, 187.0546ms, 187.7246ms, 189.9724ms, 189.3485ms, 187.0087ms, 199.4855ms, 
tokio-unbounded-channel: 315.4228ms, 308.7256ms, 291.061ms, 302.7888ms, 295.4177ms, 296.0638ms, 299.564ms, 300.1645ms, 300.3739ms, 291.7153ms, 
tokio-async-channel: 271.9142ms, 249.7759ms, 253.7366ms, 244.3125ms, 256.9634ms, 248.1089ms, 257.8614ms, 263.5845ms, 256.5529ms, 256.8154ms, 
tokio-kanal: 366.6078ms, 357.2386ms, 391.1622ms, 385.9052ms, 321.6122ms, 349.8728ms, 352.8915ms, 436.9492ms, 452.8487ms, 545.8487ms, 
tokio-flume: 400.0873ms, 415.8159ms, 397.7332ms, 405.4837ms, 419.9753ms, 415.9008ms, 390.4128ms, 421.0755ms, 391.907ms, 405.7023ms, 
tokio-postage: 359.1844ms, 366.0432ms, 354.1134ms, 348.1011ms, 348.2243ms, 367.2243ms, 353.5227ms, 356.832ms, 362.9809ms, 368.1148ms, 

First, a million single-threaded count-and-push operations take roughly 2-6 ms. Multi-threaded atomic counting and the multi-threaded mutex take roughly 20-40 ms, and the burden grows as the thread count increases.

The thread-based blocking channel performs poorly, averaging upwards of 1600 ms, so I wouldn't recommend the standard library channel for large-scale dispatching across threads.

crossbeam-channel's lock-free channel, by contrast, performs excellently: a million multi-threaded operations stay within about 220 ms, and its API is also nicer than the standard library's.

On the tokio runtime side, going from 10 coroutines to 1000 has little effect on performance.

With tokio's async Mutex the time is around 180 ms; with the async unbounded_channel it's around 300-330 ms. Although the official docs flag the async mutex's performance, it holds up passably against channels — nearly twice as fast as the officially recommended channel approach — and for transaction-like operations that need both async waiting and locking, the async lock is perfectly fine.
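The transaction-like case is exactly where the guard has to survive an .await, which only the async mutex allows (a std guard must not be held across awaits). A minimal sketch with hypothetical names:

// Both accounts stay locked across the awaited side effect, so no other
// task can observe the intermediate state.
async fn transfer(accounts: &tokio::sync::Mutex<std::collections::HashMap<String, i64>>) {
    let mut map = accounts.lock().await; // async lock
    // ... some awaited side effect while still holding the lock,
    // e.g. writing an audit record (simulated here with a sleep):
    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
    *map.entry("alice".into()).or_insert(0) -= 10;
    *map.entry("bob".into()).or_insert(0) += 10;
} // guard drops here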

async_channel performs slightly better than tokio's tokio::sync::mpsc::unbounded_channel, especially with many coroutines, and its API is also nicer; worth using day to day.

postage is similar to async_channel in features, but its performance is underwhelming — possibly because it has no unbounded channel and must be initialized with a large-capacity bounded one.

The later additions kanal and flume are unremarkable: flume's sync channel matches the standard library's, so it doesn't appear to be lock-free, and on the async side postage, kanal, and flume are all much of a muchness, none matching async-channel overall.

The full source is pasted right into the post. Since a single environment only produces three or four groups of data per run, I didn't abstract out the common parts (too much effort); the functions all share roughly the same structure.

[dependencies]
tokio = {version = "1.24.1", features = ["full"]}
futures = "0.3.25"
tokio-scoped = "0.2.0"
crossbeam-channel = "0.5.6"
postage = "0.5.0"
async-channel = "1.8.0"
kanal = "0.1.0-pre8"
flume = "0.10.14"

use std::thread;
use std::sync::{Barrier, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use tokio;
use tokio_scoped;

use crossbeam_channel;
// blockingqueue and delay_queue were also tried but dropped from the final run (see above)
use postage::{self, prelude::Stream, sink::Sink};
use async_channel;


const N_THREADS: usize = 10;
const N_TIMES: usize = 100000;
const N_LOOP: usize = 10;

#[tokio::main(worker_threads = 10)]
async fn main() {
    print!("threads: {}, times: {}\n\n", N_THREADS, N_TIMES);
    
    
    single_thread();
    threads_atomic();
    threads_mutex();
    threads_channel();
    threads_crossbeam_channel();
    threads_kanal();
    threads_flume();
    tokio_mutex();
    tokio_unbounded_channel();
    tokio_async_channel();
    tokio_kanal();
    tokio_flume();
    tokio_postage();
}

fn single_thread() {
    print!("single-thread: ");

    for _ in 1..=N_LOOP {
        let start = Instant::now();
        let mut consumers = (0..N_THREADS).map(|_| Vec::new()).collect::<Vec<_>>();
        for i in 0..N_THREADS {
            for j in i * N_TIMES .. (i + 1) * N_TIMES {
                consumers[i].push(j);
            }
        }
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_atomic() {
    print!("\nthreads-atomic: ");
    
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let producer = AtomicUsize::new(0);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(move || {
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        let j = producer.fetch_add(1, Ordering::Relaxed);
                        consumer.push(j);
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_mutex() {
    print!("\nthreads-mutex: ");
    
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let producer = Mutex::new(0usize);
        let start = Instant::now();
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(move || {
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        let mut j = producer.lock().unwrap();
                        consumer.push(*j);
                        *j += 1;
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_channel() {
    print!("\nthreads-channel: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = std::sync::mpsc::channel();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = std::sync::mpsc::channel();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();                    
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();            
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_crossbeam_channel() {
    print!("\nthreads-crossbeam-channel: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = crossbeam_channel::unbounded();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = crossbeam_channel::unbounded();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();                    
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();            
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_kanal() {
    print!("\nthreads-kanal: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = kanal::unbounded();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = kanal::unbounded();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();                    
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();            
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn threads_flume() {
    print!("\nthreads-flume: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = flume::unbounded();
        let dispatcher = Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(move || {
                    let (tx, rx) = flume::unbounded();
                    dispatcher.lock().unwrap()[i] = Some(tx);
                    barrier.wait();                    
                    let mut consumer = consumers[i].lock().unwrap();
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().unwrap();
                        consumer.push(j);
                    }
                });
            }
            barrier.wait();            
            let dispatcher = dispatcher.lock().unwrap().iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
            for j in 0..N_TIMES * N_THREADS {
                let i = producer_receiver.recv().unwrap();
                dispatcher[i].send(j).unwrap();
            }
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn tokio_mutex() {
    print!("\ntokio-mutex: ");
    
    for _ in 1..=N_LOOP {
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let producer = tokio::sync::Mutex::new(0usize);
        let start = Instant::now();
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer = &producer;
                let consumers = &consumers;
                s.spawn(async move {
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        let mut j = producer.lock().await;
                        consumer.push(*j);
                        *j += 1;
                    }
                });
            }
        });
        print!("{:?}, ", start.elapsed());
        // println!("consumers: {:?}", consumers);
    }
}

fn tokio_unbounded_channel() {
    print!("\ntokio-unbounded-channel: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, mut producer_receiver) = tokio::sync::mpsc::unbounded_channel();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;                    
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).unwrap();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;            
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).unwrap();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
        // println!("consumers: {:?}", consumers);
    }
}

fn tokio_async_channel() {
    print!("\ntokio-async-channel: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = async_channel::unbounded();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, rx) = async_channel::unbounded();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;                    
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).await.ok();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;            
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}


fn tokio_kanal() {
    print!("\ntokio-kanal: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = kanal::unbounded_async();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, rx) = kanal::unbounded_async();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;                    
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).await.ok();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;            
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn tokio_flume() {
    print!("\ntokio-flume: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, producer_receiver) = flume::unbounded();
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, rx) = flume::unbounded();
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;                    
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send_async(i).await.ok();
                        let j = rx.recv_async().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;            
                let dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv_async().await.unwrap();
                    dispatcher[i].send_async(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}

fn tokio_postage() {
    print!("\ntokio-postage: ");
    
    for _ in 1..=N_LOOP {
        let (producer_sender, mut producer_receiver) = postage::mpsc::channel::<usize>(N_TIMES);
        let dispatcher = tokio::sync::Mutex::new(vec![None; N_THREADS]);
        let consumers = (0..N_THREADS).map(|_| tokio::sync::Mutex::new(Vec::new())).collect::<Vec<_>>();
        let barrier = tokio::sync::Barrier::new(N_THREADS + 1);
        let start = Instant::now();    
        tokio_scoped::scope(|s| {
            for i in 0..N_THREADS {
                let mut producer_sender = producer_sender.clone();
                let dispatcher = &dispatcher;
                let consumers = &consumers;
                let barrier = &barrier;
                s.spawn(async move {
                    let (tx, mut rx) = postage::mpsc::channel::<usize>(N_TIMES);
                    dispatcher.lock().await[i] = Some(tx);
                    barrier.wait().await;                    
                    let mut consumer = consumers[i].lock().await;
                    for _ in 0..N_TIMES {
                        producer_sender.send(i).await.ok();
                        let j = rx.recv().await.unwrap();
                        consumer.push(j);
                    }
                });
            }
            let dispatcher = &dispatcher;
            let barrier = &barrier;
            s.spawn(async move {
                barrier.wait().await;            
                let mut dispatcher = dispatcher.lock().await.iter().map(|some_tx| some_tx.clone().unwrap()).collect::<Vec<_>>();
                for j in 0..N_TIMES * N_THREADS {
                    let i = producer_receiver.recv().await.unwrap();
                    dispatcher[i].send(j).await.ok();
                }
            });
        });
        print!("{:?}, ", start.elapsed());
    }
}

Comments

Author lithbitren 2023-01-28 22:55

Huh? I just upgraded Rust to rustc 1.69.0-nightly (d7948c843 2023-01-26) and the results didn't change. Could it be that the crossbeam code merged into the standard library is only available on stable?

Author lithbitren 2023-01-28 22:44

Makes sense. I've always felt crossbeam has seeped deep into Rust — constructs like scope were apparently popularized by crossbeam first and only entered the standard library last year.

This round of tests used rustc 1.68.0-nightly (270c94e48 2022-12-28).

--
👇
eweca-d: As of 1.67, crossbeam has entered the std library

Author lithbitren 2023-01-28 22:37

Measured. I added 4 groups for the kanal and flume sync and async channels. flume's sync channel performs badly, and both crates' async channels are mediocre — worse than async-channel, the one recommended by crossbeam-channel.

--
👇
BobAnkh: Nice measuring work. I've seen someone benchmark these before; for your reference: https://github.com/fereidani/rust-channel-benchmarks

You could borrow from it and test kanal and flume here too; I'm quite curious myself.

eweca-d 2023-01-27 10:28

As of 1.67, crossbeam has entered the std library

BobAnkh 2023-01-26 12:31

Nice measuring work. I've seen someone benchmark these before; for your reference: https://github.com/fereidani/rust-channel-benchmarks

You could borrow from it and test kanal and flume here too; I'm quite curious myself.

Author lithbitren 2023-01-26 00:23

Heh, it is pretty rough. But I can't be bothered to turn on the computer over Spring Festival; I'll add flume some other day when I have time.

I couldn't find crossbeam's benchmarks. flume's benchmark suite looks quite complete, but I didn't find any high-concurrency code — it's mostly small thread counts across different features.

My tests were inspired by tokio's mini-redis: swap the dispenser's core for a hashmap and you've got a concurrent container. I probably won't find time for flume's full cross-language style of benchmarking.

--
👇
包布丁: How could a channel benchmark leave out flume? Also, this benchmark methodology is a bit rough — I'd suggest looking at how flume and crossbeam measure their performance.

包布丁 2023-01-25 20:46

How could a channel benchmark leave out flume? Also, this benchmark methodology is a bit rough — I'd suggest looking at how flume and crossbeam measure their performance.

Author lithbitren 2023-01-25 12:48

One more thing, on why this article includes the multi-threaded channel comparison at all.

Before the coroutine era arrived in various languages, and before redis's single-threaded multiplexed-IO era, how did we build distributed data containers and synchronize them?

As far as I know, most people started with their language's synchronous locks, upgraded to read-write locks, and later adopted channels for ordered operations; languages without channels simulated one with a condition-notification lock plus a double-ended queue, roughly like the sketch below.
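(Purely illustrative: a blocking, unbounded pseudo-channel, not from the benchmark.)

use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// Simulating a channel with a condition variable plus a deque
struct SimChannel<T> {
    queue: Mutex<VecDeque<T>>,
    ready: Condvar,
}

impl<T> SimChannel<T> {
    fn send(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.ready.notify_one(); // wake one blocked receiver
    }
    fn recv(&self) -> T {
        let mut q = self.queue.lock().unwrap();
        // wait() releases the lock while blocked and reacquires it on wakeup
        while q.is_empty() {
            q = self.ready.wait(q).unwrap();
        }
        q.pop_front().unwrap()
    }
}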

But locks stayed mainstream. As this article measures, even Rust's thread lock is nearly two orders of magnitude faster than its thread channel — let alone in other GC'd or dynamic languages.

So why did channels become the recommendation once we entered the coroutine era? Because async locks got slower and async channels got faster, landing both in the same order of magnitude — though still not, as some articles claim, with channels beating locks under an async runtime.

That's no reason to dismiss async channels, though. They're more broadly applicable than locks: almost any single-threaded container can be ported in directly, with no other concurrency concerns.

For example, lock-free concurrent hashmaps abound, but concurrent LRU/LFU for cache eviction is rare, and so is a concurrent Aho-Corasick automaton for banned-word detection and replacement. Of course those scenarios can also be handled with locking — for the AC automaton I use arc_swap's load as the read lock, which is faster than the standard library's locks and better suited to read-only workloads; when the banned-word list updates, I store a whole replacement, since unlike a hashmap it doesn't need per-key in-place modification.
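The arc_swap pattern I mean, as a sketch (assumes arc_swap = "1" in Cargo.toml, which isn't in the dependency list above; a HashSet stands in for the AC automaton):

use std::collections::HashSet;
use std::sync::Arc;
use arc_swap::ArcSwap;

struct BannedWords(ArcSwap<HashSet<String>>);

impl BannedWords {
    fn new() -> Self {
        Self(ArcSwap::from_pointee(HashSet::new()))
    }
    fn contains(&self, w: &str) -> bool {
        self.0.load().contains(w) // lock-free read path
    }
    fn replace_all(&self, words: HashSet<String>) {
        self.0.store(Arc::new(words)); // rebuild and publish the whole structure
    }
}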

--
👇
danclive: I've read several of your articles and can't help but say a few words. Different locks suit different scenarios — some suit light load, some heavy. For example, "Mutex" suits most situations; "RwLock" suits reads far outnumbering writes; spinlocks suit very low contention, where in the optimistic case they avoid a thread switch. Different "Mutex" implementations also embody their own trade-offs. std::thread::park, std::hint::spin_loop, and std::thread::yield_now are explained in the docs too — and you just compare their performance with a for loop?

Author lithbitren 2023-01-25 12:12

Standard library or third-party, few people have actually measured each lock's performance, so I measured.

The official mutex is what gets used most anyway; most ordinary users will almost never touch park/spin_loop/yield_now. The standard library usually describes the scenario but rarely mentions performance.

For instance, park/spin_loop are intuitive enough, but I genuinely can't find a scenario where they beat a barrier or a condition variable.

Or take your claim that RwLock suits reads far outnumbering writes — is "far" 10:1, 100:1, or more? My measurements say a read-write lock starts paying off at roughly 10:1 reads to writes, and below that it's far worse than a mutex. How would you know without measuring? Can reading the docs really tell you everything?
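The kind of loop I mean, as a sketch (thread count and ratio are illustrative, not my original harness):

use std::sync::RwLock;
use std::thread;
use std::time::Instant;

// Time a 10:1 read:write mix over a RwLock; swap in a Mutex (lock() for
// both arms) to compare at the same ratio.
fn rwlock_ratio_10_to_1() {
    let rw = RwLock::new(0usize);
    let start = Instant::now();
    thread::scope(|s| {
        for _ in 0..8 {
            s.spawn(|| {
                for i in 0..100_000 {
                    if i % 10 == 0 {
                        *rw.write().unwrap() += 1; // 1 write per 10 ops
                    } else {
                        let _ = *rw.read().unwrap(); // 9 reads per 10 ops
                    }
                }
            });
        }
    });
    println!("rwlock 10:1 -> {:?}", start.elapsed());
}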

Or, as written at the top of this article: the tokio docs' statements about locks don't fully match what I measured — maybe it depends on versions and environments, but how would you know without testing? What's the point of reciting scripture?

As for measuring with a for loop: the loop is just a tool to amplify the locking burden. In essence I'm measuring the time each lock consumes under about a million concurrent operations, which gives a basic feel for system throughput and load. If you have a better methodology, I'd be happy to discuss it.

--
👇
danclive: I've read several of your articles and can't help but say a few words. Different locks suit different scenarios — some suit light load, some heavy. For example, "Mutex" suits most situations; "RwLock" suits reads far outnumbering writes; spinlocks suit very low contention, where in the optimistic case they avoid a thread switch. Different "Mutex" implementations also embody their own trade-offs. std::thread::park, std::hint::spin_loop, and std::thread::yield_now are explained in the docs too — and you just compare their performance with a for loop?

danclive 2023-01-24 15:33

I've read several of your articles and can't help but say a few words. Different locks suit different scenarios — some suit light load, some heavy. For example, "Mutex" suits most situations; "RwLock" suits reads far outnumbering writes; spinlocks suit very low contention, where in the optimistic case they avoid a thread switch. Different "Mutex" implementations also embody their own trade-offs. std::thread::park, std::hint::spin_loop, and std::thread::yield_now are explained in the docs too — and you just compare their performance with a for loop?

sjbwylbs 2023-01-24 12:20

So diligent — finished writing at 3:15 AM. Bookmarking this!
