< 返回版块

lithbitren 发表于 2023-01-10 00:57

Tags:tokio,mutex,async

本来是打算测试并发哈希表的,就先试了普通的mutex包裹在threadtokio下性能差异。

tokio下自然用的是异步的tokio::sync::Mutex,结果发现性能比起thread下的互斥锁差非常多,不管是10个线程协程还是1000个线程协程性能都不行。

那干脆就先测测异步锁的性能把。

理论上在tokio的运行时里是不推荐用同步的std::sync::Mutex,但是测了测表现却很好。

不过同步std::sync::Mutex终究不是一个合适的选择,于是我上crates.io上查找了下载次数比较多的异步锁,cargo.toml依赖如下:

[dependencies]
tokio = {version = "1.24.1", features = ["full"]}
futures = "0.3.25"
async-mutex = "1.4.0"
async-lock = "2.6.0"
fast-async-mutex = "0.6.7"

然后写了个测试程序,并发一百万次Vec::push(usize),分别测试了10个线程/协程100000次操作和1000个线程/协程1000次操作的对比,重复运行10次。

其中scope-std-mutex是代表thread::scope下使用std::sync::Mutex的并发操作,tokio-std-mutex则代表tokiostd::sync::Mutextokio-mutex则就是用自带的互斥锁tokio::sync::Mutex,后面三个则是找到的第三方写的异步锁。

threads: 10, times: 100000, single thread: 4.3275ms

thread-std-mutex: 29.7466ms, 24.0701ms, 23.6063ms, 26.3388ms, 25.6356ms, 25.478ms, 25.2364ms, 26.7211ms, 27.4124ms, 28.9356ms, 
scope-std-mutex: 25.9718ms, 26.8308ms, 26.244ms, 24.9638ms, 27.0795ms, 25.9664ms, 28.3934ms, 25.922ms, 24.2884ms, 25.5083ms, 
tokio-std-mutex: 28.0744ms, 31.5944ms, 28.6856ms, 29.2385ms, 31.3562ms, 22.8386ms, 29.9602ms, 29.3549ms, 29.098ms, 22.945ms, 
tokio-mutex: 191.6492ms, 185.5199ms, 186.864ms, 188.0478ms, 187.5107ms, 187.2095ms, 186.9775ms, 205.9339ms, 193.0365ms, 195.6984ms, 
async-mutex: 23.5128ms, 21.5624ms, 23.4476ms, 22.4993ms, 21.4914ms, 21.4246ms, 21.5033ms, 21.495ms, 21.3756ms, 21.1966ms, 
async-lock: 21.8512ms, 21.1139ms, 22.1052ms, 21.4546ms, 21.9403ms, 20.9468ms, 21.1317ms, 21.543ms, 21.6153ms, 21.0242ms, 
fast-async-mutex: 112.6995ms, 117.5415ms, 142.6832ms, 113.5288ms, 121.1797ms, 113.1455ms, 120.0788ms, 100.8301ms, 120.296ms, 128.6366ms, 
parking-lot: 72.285ms, 66.5004ms, 67.1994ms, 70.7451ms, 69.1513ms, 64.9047ms, 68.6076ms, 68.718ms, 72.0517ms, 69.1927ms, 
threads: 1000, times: 1000, single thread: 4.4056ms

thread-std-mutex: 60.8151ms, 52.8336ms, 49.8579ms, 48.2702ms, 48.2183ms, 47.5837ms, 47.5442ms, 47.7192ms, 48.9598ms, 48.7215ms, 
scope-std-mutex: 46.1217ms, 45.827ms, 45.4512ms, 45.2878ms, 45.8804ms, 45.2085ms, 47.5761ms, 45.4836ms, 44.1086ms, 43.5693ms, 
tokio-std-mutex: 31.4038ms, 34.3193ms, 32.1812ms, 26.7186ms, 40.3314ms, 27.3657ms, 35.5608ms, 27.6058ms, 27.4601ms, 35.1832ms, 
tokio-mutex: 192.4593ms, 187.2106ms, 190.6008ms, 189.7324ms, 187.8145ms, 188.5094ms, 187.1712ms, 192.2489ms, 213.6596ms, 192.5352ms, 
async-mutex: 391.3653ms, 382.5787ms, 373.099ms, 379.5068ms, 385.3628ms, 381.0376ms, 385.1471ms, 382.0066ms, 395.1736ms, 405.4637ms, 
async-lock: 400.566ms, 405.8318ms, 391.4916ms, 391.8056ms, 385.566ms, 383.4072ms, 384.6201ms, 379.608ms, 385.2667ms, 394.9906ms, 
fast-async-mutex: 105.6818ms, 92.5969ms, 125.9783ms, 121.0262ms, 121.8443ms, 99.8092ms, 98.8561ms, 127.0074ms, 125.2284ms, 93.9906ms, 
parking-lot: 73.6634ms, 73.2848ms, 70.5412ms, 70.3002ms, 70.6306ms, 73.9181ms, 70.2321ms, 71.6969ms, 75.0105ms, 71.7061ms, 

单线程百万次操作Vec::push大概只要4ms+,其他测试多出来的时间就是线程/协程切换以及抢锁的性能损耗啦。

首先作为对比基准的传统thread::spawnScope::spawn,不管是10线程还是1000线程两者表现基本一致,scope的表现略好一点点,但不明显,而且起码在开1000线程的情况下,时间消耗也没有想象中的大。

然后再看看tokio运行时的情况,用同步锁的std::sync::Mutex性能比tokio::sync::Mutex性能好了5-6倍,并且随着协程数量增加,抢锁性能损失似乎都不严重。

std::sync::Mutex是同步锁,锁住的是当前线程,也会把整个线程下的所有协程都锁住。但如果在锁定的过程中不涉及长时间的异步操作的话,似乎同步的Mutex也是一个选择。

而后面的第三方库,async-mutexasync-lock在协程数较少的时候表现还不错,但协程数量一旦加大性能就急剧下降。然而协程专属的场景大多属于高并发场景,1000协程并发甚至都是负载比较轻的情况。

fast-async-mutex倒是中规中矩,但加锁性能还是不如线程的同步锁,不过总体上看也算是一个相对优先的选择。

tokio官方文档推荐的parking-lot也测了,虽然表现还行,但毕竟也是同步锁。

协程并发在IO性能上比线程要强很多,但异步锁的性能确实都不咋地,如果高频使用的话搞不好反而成为性能拖后腿的那一环。

几个并发互斥锁的源码概略看了看,基本还是原子操作参与循环的那一套,如果要优化的话,个人觉得可能需要从协程的更底层优化才行,被锁的协程单独拿出来做事件循环,否则跟着所有事件做循环感觉性能瓶颈还是不小。当然,只是说说而已,反正我是写是写不出的。

不知道还有没有更好的第三方库,像无锁并发哈希表已经卷到极致了,性能比不知道比std::sync::RwLock<HashMap<K, V>>强多少倍去。

测试代码放在最后了,没啥好注释说明的,核心语句不是就是v.lock().unwrap().push(j);就是v.lock().await.push(j);

use std::thread;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use futures::future;

use tokio;
use async_mutex;
use async_lock;
use fast_async_mutex;
use parking_lot;


const N_THREADS: usize = 1000;
const N_TIMES: usize = 1000;
const N_LOOP: usize = 10;

#[tokio::main(worker_threads = 10)]
async fn main() {

    print!("threads: {}, times: {}, ", N_THREADS, N_TIMES);

    print!("single thread: ");

    let start = Instant::now();

    let mut v = Vec::new();

    for i in 0..N_THREADS {
        for j in i * N_TIMES .. (i + 1) * N_TIMES {
            v.push(j);
        }
    }

    print!("{:?}\n", start.elapsed());
    
    print!("\nthread-std-mutex: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(Mutex::new(Vec::new()));
    
        let start = Instant::now();
    
        (0..N_THREADS).map(|i| {
            let v = v.clone();
            thread::spawn(move || {
                for j in i * N_TIMES .. (i + 1) * N_TIMES {
                    v.lock().unwrap().push(j);
                }
            })
        }).collect::<Vec<_>>()
        .into_iter()
        .for_each(|handle| handle.join().unwrap());

        print!("{:?}, ", start.elapsed());
    }
    
    print!("\nscope-std-mutex: ");
    
    for _ in 1..=N_LOOP {

        let v = Mutex::new(Vec::new());
    
        let start = Instant::now();
    
        thread::scope(|s| {
            for i in 0..N_THREADS {
                let v = &v;
                s.spawn(move || {
                    for j in i * N_TIMES .. (i + 1) * N_TIMES {
                        v.lock().unwrap().push(j);
                    }
                });
            }
        });

        print!("{:?}, ", start.elapsed());
    }
    
    print!("\ntokio-std-mutex: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(Mutex::new(Vec::new()));
    
        let start = tokio::time::Instant::now();
        
        future::join_all((0..N_THREADS).map(|i| {
            let v = v.clone();
            tokio::spawn( async move {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    v.lock().unwrap().push(j);
                }
            })
        })).await;

        print!("{:?}, ", start.elapsed());
    }

    print!("\ntokio-mutex: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    
        let start = tokio::time::Instant::now();
        
        future::join_all((0..N_THREADS).map(|i| {
            let v = v.clone();
            tokio::spawn( async move {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    v.lock().await.push(j);
                }
            })
        })).await;

        print!("{:?}, ", start.elapsed());
    }

    print!("\nasync-mutex: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(async_mutex::Mutex::new(Vec::new()));
    
        let start = tokio::time::Instant::now();
        
        future::join_all((0..N_THREADS).map(|i| {
            let v = v.clone();
            tokio::spawn( async move {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    v.lock().await.push(j);
                }
            })
        })).await;

        print!("{:?}, ", start.elapsed());
    }

    print!("\nasync-lock: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(async_lock::Mutex::new(Vec::new()));
    
        let start = tokio::time::Instant::now();
        
        future::join_all((0..N_THREADS).map(|i| {
            let v = v.clone();
            tokio::spawn( async move {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    v.lock().await.push(j);
                }
            })
        })).await;

        print!("{:?}, ", start.elapsed());
    }
    
    print!("\nfast-async-mutex: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(fast_async_mutex::mutex::Mutex::new(Vec::new()));
    
        let start = tokio::time::Instant::now();
        
        future::join_all((0..N_THREADS).map(|i| {
            let v = v.clone();
            tokio::spawn( async move {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    v.lock().await.push(j);
                }
            })
        })).await;

        print!("{:?}, ", start.elapsed());
    }

    print!("\nparking-lot: ");

    for _ in 1..=N_LOOP {

        let v = Arc::new(parking_lot::Mutex::new(Vec::new()));
    
        let start = tokio::time::Instant::now();
        
        future::join_all((0..N_THREADS).map(|i| {
            let v = v.clone();
            tokio::spawn( async move {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    v.lock().push(j);
                }
            })
        })).await;

        print!("{:?}, ", start.elapsed());
    }

}

评论区

写评论
JasonkayZK 2023-01-11 20:48

tokio 的官方文档不建议使用 mutex,性能不高。

作者 lithbitren 2023-01-10 06:22

也不算太详细,只是说了tokio::sync::Mutex内部也是互斥锁,性能不太好。

也提到了如果不互斥的情况下推荐用std::sync::Mutex,但也是有前提的,就是在设计系统的时候不能出现持有锁时发生异步等待,所有我觉得终极解决方案还是得挖掘出性能更好的异步锁。

不过上次看教程已经是太久之前了,后来基本只看api了。当年看文档的时候确实埋下了一个种子,总有印象说tokio::sync::Mutex性能差,但又找不到出处,现在才发现原来是在文档里。

加测了文档里推荐的parking_lot::Mutex,说是比标准库强,但我这里测出的性能还是不如标准库的Mutex

至于文档里面推荐用通道来处理共享数据,通道对状态控制更有序,改天有时间也可以测测性能。

--
👇
7sDream: Tokio Mutex 的文档很详细的描述了为什么 Async 的 Mutex 性能反而会差,到底什么时候才需要用到 Async 的 Mutex,以及比用锁更好的方案。

7sDream 2023-01-10 01:49

Tokio Mutex 的文档很详细的描述了为什么 Async 的 Mutex 性能反而会差,到底什么时候才需要用到 Async 的 Mutex,以及比用锁更好的方案。

1 共 3 条评论, 1 页