< 返回版块

kcrazy 发表于 2021-09-17 11:25

用 go 写的类似的代码,平均只需要1微秒,但用 rust+tokio 却平均要3微秒,如何优化能达到 go的水平?

use std::{thread, time::Duration};
use std::time::SystemTime;
use std::sync::atomic::{AtomicI32, Ordering};
use std::ops::{Div, AddAssign};
use std::sync::{Arc};

use tokio::runtime::{Builder};
use tokio::sync::mpsc;
use tokio::sync::{Mutex};


#[derive(Debug)]
struct Event {
    _type: String,
    time: SystemTime,
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Event>();

    let rt = Builder::new_multi_thread().worker_threads(8)
        .build()
        .unwrap();

    let count: Arc<AtomicI32> = Arc::new(AtomicI32::new(0));
    let total: Arc<Mutex<Duration>> = Arc::new(Mutex::new(Duration::new(0, 0)));

    let count2 = count.clone();
    let total2 = total.clone();
    rt.spawn(async move {
        while let Some(res) = rx.recv().await {
            let elapsed = res.time.elapsed().ok().unwrap();
            count2.fetch_add(1, Ordering::Relaxed);
            total2.lock().await.add_assign(elapsed);
        }
    });

    let max_loop = 100000;

    for _ in 0..max_loop {
        let event = Event {
            _type: "hello".to_string(),
            time: SystemTime::now(),
        };
        let _ = tx.send(event);

        thread::sleep(Duration::from_micros(1));
    }

    thread::sleep(Duration::from_millis(4000));

    println!("{:?}", count.load(Ordering::Relaxed));
    rt.block_on(async move {
        let total2 = total.lock().await;
        println!(
            "total: {:?}, average: {:?}, {:?}/s",
            total2,
            total2.div(count.load(Ordering::Relaxed) as u32),
            count.load(Ordering::Relaxed) as f64 / total2.as_secs_f64()
        );
    });
}

go 的代码,average 输出是 1.351µs

package main

import (
	"fmt"
	"reflect"
	"sync/atomic"
	"time"
)

var count int32
var total int32

type event struct {
	args    []interface{}
	event   chan struct{}
	handler interface{}
}

func onEvent(sig chan struct{}, t time.Time) {
	d := time.Since(t)
	atomic.AddInt32(&count, 1)
	atomic.AddInt32(&total, int32(d))

	sig <- struct{}{}
}

func main() {

	queue := make(chan event, 200)
	defer close(queue)

	go func(q chan event) {
		// 循环读取事件
		for ev := range q {
			v := reflect.ValueOf(ev.handler)
			if v.Kind() != reflect.Func {
				panic("not a function")
			}

			vArgs := make([]reflect.Value, len(ev.args))
			for i, arg := range ev.args {
				vArgs[i] = reflect.ValueOf(arg)
			}
			v.Call(vArgs)
		}
	}(queue)

	const COUNT = 100000

	var sig = make(chan struct{}, COUNT)

	for i := 0; i < COUNT; i++ {
		// 发事件
		func(args ...interface{}) {
			var ev = event{args: args, handler: onEvent}
			queue <- ev
		}(sig, time.Now())

		time.Sleep(time.Microsecond)
	}

	for i := 0; i < COUNT; i++ {
		<-sig
	}

	fmt.Println("total:", total, "average:", time.Duration(total)/time.Duration(count), count)
}

测试环境:I9-9900k 32G DDR4

评论区

写评论
zhuxiujia 2021-09-27 14:49

结论描述错误,线程模型下rust(0.193us)比go(1.903µs)快10倍

👇
zhuxiujia: tokio不代表rust弱,csp模型抢占比较积极,计算能力稍高。要公平的话, 这里rust换线程模型试验一下。

go: total: 190360931 average: 1.903µs 100000 rust: total: 19.34ms, average: 193ns, 5170630.816959669/s

结论,rust表现和go基本一致(193纳秒约等于 0.193us)

代码:

use std::sync::atomic::{AtomicI32, Ordering, AtomicU64};
use std::time::{SystemTime, Duration};
use std::sync::{Arc, Mutex};
use std::thread;
use std::ops::{Div, AddAssign};

#[derive(Debug)]
struct Event {
   pub _type: String,
   pub time: SystemTime,
}

fn main() {
    let count=Arc::new(AtomicU64::new(0));
    let total: Arc<Mutex<Duration>> = Arc::new(Mutex::new(Duration::new(0, 0)));
    let (s,r)= crossbeam_channel::bounded::<Event>(200);
    let count2 = count.clone();
    let total2 = total.clone();
    thread::spawn(move || {
        loop{
            if let Ok(res) = r.try_recv() {
                let elapsed = res.time.elapsed().ok().unwrap();
                count2.fetch_add(1, Ordering::Relaxed);
                total2.lock().unwrap().add_assign(elapsed);
            }
        }
    });
    thread::sleep(Duration::from_secs(1));

    let max_loop = 100000;
    for _ in 0..max_loop {
        s.send(Event {
            _type: "hello".to_string(),
            time: SystemTime::now(),
        });
        thread::sleep(Duration::from_micros(1));
    }
    thread::sleep(Duration::from_secs(2));


    let total2 = total.lock().unwrap();
    println!(
        "total: {:?}, average: {:?}, {:?}/s",
        total2,
        total2.div(count.load(Ordering::Relaxed) as u32),
        count.load(Ordering::Relaxed) as f64 / total2.as_secs_f64()
    );
}
zhuxiujia 2021-09-27 14:43

tokio不代表rust弱,csp模型抢占比较积极,计算能力稍高。要公平的话, 这里rust换线程模型试验一下。

go: total: 190360931 average: 1.903µs 100000 rust: total: 19.34ms, average: 193ns, 5170630.816959669/s

结论,rust表现和go基本一致(193纳秒约等于 0.193us)

代码:

use std::sync::atomic::{AtomicI32, Ordering, AtomicU64};
use std::time::{SystemTime, Duration};
use std::sync::{Arc, Mutex};
use std::thread;
use std::ops::{Div, AddAssign};

#[derive(Debug)]
struct Event {
   pub _type: String,
   pub time: SystemTime,
}

fn main() {
    let count=Arc::new(AtomicU64::new(0));
    let total: Arc<Mutex<Duration>> = Arc::new(Mutex::new(Duration::new(0, 0)));
    let (s,r)= crossbeam_channel::bounded::<Event>(200);
    let count2 = count.clone();
    let total2 = total.clone();
    thread::spawn(move || {
        loop{
            if let Ok(res) = r.try_recv() {
                let elapsed = res.time.elapsed().ok().unwrap();
                count2.fetch_add(1, Ordering::Relaxed);
                total2.lock().unwrap().add_assign(elapsed);
            }
        }
    });
    thread::sleep(Duration::from_secs(1));

    let max_loop = 100000;
    for _ in 0..max_loop {
        s.send(Event {
            _type: "hello".to_string(),
            time: SystemTime::now(),
        });
        thread::sleep(Duration::from_micros(1));
    }
    thread::sleep(Duration::from_secs(2));


    let total2 = total.lock().unwrap();
    println!(
        "total: {:?}, average: {:?}, {:?}/s",
        total2,
        total2.div(count.load(Ordering::Relaxed) as u32),
        count.load(Ordering::Relaxed) as f64 / total2.as_secs_f64()
    );
}
作者 kcrazy 2021-09-23 22:01

多谢指点

--
👇
songzhi: spawn时复制并移动Arc可以写成这样,不用额外起名。

    {
        let count = count.clone();
        let total = total.clone();
        rt.spawn(async move {
            while let Some(res) = rx.recv().await {
                let elapsed = res.time.elapsed().ok().unwrap();
                count.fetch_add(1, Ordering::Relaxed);
                total.fetch_add(elapsed.as_micros() as u64, Ordering::Relaxed);
            }
        });
    }
作者 kcrazy 2021-09-23 22:00

go代码也贴上来了

--
👇
johnmave126: 建议把go代码也贴上来不然这也不好对比

作者 kcrazy 2021-09-23 22:00

有道理

--
👇
jht5945: go 默认chan的buffer大小是0,unbounded channel是无限大,这样比较就不公平了

作者 kcrazy 2021-09-22 09:25

好的,我整理一下

--
👇
johnmave126: 建议把go代码也贴上来不然这也不好对比

作者 kcrazy 2021-09-22 09:24

是 Release,我的CPU 是 i9-9900k

--
👇
songzhi: 我的运行结果: Debug:

100000
total: 2.582979787s, average: 25.829µs, 38714.975821063206/s

Release:

100000
total: 1.624922052s, average: 16.249µs, 61541.413556987034/s

你是Release模式下运行的吗?

作者 kcrazy 2021-09-22 09:23

不好意思,写错了,是微秒

--
👇
lineCode: µs不是微秒吗

lineCode 2021-09-18 14:22

µs不是微秒吗

hatter 2021-09-18 13:45

go 默认chan的buffer大小是0,unbounded channel是无限大,这样比较就不公平了

johnmave126 2021-09-18 00:43

建议把go代码也贴上来不然这也不好对比

songzhi 2021-09-17 16:08

spawn时复制并移动Arc可以写成这样,不用额外起名。

    {
        let count = count.clone();
        let total = total.clone();
        rt.spawn(async move {
            while let Some(res) = rx.recv().await {
                let elapsed = res.time.elapsed().ok().unwrap();
                count.fetch_add(1, Ordering::Relaxed);
                total.fetch_add(elapsed.as_micros() as u64, Ordering::Relaxed);
            }
        });
    }
songzhi 2021-09-17 16:05

total用来计时完全没必要用Mutex<Duration>, 用一个AtomicU64就足够了。_type如果是只读的,可以用Arc<String>代替,避免次次堆分配。

songzhi 2021-09-17 15:49

我的运行结果: Debug:

100000
total: 2.582979787s, average: 25.829µs, 38714.975821063206/s

Release:

100000
total: 1.624922052s, average: 16.249µs, 61541.413556987034/s

你是Release模式下运行的吗?

1 共 14 条评论, 1 页