简化后的实例代码如下:
trait AtomExecutable {
fn atom_execute(&self, rest_task_num: Arc<AtomicU64>, rest_core_num: Arc<AtomicU64>);
}
impl AtomExecutable for Task {
fn atom_execute(&self, rest_task_num: Arc<AtomicU64>, rest_core_num: Arc<AtomicU64>) {
let consumed_core_num = self.get_consumed_core_num();
rest_core_num.fetch_sub(consumed_core_num, Ordering::SeqCst);
rest_task_num.fetch_sub(1, Ordering::SeqCst);
// 任务逻辑(密集型计算任务,通常使用4~10核心)
std::thread::sleep(std::time::Duration::from_millis(25000));
rest_core_num.fetch_add(consumed_core_num - 1, Ordering::SeqCst);
// 收尾逻辑
std::thread::sleep(std::time::Duration::from_millis(1000));
// 恢复可用核心数和任务数的计数
rest_core_num.fetch_add(1, Ordering::SeqCst);
rest_task_num.fetch_add(1, Ordering::SeqCst);
}
}
fn runtime_atom() {
let max_task_num = 4;
let max_core_num = 16;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(max_task_num as usize)
.build()
.expect("Failed to create thread pool.");
let mut rest_task_num = Arc::new(AtomicU64::new(max_task_num));
let mut rest_core_num = Arc::new(AtomicU64::new(max_core_num));
let order = Ordering::SeqCst;
loop {
if rest_task_num.load(order) > 0 && rest_core_num.load(order) > 0 {
println!("rest core num is {}", rest_core_num.load(order));
// Task::new()会返回一个消耗CPU核心数小于等于rest_core_num的任务
// 任务是通过reqwest接收自云服务器上
match Task::new(rest_core_num.load(order)) {
Err(_) => continue,
Ok(task) => {
let rest_task_num_clone = Arc::clone(&rest_task_num);
let rest_core_num_clone = Arc::clone(&rest_core_num);
pool.spawn(move || { task.atom_execute(rest_task_num_clone, rest_core_num_clone) });
}
}
} else {
println!("rest core is not enough");
}
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
基本就是一个多线程接受任务(通常占用多个CPU核心)并计算的后台小程序。以前一直使用Mutex
维护剩余可用任务数与可用CPU核心数的计数,运行得非常正常。前两天突发奇想使用AtomU64
代替,就变成了如上的代码,直接测试这份简单化的代码时一切正常。照着修改完使用的程序并投入测试后,在有5个消耗4核心的任务存在时,它全部接收了(但实际只有16个核心可用),且只有两个正常运行,并且计算完后无法正常结束任务。
这让我确实十分费解,我不知道是我修改的其他的逻辑有问题,还是我对Atomic哪里的理解有问题?求指教。(PS:我知道其实在这里使用Atom其实完全没必要,就是想尝试下,然后好奇bug可能的原因。)
1
共 4 条评论, 1 页
评论区
写评论留了30秒的时间去开启线程并运行,以前用mutex的时候也没这个问题来着的。不过有道理,如果在机器很卡很卡的情况下,貌似真的有可能线程开启不及时。之后试试在主线程里,if后直接fetch_sub,然后在子线程里fetch_add的做法。感谢!
--
👇
rayw0ng: 试想一下 loop 运行了 5 次,5 次 if 都满足条件,但是 fetch_sub 1 次都没有执行。 应该将 fetch_sub 提出来,紧跟 if 执行。
我也觉得有同步问题,但实际测试留的是30秒的时间,以前用Mutex的时候一切正常来着的,下次试试if后直接执行sub吧。
任务应该不会有block,任务逻辑没改,但我实时监测着任务的进度,奇怪的是到了100%也不退出,也不知道是啥情况。
感谢回复!另外,所以这种用Atom管理CPU资源其实是可行的是么?只是写代码的时候出了点问题?
--
👇
Easonzero: 任务全部接收应该是因为发送任务(判断计数器是否为0)和执行任务(计数器减少)之间存在同步问题: 假设所有任务的计数器减少操作被安排在若干个计数器load之后, 则此计数器>0并不能视作剩余计算资源大于0的保证. 另外这也会导致计数器减到负数, 计数器是一个u64, 此时变成了一个极大值, 也不能代表剩余计算资源.
任务无法退出主要看任务本身的逻辑是否可能block, 目前这段code应该不会有这个问题
任务没有全部执行, 一般是因为无法退出的任务block了worker, 导致线程池的worker不够用
任务全部接收应该是因为发送任务(判断计数器是否为0)和执行任务(计数器减少)之间存在同步问题: 假设所有任务的计数器减少操作被安排在若干个计数器load之后, 则此计数器>0并不能视作剩余计算资源大于0的保证. 另外这也会导致计数器减到负数, 计数器是一个u64, 此时变成了一个极大值, 也不能代表剩余计算资源.
任务无法退出主要看任务本身的逻辑是否可能block, 目前这段code应该不会有这个问题
任务没有全部执行, 一般是因为无法退出的任务block了worker, 导致线程池的worker不够用
试想一下 loop 运行了 5 次,5 次 if 都满足条件,但是 fetch_sub 1 次都没有执行。 应该将 fetch_sub 提出来,紧跟 if 执行。