Posted by lithbitren on 2023-02-11 21:30

Tags: Mutex, stack, queue

Queues and stacks are data structures you meet almost as soon as you start programming, and real-world work occasionally calls for concurrent versions of them, so here is a quick benchmark of push/pop (enqueue/dequeue) performance for concurrent stacks and queues in Rust.

The idea behind the test is simple: in a normally designed system the numbers of push and pop operations are roughly balanced, so each test performs one million push/pop pairs in total, contended by 1, 10, and 1000 concurrent coroutines respectively (1 × 1,000,000, 10 × 100,000, and 1000 × 1,000 pairs, so the total work stays the same); less time is better.

For the baseline I used the most basic Mutex<Vec> and the standard library's Mutex<VecDeque> to simulate a concurrent stack and a concurrent queue, then pulled a number of third-party crates off crates.io for comparison. Of those, npnc and pi_lfstack performed far too poorly to include (about two orders of magnitude worse than average under concurrency); everything else went in, plus crossbeam-channel, whose try_recv method is essentially queue behavior. Some other crates were skipped because they haven't been updated in so long that they no longer build on a current toolchain, or because getting them across scopes/threads was too much hassle. The concurrency test environment is tokio's coroutine runtime, with the third-party crate tokio-scoped used to keep the code simpler.
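A quick aside on the try_recv point: the sketch below (mine, not part of the benchmark) shows crossbeam-channel's unbounded channel behaving essentially like a concurrent FIFO queue, with send playing push and try_recv playing a non-blocking pop:

use crossbeam_channel::{unbounded, TryRecvError};

fn main() {
    let (s, r) = unbounded();
    s.send(1).unwrap(); // "push": enqueue a value
    s.send(2).unwrap();
    assert_eq!(r.try_recv(), Ok(1)); // "pop": FIFO order, never blocks
    assert_eq!(r.try_recv(), Ok(2));
    // unlike recv(), try_recv() returns an error immediately when empty
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}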

The third-party dependencies are as follows; if you know of better crates, feel free to recommend them for testing:

[dependencies]
tokio = {version = "1.24.1", features = ["full"]}
tokio-scoped = "0.2.0"

persistent_stack = "0.1.1"
pi_lfstack = "0.1.5"
coco = "0.3.4"
npnc = "0.2.1"
crossbeam-channel = "0.5.6"
crossbeam-queue = "0.3.8"
atomic-queue = "1.0.1"
lf-queue = "0.1.0"
concurrent-queue = "2.1.0"

The test results:

threads: 1, times: 1000000

vec_stack: 26.3993ms, 25.3901ms, 25.4435ms, 25.4212ms, 25.4001ms, 25.1032ms, 26.1314ms, 24.858ms, 24.9335ms, 24.859ms,
vecdeque_stack: 27.064ms, 26.361ms, 26.5495ms, 26.3638ms, 26.3559ms, 26.449ms, 30.3709ms, 26.5355ms, 26.4878ms, 26.3116ms,
vecdeque_queue: 28.6913ms, 28.9273ms, 28.0277ms, 28.0844ms, 28.0673ms, 28.1431ms, 27.947ms, 28.7753ms, 28.0509ms, 28.0771ms,
persistent_stack: 78.7973ms, 77.7623ms, 77.9925ms, 77.7967ms, 78.4499ms, 77.6113ms, 77.2484ms, 77.9266ms, 77.779ms, 77.6751ms,
coco_stack: 92.8972ms, 91.0536ms, 91.6069ms, 90.7764ms, 91.0812ms, 91.1381ms, 90.7195ms, 90.491ms, 91.5574ms, 90.6397ms,
crossbeam_channel: 43.8011ms, 42.9642ms, 43.1193ms, 42.7666ms, 42.99ms, 42.7819ms, 43.0606ms, 43.3035ms, 43.0436ms, 42.8413ms,
crossbeam_queue: 38.2909ms, 37.3756ms, 37.5468ms, 37.1467ms, 38.0162ms, 37.6626ms, 37.4788ms, 37.3612ms, 37.2424ms, 37.2686ms,
atomic_queue: 34.3409ms, 33.9337ms, 34.0429ms, 33.9318ms, 34.0231ms, 34.3294ms, 33.9346ms, 34.6922ms, 34.6701ms, 33.5223ms,
lf_queue: 54.3681ms, 54.2108ms, 54.016ms, 54.3254ms, 54.2861ms, 54.6399ms, 53.9623ms, 54.2249ms, 54.8774ms, 54.6883ms,
concurrent_queue: 33.2541ms, 32.1233ms, 32.2522ms, 32.2959ms, 32.1086ms, 32.2572ms, 32.3255ms, 32.5518ms, 32.3405ms, 32.2298ms,

threads: 10, times: 100000

vec_stack: 51.8115ms, 52.9197ms, 54.0892ms, 52.8088ms, 50.9368ms, 52.2409ms, 53.1413ms, 52.6919ms, 52.709ms, 55.3961ms,
vecdeque_stack: 54.4545ms, 52.6322ms, 49.6938ms, 50.4971ms, 48.9824ms, 52.2588ms, 51.6985ms, 49.9326ms, 52.9505ms, 54.2455ms,
vecdeque_queue: 45.8304ms, 44.0043ms, 47.7825ms, 48.0708ms, 47.4822ms, 49.7141ms, 47.5263ms, 50.5276ms, 51.705ms, 43.5783ms,
persistent_stack: 89.5652ms, 88.8677ms, 84.6835ms, 82.454ms, 80.9895ms, 82.9408ms, 83.1169ms, 84.0594ms, 90.5619ms, 89.8251ms,
coco_stack: 176.1819ms, 176.6716ms, 175.3576ms, 176.2525ms, 175.5188ms, 176.5154ms, 175.198ms, 173.8ms, 175.8662ms, 174.6426ms,
crossbeam_channel: 50.9162ms, 50.3408ms, 50.1805ms, 50.9109ms, 50.4509ms, 49.7222ms, 51.0724ms, 48.7733ms, 50.4934ms, 50.0114ms,
crossbeam_queue: 43.6269ms, 42.8124ms, 42.2399ms, 43.3345ms, 42.7759ms, 42.6948ms, 42.9231ms, 43.0668ms, 41.8236ms, 43.7392ms,
atomic_queue: 171.7847ms, 170.297ms, 168.8615ms, 169.3591ms, 170.3869ms, 168.555ms, 170.5664ms, 170.615ms, 171.4737ms, 168.9004ms,
lf_queue: 136.869ms, 136.8505ms, 137.5492ms, 137.8592ms, 137.8282ms, 136.2304ms, 137.2081ms, 137.2544ms, 136.9489ms, 137.3581ms,
concurrent_queue: 177.9012ms, 179.4108ms, 179.7128ms, 181.0077ms, 178.6454ms, 176.9997ms, 177.1672ms, 177.5025ms, 177.3282ms, 181.246ms,

threads: 1000, times: 1000

vec_stack: 53.5574ms, 55.1064ms, 53.8803ms, 54.4086ms, 53.3398ms, 53.7337ms, 53.2006ms, 55.9423ms, 54.6538ms, 54.7703ms,
vecdeque_stack: 54.3735ms, 54.9519ms, 57.3253ms, 55.7271ms, 53.1167ms, 54.3509ms, 56.422ms, 53.5825ms, 53.2009ms, 54.2069ms,
vecdeque_queue: 51.9097ms, 51.1251ms, 53.0445ms, 52.3853ms, 49.9047ms, 51.4758ms, 55.1786ms, 53.3408ms, 51.5187ms, 55.7044ms,
persistent_stack: 80.4902ms, 80.341ms, 85.7919ms, 87.0566ms, 85.565ms, 84.3154ms, 85.3937ms, 85.4673ms, 80.6873ms, 85.3276ms,
coco_stack: 168.165ms, 167.1092ms, 167.7398ms, 166.7269ms, 168.1448ms, 169.7243ms, 168.3363ms, 166.8481ms, 168.5317ms, 168.2039ms,
crossbeam_channel: 52.2114ms, 52.0444ms, 52.124ms, 52.2133ms, 51.9927ms, 52.0888ms, 51.6871ms, 52.0501ms, 51.923ms, 51.9051ms,
crossbeam_queue: 46.0163ms, 45.9401ms, 45.8643ms, 45.6146ms, 45.9925ms, 45.8089ms, 46.043ms, 45.6665ms, 45.888ms, 45.7463ms,
atomic_queue: 187.0409ms, 189.4764ms, 188.7427ms, 185.8961ms, 187.3553ms, 187.3341ms, 187.6605ms, 185.5053ms, 187.3995ms, 186.2443ms,
lf_queue: 136.771ms, 136.847ms, 137.5104ms, 136.5015ms, 136.0706ms, 138.0645ms, 137.5464ms, 136.825ms, 137.8332ms, 136.5122ms,
concurrent_queue: 188.9455ms, 189.2176ms, 188.5629ms, 188.239ms, 187.9938ms, 187.9989ms, 188.3855ms, 188.3336ms, 190.1003ms, 188.6938ms,

As the results show, in the single-threaded case the plain Mutex<Vec> and Mutex<VecDeque> are naturally the fastest. With multiple threads, most of the so-called lock-free concurrent data structures fare no better; only crossbeam_queue's queue comes out ahead of the lock-based baseline under contention, by roughly 10%.

So the conclusion is: in the vast majority of scenarios a native data structure behind a lock is perfectly adequate for a concurrent stack or queue, and unless the performance requirements are extreme there is little reason even to bring in crossbeam. It may well be precisely because lock-free techniques buy so little for stacks and queues that crossbeam opened a stack project years ago and then simply abandoned it.
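For reference, outside the benchmark the recommended lock-based approach usually looks like this (a minimal sketch of my own; the benchmark itself borrows locals through tokio-scoped instead, which is why it gets away without Arc):

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // the whole "concurrent queue": a standard VecDeque behind a Mutex
    let q = Arc::new(Mutex::new(VecDeque::new()));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                q.lock().unwrap().push_back(i); // enqueue
                q.lock().unwrap().pop_front()   // dequeue; may get another thread's item
            })
        })
        .collect();

    for h in handles {
        println!("{:?}", h.join().unwrap());
    }
}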

The source code is below; a macro factors out the shared logic:

use std::sync::Mutex;
use std::time::Instant;

use tokio;
use tokio_scoped;

const N_THREADS: usize = 10;
const N_TIMES: usize = 100000;
const N_LOOP: usize = 10;
// I originally didn't plan to pre-fill the structures, but for some reason a few
// third-party crates perform very well at size 0, degrade sharply as soon as the
// initial size exceeds 0, and stack-overflow once it gets larger, so as a
// compromise the initial size is set to 1
const INIT_SIZE: usize = 1;

macro_rules! run {
    (
        $name: literal,
        $new: stmt,
        $copy: stmt,
        $push: stmt,
        $pop: stmt
    ) => {
        print!("\n{}: ", $name);
        for _ in 0..N_LOOP {
            // Declare the data structure, e.g.: let v = Mutex::new(Vec::new());
            $new
            // Pre-fill to the initial size
            for _ in 0..INIT_SIZE {
                $push
            }
            let start = Instant::now();
            tokio_scoped::scope(|s| {
                for _ in 0..N_THREADS {
                    // Copy the reference into the scope, e.g.: let v = &v;
                    $copy
                    s.spawn(async move {
                        for _ in 0..N_TIMES {
                            // One push and one pop per iteration, separated by blocks
                            { $push } // e.g. { v.lock().unwrap().push(0); }
                            { $pop }; // e.g. { v.lock().unwrap().pop().unwrap() };
                        }
                    });
                }
            });
            print!("{:?}, ", start.elapsed());
        }
    }
}

#[tokio::main(worker_threads = 10)]
async fn main() {
    print!("threads: {}, times: {}\n", N_THREADS, N_TIMES);

    run! {
        "vec_stack",
        let v = Mutex::new(Vec::new()),
        let v = &v,
        v.lock().unwrap().push(0),
        v.lock().unwrap().pop().unwrap()
    }

    run! {
        "vecdeque_stack",
        let v = Mutex::new(std::collections::VecDeque::new()),
        let v = &v,
        v.lock().unwrap().push_back(0),
        v.lock().unwrap().pop_back().unwrap()
    }

    run! {
        "vecdeque_queue",
        let v = Mutex::new(std::collections::VecDeque::new()),
        let v = &v,
        v.lock().unwrap().push_back(0),
        v.lock().unwrap().pop_front().unwrap()
    }

    use persistent_stack;
    run! {
        "persistent_stack",
        let mut v = persistent_stack::PersistentStack::new(),
        let mut v = v.clone(),
        v.push(0),
        v.pop().unwrap()
    }

    // use pi_lfstack;
    // run! {
    //     "pi_lfstack",
    //     let v = pi_lfstack::LFStack::new(),
    //     let v = &v,
    //     { v.push(0) },
    //     { v.pop().unwrap() }
    // }

    use coco;
    run! {
        "coco_stack",
        let v = coco::Stack::new(),
        let v = &v,
        v.push(0),
        v.pop().unwrap()
    }

    // use npnc;
    // run! {
    //     "npnc-channel",
    //     let (p, c) = npnc::unbounded::mpmc::channel(N_TIMES),
    //     let (p, c) = (p.clone(), c.clone()),
    //     { p.produce(0).unwrap() },
    //     { c.consume().unwrap() }
    // }

    use crossbeam_channel;
    run! {
        "crossbeam_channel",
        let (s, r) = crossbeam_channel::unbounded(),
        let (s, r) = (&s, &r),
        s.send(0).unwrap(),
        r.try_recv().unwrap()
    }

    use crossbeam_queue;
    run! {
        "crossbeam_queue",
        let v = crossbeam_queue::SegQueue::new(),
        let v = &v,
        v.push(0),
        v.pop().unwrap()
    }

    use atomic_queue;
    run! {
        "atomic_queue",
        let v = atomic_queue::bounded(N_TIMES),
        let v = &v,
        { v.push(0); },
        v.pop().unwrap()
    }

    use lf_queue;
    run! {
        "lf_queue",
        let v = lf_queue::Queue::new(),
        let v = &v,
        v.push(0),
        v.pop().unwrap()
    }

    use concurrent_queue;
    run! {
        "concurrent_queue",
        let v = concurrent_queue::ConcurrentQueue::unbounded(),
        let v = &v,
        v.push(0).unwrap(),
        v.pop().unwrap()
    }

}
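
For anyone less used to macro_rules!, the first run! invocation expands to roughly the following (hand-expanded here for illustration):

print!("\n{}: ", "vec_stack");
for _ in 0..N_LOOP {
    let v = Mutex::new(Vec::new());
    for _ in 0..INIT_SIZE {
        v.lock().unwrap().push(0);
    }
    let start = Instant::now();
    tokio_scoped::scope(|s| {
        for _ in 0..N_THREADS {
            let v = &v;
            s.spawn(async move {
                for _ in 0..N_TIMES {
                    { v.lock().unwrap().push(0) }
                    { v.lock().unwrap().pop().unwrap() };
                }
            });
        }
    });
    print!("{:?}, ", start.elapsed());
}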
