< 返回版块

eweca-d 发表于 2022-07-05 23:18

简化后的实例代码如下:

trait AtomExecutable {
    fn atom_execute(&self, rest_task_num: Arc<AtomicU64>, rest_core_num: Arc<AtomicU64>);
}

impl AtomExecutable for Task {
    fn atom_execute(&self, rest_task_num: Arc<AtomicU64>, rest_core_num: Arc<AtomicU64>) {
        let consumed_core_num = self.get_consumed_core_num();
        rest_core_num.fetch_sub(consumed_core_num, Ordering::SeqCst);
        rest_task_num.fetch_sub(1, Ordering::SeqCst);
        
        // 任务逻辑(密集型计算任务,通常使用4~10核心)
        std::thread::sleep(std::time::Duration::from_millis(25000));

        rest_core_num.fetch_add(consumed_core_num - 1, Ordering::SeqCst);

        // 收尾逻辑
        std::thread::sleep(std::time::Duration::from_millis(1000));

        // 恢复可用核心数和任务数的计数
        rest_core_num.fetch_add(1, Ordering::SeqCst);
        rest_task_num.fetch_add(1, Ordering::SeqCst);
    }
}

fn runtime_atom() {
    let max_task_num = 4;
    let max_core_num = 16;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(max_task_num as usize)
        .build()
        .expect("Failed to create thread pool.");

    let mut rest_task_num = Arc::new(AtomicU64::new(max_task_num));
    let mut rest_core_num = Arc::new(AtomicU64::new(max_core_num));
    let order = Ordering::SeqCst;

    loop {
        if rest_task_num.load(order) > 0 && rest_core_num.load(order) > 0 {
            println!("rest core num is {}", rest_core_num.load(order));
            
            // Task::new()会返回一个消耗CPU核心数小于等于rest_core_num的任务
            // 任务是通过reqwest接收自云服务器上 
            match Task::new(rest_core_num.load(order)) {
                Err(_) => continue,
                Ok(task) => {
                    let rest_task_num_clone = Arc::clone(&rest_task_num);
                    let rest_core_num_clone = Arc::clone(&rest_core_num);
                    pool.spawn(move || { task.atom_execute(rest_task_num_clone, rest_core_num_clone) });
                }
            }
        } else {
            println!("rest core is not enough");
        }

        std::thread::sleep(std::time::Duration::from_secs(5));
    }
}

基本就是一个多线程接受任务(通常占用多个CPU核心)并计算的后台小程序。以前一直使用Mutex维护剩余可用任务数与可用CPU核心数的计数,运行得非常正常。前两天突发奇想使用AtomU64代替,就变成了如上的代码,直接测试这份简单化的代码时一切正常。照着修改完使用的程序并投入测试后,在有5个消耗4核心的任务存在时,它全部接收了(但实际只有16个核心可用),且只有两个正常运行,并且计算完后无法正常结束任务。

这让我确实十分费解,我不知道是我修改的其他的逻辑有问题,还是我对Atomic哪里的理解有问题?求指教。(PS:我知道其实在这里使用Atom其实完全没必要,就是想尝试下,然后好奇bug可能的原因。)

评论区

写评论
作者 eweca-d 2022-07-06 11:33

留了30秒的时间去开启线程并运行,以前用mutex的时候也没这个问题来着的。不过有道理,如果在机器很卡很卡的情况下,貌似真的有可能线程开启不及时。之后试试在主线程里,if后直接fetch_sub,然后在子线程里fetch_add的做法。感谢!

--
👇
rayw0ng: 试想一下 loop 运行了 5 次,5 次 if 都满足条件,但是 fetch_sub 1 次都没有执行。 应该将 fetch_sub 提出来,紧跟 if 执行。

作者 eweca-d 2022-07-06 11:31

我也觉得有同步问题,但实际测试留的是30秒的时间,以前用Mutex的时候一切正常来着的,下次试试if后直接执行sub吧。

任务应该不会有block,任务逻辑没改,但我实时监测着任务的进度,奇怪的是到了100%也不退出,也不知道是啥情况。

感谢回复!另外,所以这种用Atom管理CPU资源其实是可行的是么?只是写代码的时候出了点问题?

--
👇
Easonzero: 任务全部接收应该是因为发送任务(判断计数器是否为0)和执行任务(计数器减少)之间存在同步问题: 假设所有任务的计数器减少操作被安排在若干个计数器load之后, 则此计数器>0并不能视作剩余计算资源大于0的保证. 另外这也会导致计数器减到负数, 计数器是一个u64, 此时变成了一个极大值, 也不能代表剩余计算资源.

任务无法退出主要看任务本身的逻辑是否可能block, 目前这段code应该不会有这个问题

任务没有全部执行, 一般是因为无法退出的任务block了worker, 导致线程池的worker不够用

Easonzero 2022-07-06 07:34

任务全部接收应该是因为发送任务(判断计数器是否为0)和执行任务(计数器减少)之间存在同步问题: 假设所有任务的计数器减少操作被安排在若干个计数器load之后, 则此计数器>0并不能视作剩余计算资源大于0的保证. 另外这也会导致计数器减到负数, 计数器是一个u64, 此时变成了一个极大值, 也不能代表剩余计算资源.

任务无法退出主要看任务本身的逻辑是否可能block, 目前这段code应该不会有这个问题

任务没有全部执行, 一般是因为无法退出的任务block了worker, 导致线程池的worker不够用

rayw0ng 2022-07-06 07:24

试想一下 loop 运行了 5 次,5 次 if 都满足条件,但是 fetch_sub 1 次都没有执行。 应该将 fetch_sub 提出来,紧跟 if 执行。

1 共 4 条评论, 1 页