
0xe994a4 发表于 2023-02-28 09:55

I used AtomicU8 in development, but I only know how to use Ordering::SeqCst. I don't understand the underlying principle, let alone how it differs from Ordering::Relaxed and the other orderings.

So, following the official docs, I read the Nomicon and learned that compiler reordering and hardware reordering can make code execute out of order.

After that I read <Why Memory Barriers?> and learned that hardware reordering comes from the store buffer and invalidate queue, which exist to avoid stalls; left uncontrolled, they cause out-of-order instruction execution and memory reads.

Finally, I tried to connect all of this together, but couldn't.

So I'd like to ask everyone: how should I relate and understand the connection between Ordering and memory barriers? Could someone give an example?


Ext Link: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html

Comments

relufi 2023-03-05 13:32

The Playground can print environment variables, which reveals the architecture of the machine it is currently running on.


relufi 2023-03-05 13:30

The Playground server runs on an x86 CPU; setting compile-time reordering aside, it only reorders StoreLoad.


night-cruise 2023-03-05 01:59

It's unrelated to data size or the bug: reordering is simply rare to begin with. Take the same earlier code, rewrite it with AtomicUsize, and run it on my local machine without loom:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static M: AtomicUsize = AtomicUsize::new(0);
static N: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let counts = 2000000;
    let mut happened = 0;

    for i in 0..counts {
        println!("{i}th test!");

        let t1 = thread::spawn(|| {
            M.store(42, Ordering::Relaxed);
            N.load(Ordering::Relaxed)
        });
        let t2 = thread::spawn(|| {
            N.store(42, Ordering::Relaxed);
            M.load(Ordering::Relaxed)
        });

        // Both loads reading 0 means at least one thread's load was
        // reordered before its store.
        if !(t1.join().unwrap() == 42 || t2.join().unwrap() == 42) {
            happened += 1;
        }

        M.store(0, Ordering::Relaxed);
        N.store(0, Ordering::Relaxed);
    }

    println!("load hoisting happens {happened} times in {counts} tests.");
}

Output:

1999996th test!
1999997th test!
1999998th test!
1999999th test!
load hoisting happens 26 times in 2000000 tests.

I did get this result, but only on the second run; the first run showed no reordering at all.

Now the non-Atomic version of the code, again run locally without loom:

use std::thread;

static mut M: usize = 0;
static mut N: usize = 0;

fn main() {
    let counts = 2000000;
    let mut happened = 0;

    for i in 0..counts {
        println!("{i}th test!");

        let t1 = thread::spawn(|| {
            unsafe { M = 42; N }
        });
        let t2 = thread::spawn(|| {
            unsafe { N = 42; M }
        });

        // Both results being 0 means at least one thread's read was
        // reordered before its write. (Racing on static mut is UB;
        // this is only a demonstration.)
        if !(t1.join().unwrap() == 42 || t2.join().unwrap() == 42) {
            happened += 1;
        }

        unsafe { M = 0; N = 0; }
    }

    println!("load hoisting happens {happened} times in {counts} tests.");
}

Output:

1999996th test!
1999997th test!
1999998th test!
1999999th test!
load hoisting happens 31 times in 2000000 tests.

Of course the code above differs slightly from what I wrote before, so here is the original version again:

use std::thread;

static mut X: usize = 0;
static mut Y: usize = 0;
static mut R1: usize = 0;
static mut R2: usize = 0;

fn main() {
    for i in 0..2000000 {
        println!("{i}th test!");
    
        let t1= thread::spawn(|| {
            unsafe { X = 42; R1 = Y; }
        });
        let t2= thread::spawn(|| {
            unsafe { Y = 42; R2 = X; }
        });
        
        t1.join().unwrap();
        t2.join().unwrap();

        unsafe { assert!(R1 == 42 || R2 == 42); }

        unsafe { X = 0; Y = 0; R1 = 0; R2 = 0; }
    }
}

It catches the reordering just the same:

306535th test!
306536th test!
306537th test!
306538th test!
306539th test!
306540th test!
306541th test!
306542th test!
thread 'main' panicked at 'assertion failed: R1 == 42 || R2 == 42', src\main.rs:22:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

x86 is a strongly ordered platform that only allows store-load reordering; the code below won't exhibit reordering even across four million runs:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static X: AtomicUsize = AtomicUsize::new(0);
static FLAG: AtomicUsize = AtomicUsize::new(0);

fn main() {
    for i in 0..4000000 {
        println!("{i}th test!");

        let t1 = thread::spawn(|| {
            X.store(1, Ordering::Relaxed);
            FLAG.store(1, Ordering::Relaxed);
        });
        let t2 = thread::spawn(|| {
            if FLAG.load(Ordering::Relaxed) == 1 {
                assert_eq!(X.load(Ordering::Relaxed), 1);
            }
        });

        t1.join().unwrap();
        t2.join().unwrap();

        X.store(0, Ordering::Relaxed);
        FLAG.store(0, Ordering::Relaxed);
    }
}

In short, CPU out-of-order execution and compiler reordering are rare; sometimes a million runs show no such behavior at all.


lithbitren 2023-03-04 22:48

The playground may simply be different machines. With small data volumes, my guess is the loop can finish in the gap between thread startups, so the reordering effect doesn't show.

Loom aside, I still can't figure out why your earlier example without atomic operations hits the bug, or why the probability is so low.

Chances to hand-write a spinlock are rare, and in other scenarios it really is hard to produce a reproducible reordering example with the standard library's atomics.


night-cruise 2023-03-04 11:46

Same code, 100,000 iterations per thread, again tested on Rust Playground Release Nightly:

// 10 threads, 100000 iterations per thread
const N_THREADS: usize = 10;
const N_TIMES: usize = 100000;

Results:

  1: R = 19999900000, r = 499999500000, spin: 14.847829ms, mutex: 22.069302ms
  2: R = 209999800000, r = 499999500000, spin: 8.809658ms, mutex: 22.105264ms
  3: R = 184999750000, r = 499999500000, spin: 8.761397ms, mutex: 22.104413ms
  4: R = 154999750000, r = 499999500000, spin: 8.778917ms, mutex: 20.811857ms
  5: R = 154999750000, r = 499999500000, spin: 8.772477ms, mutex: 20.802356ms
  6: R = 179999800000, r = 499999500000, spin: 20.459996ms, mutex: 22.466534ms
  7: R = 389999700000, r = 499999500000, spin: 8.295765ms, mutex: 21.627509ms
  8: R = 174999850000, r = 499999500000, spin: 20.822977ms, mutex: 22.666709ms
  9: R = 359999700000, r = 499999500000, spin: 15.682322ms, mutex: 22.452583ms
 10: R = 64999950000, r = 499999500000, spin: 27.899227ms, mutex: 21.444614ms

Ten runs; reordering behavior was observed every time.

Then running the test on my local machine (Win10 + i5), first with 1,000 iterations per thread:

  1: R = 49995000, r = 49995000, spin: 1.3565ms, mutex: 883.5µs
  2: R = 49995000, r = 49995000, spin: 581.7µs, mutex: 567.8µs
  3: R = 49995000, r = 49995000, spin: 535.2µs, mutex: 629.5µs
  4: R = 16497500, r = 49995000, spin: 835.7µs, mutex: 589.6µs
  5: R = 49995000, r = 49995000, spin: 642.5µs, mutex: 649.5µs
  6: R = 49995000, r = 49995000, spin: 638.9µs, mutex: 615.5µs
  7: R = 49995000, r = 49995000, spin: 616.9µs, mutex: 721.2µs
  8: R = 49995000, r = 49995000, spin: 725.1µs, mutex: 580.5µs
  9: R = 49995000, r = 49995000, spin: 993.5µs, mutex: 748.5µs
 10: R = 49995000, r = 49995000, spin: 643.1µs, mutex: 738.8µs

One run exhibited reordering.

With 100,000 iterations per thread:

  1: R = 44999950000, r = 499999500000, spin: 120.7095ms, mutex: 25.9543ms
  2: R = 204999850000, r = 499999500000, spin: 98.462ms, mutex: 26.138ms
  3: R = 44999950000, r = 499999500000, spin: 107.4303ms, mutex: 23.5057ms
  4: R = 94999950000, r = 499999500000, spin: 150.7484ms, mutex: 25.7018ms
  5: R = 34999950000, r = 499999500000, spin: 93.0333ms, mutex: 24.7112ms
  6: R = 24999950000, r = 499999500000, spin: 101.9827ms, mutex: 22.8735ms
  7: R = 74999950000, r = 499999500000, spin: 140.9424ms, mutex: 22.6061ms
  8: R = 94999950000, r = 499999500000, spin: 139.8281ms, mutex: 26.2343ms
  9: R = 54999950000, r = 499999500000, spin: 119.9944ms, mutex: 24.0116ms
 10: R = 64999950000, r = 499999500000, spin: 107.1534ms, mutex: 25.4194ms

Every single run exhibited reordering.


lithbitren 2023-03-04 03:13

My spinlock experiment only reproduces in my Win10 + i9 environment; on the playground, no data size or compile options ever reproduced the unexpected result.


night-cruise 2023-03-04 02:50
  1. I can't reproduce it on my machine anymore either, but it shouldn't be related to unsafe. Testing with AtomicUsize under loom:
    #[test]
    #[should_panic]
    fn load_hoisting_concurrent() {
        loom::model(|| {
            let m = Arc::new(AtomicUsize::new(0));
            let n = Arc::new(AtomicUsize::new(0));

            let (m1, n1) = (m.clone(), n.clone());
            let t1 = thread::spawn(move || {
                n1.store(42, Relaxed);
                m1.load(Relaxed)
            });
            let t2 = thread::spawn(move || {
                m.store(42, Relaxed);
                n.load(Relaxed)
            });
        
            assert!(t1.join().unwrap() == 42 || t2.join().unwrap() == 42);
        });
    }

Test result:

RUSTFLAGS="--cfg loom" cargo test --release

test tests::load_hoisting_concurrent - should panic ... ok

This shows that m and n can both be read as 0 at the same time.

  2. You do 100,000 iterations per thread across 10 threads, so a million iterations in total. A million is already a lot, so observing reordering is normal (besides compiler reordering, the CPU may also execute out of order, and neither reorders on every run). If you drop to 1,000 iterations per thread, you basically can't detect any reordering behavior anymore.
use std::thread;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
 
// 10 threads, 1000 iterations per thread
const N_THREADS: usize = 10;
const N_TIMES: usize = 1000;

// R is the variable to modify; SPIN_LOCK is the lock flag
static mut R: usize = 0;
static SPIN_LOCK: AtomicBool = AtomicBool::new(false);
 
 
fn main() {

    for t in 1..=10 {

        unsafe {
            R = 0;
        }
        
        let start_of_spin = Instant::now();

        let handles = (0..N_THREADS).map(|i| {
            thread::spawn(move || {
                unsafe {
                    for j in i * N_TIMES.. (i + 1) * N_TIMES {
                        // Spin in a while loop; compare_exchange makes the test-and-set atomic. Relaxed, the weakest ordering, is used here.
                        while SPIN_LOCK.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed).is_err() { }
                        // Modify the data; this on its own is not thread-safe
                        R += j;
                        // Set the lock back to false so all threads can race for it again
                        SPIN_LOCK.store(false, Ordering::Relaxed);
                    }
                }
            })
        }).collect::<Vec<_>>();
 
        for handle in handles {
            handle.join().unwrap();
        }

        let time_of_spin = start_of_spin.elapsed();

        let r = Arc::new(Mutex::new(0));
        
        let start_of_mutex = Instant::now();
        
        // The standard way to modify data from multiple threads
        let handles = (0..N_THREADS).map(|i| {
            let r = r.clone();
            thread::spawn(move || {
                for j in i * N_TIMES.. (i + 1) * N_TIMES {
                    *r.lock().unwrap() += j;
                }
            })
        }).collect::<Vec<_>>();
 
        for handle in handles {
            handle.join().unwrap();
        }

        let time_of_mutex = start_of_mutex.elapsed();
 
        println!("{t:3}: R = {}, r = {}, spin: {time_of_spin:?}, mutex: {time_of_mutex:?}", unsafe { R }, r.lock().unwrap());
    }
}

Output (Rust Playground nightly Release):

  1: R = 49995000, r = 49995000, spin: 699.24µs, mutex: 392.451µs
  2: R = 49995000, r = 49995000, spin: 430.662µs, mutex: 379.691µs
  3: R = 49995000, r = 49995000, spin: 379.881µs, mutex: 369.571µs
  4: R = 49995000, r = 49995000, spin: 433.492µs, mutex: 4.095507ms
  5: R = 49995000, r = 49995000, spin: 500.344µs, mutex: 611.958µs
  6: R = 49995000, r = 49995000, spin: 563.016µs, mutex: 592.217µs
  7: R = 49995000, r = 49995000, spin: 612.457µs, mutex: 753.931µs
  8: R = 49995000, r = 49995000, spin: 1.418711ms, mutex: 498.594µs
  9: R = 49995000, r = 49995000, spin: 493.264µs, mutex: 1.06071ms
 10: R = 49995000, r = 49995000, spin: 528.355µs, mutex: 3.914241ms
  3. The buggy_concurrent test is should_panic, and after changing the Relaxed flag store to a Release store it still passes all the same...
    #[test]
    #[should_panic]
    fn buggy_concurrent() {
        loom::model(|| {
            let x = Arc::new(AtomicUsize::new(0));
            let x_clone = Arc::clone(&x);
            let flag = Arc::new(AtomicUsize::new(0));
            let flag_clone = Arc::clone(&flag);

            let t1 = thread::spawn(move || {
                x.store(1, Relaxed);
                flag.store(1, Release);
            });
            let t2 = thread::spawn(move || {
                if flag_clone.load(Relaxed) == 1 {
                    assert_eq!(x_clone.load(Relaxed), 1);
                }
            });
            
            t1.join().unwrap();
            t2.join().unwrap();
        });
    }

Test result:

RUSTFLAGS="--cfg loom" cargo test --release

test tests::buggy_concurrent - should panic ... ok


lithbitren 2023-03-03 23:05

👇 night-cruise: An example of store-load reordering on x86:


Without using any atomic operations, I ran it ten million times here and no problem ever appeared.

That said, I don't think your 1-in-2,000,000 bug is necessarily caused by compiler reordering; it might well be undefined behavior under unsafe, such as cross-thread data not being flushed in time.

My understanding is that compiler reordering is fixed at compile time: re-running the program won't change the execution order again. For example:

X = 1;
R1 = Y;

Lacking any contextual dependency, the execution order after compiler optimization might become:

R1 = Y;
X = 1;

Or this:

X = 1;
R1 = Y;
X = 0;

might be optimized into:

X = 0;
R1 = Y;

If the code were rearranged into such an order, nearly every run would fail; it would be hard to see only single-digit failures across millions or even tens of millions of runs.

Have a look at the first example in the spinlock post I wrote earlier:

https://rustcc.cn/article?id=3259fdf2-9caa-4bf6-a835-6d58efe2f9ee

There, using Relaxed directly fails essentially every time; I'm more inclined to believe that is an unexpected result caused by compiler reordering.

As for the loom example, in fn buggy_concurrent(), under t1 (thread1), change

flag.store(1, Relaxed); to flag.store(1, Release);

with everything else unchanged, and see whether the Acquire here is actually required.

night-cruise 2023-03-02 21:25

An example of store-load reordering on x86:

use std::thread;

static mut R1: i32 = 0;
static mut R2: i32 = 0;
static mut X: i32 = 0;
static mut Y: i32 = 0;

fn main() {
    let total = 2000000;
    let mut count = 0;

    for _ in 0..total {
        if test_load_hoisting() {
            count += 1;
        }
    }
    println!("load hoisting happens {count} times in {total} running.");
}

fn test_load_hoisting() -> bool {
    let t1= thread::spawn(|| {
        unsafe {
            X = 1;
            R1 = Y;
        }
    });
    let t2= thread::spawn(|| {
        unsafe {
            Y = 1;
            R2 = X;
        }
    });
    
    t1.join().unwrap();
    t2.join().unwrap();

    unsafe {
        let res = R1 == 0 && R2 == 0;
        R1 = 0;
        R2 = 0;
        X = 0;
        Y = 0;

        res
    }
}

Test result:

cargo run --release
   
load hoisting happens 1 times in 2000000 running.


--
👇
lithbitren:

👇
night-cruise: Release/Acquire are meant to be used as a pair across threads, to establish a happens-before relationship. SeqCst is more complicated still: we have to consider every interleaving of the SeqCst fences and argue that the lock-free data structure is correct in each case, which often leads to "combinatorial explosion". For example: https://github.com/crossbeam-rs/rfcs/blob/master/text/2017-07-23-relaxed-memory.md

--
👇
night-cruise: What we want is for the assertion assert_eq!(X, 1) never to fail. If thread2 runs Flag.load(Acquire) -> assert_eq!(X, 1) first, the assertion never executes at all: Flag.load reads 0, so we never enter the if branch. You need to change the code like this:


It should indeed be: "if the assertion gets to run, it must hold."

If we modify the program, it really should use an Option: None means the assertion never ran, Some(true) means it held, and Some(false) the opposite.

use std::sync::atomic;
use std::thread;
use std::time::Duration;

static mut X: usize = 0;
static FLAG: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
static mut ASSERT_EQ: Option<bool> = None;

const TIMES: usize = 10;

fn main() {
    for i in 0..TIMES {
        unsafe {
            X = 0;
            FLAG.store(0, atomic::Ordering::SeqCst);
            ASSERT_EQ = None;
        }

        let thread1 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                X = 1;
                FLAG.store(1, atomic::Ordering::Release);
            }
        });

        let thread2 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                if FLAG.load(atomic::Ordering::Acquire) == 1 {
                    ASSERT_EQ = Some(X == 1);
                }
            }
        });

        thread1.join().unwrap();
        thread2.join().unwrap();

        println!("{i}: {:?}", unsafe { ASSERT_EQ });
    }
}

The result should look something like:

0: Some(true)
1: None
2: None
3: Some(true)
4: None
5: Some(true)
6: Some(true)
7: Some(true)
8: None
9: None

Still, compiler reordering is hard to reproduce in this example: even with Relaxed everywhere it's hard to get Some(false), and for thread2 in particular the choice of Ordering variant makes practically no difference.

The only time I reproduced compiler reordering with Relaxed was in a simple spinlock implementation: the reordering really did break the spinlock, and Acquire/Release/SeqCst fixed it. But even that was within a single scope; across scopes, or with statements that affect threads, compiler reordering doesn't show up. In thread2's case, the condition and the block it guards shouldn't, in theory, ever be reordered.

night-cruise 2023-03-02 21:17

It seems to be because x86 itself is a strongly ordered platform and, if I remember correctly, only allows store-load reordering. I ran the code below 3,000,000 times without any reordering:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static X: AtomicUsize = AtomicUsize::new(0);
static FLAG: AtomicUsize = AtomicUsize::new(0);

const TIMES: usize = 3000000;

fn main() {
    for i in 0..TIMES {
        println!("test {i}!");
        
        X.store(0, Ordering::Relaxed);
        FLAG.store(0, Ordering::Relaxed);

        let t1 = thread::spawn(move || {
            X.store(1, Ordering::Relaxed);
            FLAG.store(1, Ordering::Relaxed);
        });

        let t2 = thread::spawn(move || {
            if FLAG.load(Ordering::Relaxed) == 1 {
                assert_eq!(X.load(Ordering::Relaxed), 1);
            }
        });

        t1.join().unwrap();
        t2.join().unwrap();
    }
}

However, you can test with the loom crate (https://github.com/tokio-rs/loom), which permutes the execution as exhaustively as possible under the C11 memory model:

    use loom::sync::Arc;
    use loom::sync::atomic::AtomicUsize;
    use loom::sync::atomic::Ordering::{Acquire, Release, Relaxed};
    use loom::thread;

    #[test]
    #[should_panic]
    fn buggy_concurrent() {
        loom::model(|| {
            let x = Arc::new(AtomicUsize::new(0));
            let x_clone = Arc::clone(&x);
            let flag = Arc::new(AtomicUsize::new(0));
            let flag_clone = Arc::clone(&flag);

            let t1 = thread::spawn(move || {
                x.store(1, Relaxed);
                flag.store(1, Relaxed);
            });
            let t2 = thread::spawn(move || {
                if flag_clone.load(Relaxed) == 1 {
                    assert_eq!(x_clone.load(Relaxed), 1);
                }
            });
            
            t1.join().unwrap();
            t2.join().unwrap();
        });
    }


    #[test]
    fn no_buggy_concurrent() {
        loom::model(|| {
            let x = Arc::new(AtomicUsize::new(0));
            let x_clone = Arc::clone(&x);
            let flag = Arc::new(AtomicUsize::new(0));
            let flag_clone = Arc::clone(&flag);

            let t1 = thread::spawn(move || {
                x.store(1, Relaxed);
                flag.store(1, Release);
            });
            let t2 = thread::spawn(move || {
                if flag_clone.load(Acquire) == 1 {
                    assert_eq!(x_clone.load(Relaxed), 1);
                }
            });
            
            t1.join().unwrap();
            t2.join().unwrap();
        });
    }

That makes it directly detectable:

running 2 tests
test tests::buggy_concurrent - should panic ... ok
test tests::no_buggy_concurrent ... ok


lithbitren 2023-03-02 00:21

--
👇
night-cruise: Release/Acquire are meant to be paired across threads to establish a happens-before relationship. SeqCst is more complex: we have to consider every interleaving of the SeqCst fences and argue that the lock-free data structure is correct in each case, which often leads to "combinatorial explosion". For example: https://github.com/crossbeam-rs/rfcs/blob/master/text/2017-07-23-relaxed-memory.md

--
👇
night-cruise: What we want is for the assertion assert_eq!(X, 1) never to fail. If thread2 runs Flag.load(Acquire) -> assert_eq(X, 1) first, the assertion never executes at all, because Flag.load returns 0 and the if branch is never entered. You'd need to change the code like this:


It should indeed be: "if the assertion gets to execute, it must hold."

If we're going to modify the program, it should really use an Option: None means the assertion never ran, Some(true) means it held, and Some(false) means it failed.

use std::sync::atomic;
use std::thread;
use std::time::Duration;

static mut X: usize = 0;
static FLAG: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
static mut ASSERT_EQ: Option<bool> = None;

const TIMES: usize = 10;

fn main() {
    for i in 0..TIMES {
        // Reset the shared state before each round.
        unsafe {
            X = 0;
            FLAG.store(0, atomic::Ordering::SeqCst);
            ASSERT_EQ = None;
        }

        let thread1 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                X = 1;
                FLAG.store(1, atomic::Ordering::Release);
            }
        });

        let thread2 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                if FLAG.load(atomic::Ordering::Acquire) == 1 {
                    // Record the assertion result only when the flag was observed.
                    ASSERT_EQ = Some(X == 1);
                }
            }
        });

        thread1.join().unwrap();
        thread2.join().unwrap();

        println!("{i}: {:?}", unsafe { ASSERT_EQ });
    }
}

The output should look something like:

0: Some(true)
1: None
2: None
3: Some(true)
4: None
5: Some(true)
6: Some(true)
7: Some(true)
8: None
9: None

That said, compiler reordering is still hard to reproduce in this example. Even with Relaxed everywhere I can hardly ever get Some(false); for thread2 in particular, the choice of Ordering variant makes practically no difference.

The only time I reproduced compiler reordering under Relaxed was in a simple spinlock implementation: reordering really did break the spinlock, and switching to Acquire/Release/SeqCst fixed it. But that was a same-scope problem; once you cross scopes or have statements that affect another thread, the reordering doesn't show up. In thread2's case, the if condition and the block it guards should, in theory, never be reordered against each other.
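A minimal sketch of the kind of spinlock being described (my illustration, not the code from that incident): with Relaxed on both the lock and the unlock, nothing orders the accesses to the protected data relative to the flag, which is exactly the failure mode mentioned; Acquire on lock and Release on unlock restore critical-section semantics.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};

pub struct SpinLock<T> {
    locked: AtomicBool,
    data: UnsafeCell<T>,
}

// Safe to share across threads because `with` enforces mutual exclusion.
unsafe impl<T: Send> Sync for SpinLock<T> {}

impl<T> SpinLock<T> {
    pub const fn new(v: T) -> Self {
        Self { locked: AtomicBool::new(false), data: UnsafeCell::new(v) }
    }

    pub fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        // Acquire on lock: accesses to `data` cannot float above this point.
        // With Relaxed here instead, the compiler/CPU could hoist them out.
        while self
            .locked
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
        let r = f(unsafe { &mut *self.data.get() });
        // Release on unlock: writes to `data` cannot sink below this point.
        self.locked.store(false, Ordering::Release);
        r
    }
}
```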

night-cruise 2023-03-01 12:19

I'm not quite sure what you're getting at. happens-before is a semantics defined at the programming-language level, independent of any particular implementation; the compiler is responsible for mapping it onto each platform (x86, ARM, etc.). Also, when analyzing lock-free data structures one generally doesn't reason about reordering or processor caches directly, but with simpler models: whether one operation is visible to another, whether a load can observe an old value, and so on (much like whether a read in a distributed system can see a stale value). And the point isn't whether an old value can be read, but whether reading an old value breaks the correctness of the concurrent data structure; only then do you need to impose some synchronization on it.

--
👇
0xe994a4: Should happens-before be discussed in two cases — one established among events on the current CPU, and one established across multiple CPUs?

--
👇
night-cruise: Release/Acquire are meant to be paired across threads to establish a happens-before relationship. ...

--
👇
night-cruise: What we want is for the assertion assert_eq!(X, 1) never to fail. ...

--
👇
night-cruise: You're not accounting for thread interleaving. ...

--
👇
lithbitren: Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture. ...
0xe994a4 (OP) 2023-03-01 11:40

Should happens-before be discussed in two cases — one established among events on the current CPU, and one established across multiple CPUs?

--
👇
night-cruise: Release/Acquire are meant to be paired across threads to establish a happens-before relationship. ...

--
👇
night-cruise: What we want is for the assertion assert_eq!(X, 1) never to fail. ...

--
👇
night-cruise: You're not accounting for thread interleaving. ...

--
👇
lithbitren: Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture. ...
0xe994a4 (OP) 2023-03-01 11:17

Maybe the framing could shift from "what is the underlying principle?" to "which scenarios does each Ordering suit, what are the performance differences between them, and how do you analyze that?"

0xe994a4 (OP) 2023-03-01 11:15

Many articles mention out-of-order execution, but it's hard to build an intuition for.

Reordering caused by the compiler is easy to understand: the emitted machine code may simply not be in source order, or parts of the source may have been optimized away.

Reordering caused by the hardware is best viewed from the CPU's perspective. First, operations that don't depend on other events don't care about ordering at all. For mutually dependent operations (like the example night-cruise gave earlier), the data exists in both the cache and memory at the same time. If CPU 0 modifies the data in its own cache but hasn't written it back to memory or notified the other CPUs, then the other CPUs still see the old data in their own caches, different from what CPU 0 sees — and that shows up as out-of-order behavior.
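To make the store-buffer scenario concrete, here's a litmus-test sketch (my example, not from the article): each thread stores to its own variable and then loads the other's with Relaxed. No sequentially consistent interleaving can make both loads return 0, but store buffering (StoreLoad reordering) can. Since whether the reordering actually shows up in any given run is not guaranteed, the code only returns what the loads observed rather than asserting that reordering occurred.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static A: AtomicUsize = AtomicUsize::new(0);
static B: AtomicUsize = AtomicUsize::new(0);

// One round of the classic store-buffer litmus test; returns the pair of
// values the two loads observed. (0, 0) would indicate StoreLoad reordering.
fn store_buffer_round() -> (usize, usize) {
    A.store(0, Ordering::SeqCst); // reset shared state
    B.store(0, Ordering::SeqCst);
    let t1 = thread::spawn(|| {
        A.store(1, Ordering::Relaxed);
        B.load(Ordering::Relaxed) // may still see 0 while t2's store sits in a store buffer
    });
    let t2 = thread::spawn(|| {
        B.store(1, Ordering::Relaxed);
        A.load(Ordering::Relaxed)
    });
    (t1.join().unwrap(), t2.join().unwrap())
}
```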

0xe994a4 (OP) 2023-03-01 11:05

True — a lot of what's out there does read like rote recitation.

Atomic operations can be constrained across threads. Following the analysis in Why-Memory-Barriers: take an assignment a = 0; the CPU executing the operation and flushing the result into the cache are two separate steps. Without constraints, the CPU refreshes the corresponding cache right after executing. With constraints (i.e. memory barriers), the CPU doesn't refresh the cache after executing but writes into the store buffer instead. As for what a store buffer is and why writes go into it: it's part of the machinery designed so that, with multiple cores and threads, the cores can reach consensus on shared variables and produce correct (or rather, predictable) results — http://www.wowotech.net/kernel_synchronization/Why-Memory-Barriers.html is worth a careful read. Compared with blocking on a lock, a store buffer lets the CPU set a result aside without stalling and keep executing the next instruction. Of course, the store buffer's capacity is limited, and once it fills up the CPU will stall anyway.

Intuitively, an atomic operation should be a single self-contained event, such as a count. That kind of operation doesn't depend on any other event, so guaranteeing atomicity alone is enough — which is exactly where Relaxed fits.

Looking at your earlier example — multiple cores and threads modifying several global variables with dependencies among them — Relaxed doesn't apply there (I still don't really understand Release, Acquire, and SeqCst, ORZ).
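The "single self-contained event" case above can be sketched like this (my example): a shared hit counter where nothing else depends on the counter's value. Relaxed is sufficient — fetch_add is still atomic, and only ordering relative to other memory locations is given up.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static HITS: AtomicUsize = AtomicUsize::new(0);

// Four threads each bump the counter 1000 times. Relaxed is enough here
// because no other memory location is synchronized through HITS.
fn count_hits() -> usize {
    let handles: Vec<_> = (0..4)
        .map(|_| {
            thread::spawn(|| {
                for _ in 0..1000 {
                    HITS.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    HITS.load(Ordering::Relaxed)
}
```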

--
👇
lithbitren: Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture. ...
night-cruise 2023-03-01 11:04

Release/Acquire are meant to be used in pairs across threads precisely to establish happens-before relationships. SeqCst is more complex: we have to consider every interleaving of the SeqCst fences and argue that the lock-free data structure is correct in each case, which often leads to "combinatorial explosion". For example: https://github.com/crossbeam-rs/rfcs/blob/master/text/2017-07-23-relaxed-memory.md
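A sketch of the kind of case where Release/Acquire alone is not enough and SeqCst is needed (my illustration of the classic two-flag pattern, not code from the linked RFC): each thread raises its own flag and then checks the other's. SeqCst puts both stores into a single total order, so the two loads can never both miss; under Acquire/Release that outcome would be allowed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

static FLAG_A: AtomicBool = AtomicBool::new(false);
static FLAG_B: AtomicBool = AtomicBool::new(false);

// Returns true only if BOTH threads failed to see the other's flag.
// Under SeqCst this is impossible: assuming both loads see false leads to
// a cycle in the single total order over all four SeqCst operations.
fn both_missed() -> bool {
    FLAG_A.store(false, Ordering::SeqCst); // reset shared state
    FLAG_B.store(false, Ordering::SeqCst);
    let t1 = thread::spawn(|| {
        FLAG_A.store(true, Ordering::SeqCst);
        FLAG_B.load(Ordering::SeqCst)
    });
    let t2 = thread::spawn(|| {
        FLAG_B.store(true, Ordering::SeqCst);
        FLAG_A.load(Ordering::SeqCst)
    });
    let (saw_b, saw_a) = (t1.join().unwrap(), t2.join().unwrap());
    !saw_a && !saw_b
}
```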

--
👇
night-cruise: What we want is for the assertion assert_eq!(X, 1) never to fail. ...

--
👇
night-cruise: You're not accounting for thread interleaving. ...

--
👇
lithbitren: Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture. ...
night-cruise 2023-03-01 11:00

What we want is for the assertion assert_eq!(X, 1) never to fail. If thread2 runs Flag.load(Acquire) -> assert_eq(X, 1) first, the assertion never executes at all: Flag.load returns 0, so the if branch is never entered. You'd need to change the code like this:

use std::sync::atomic;
use std::thread;
use std::time::Duration;

static mut X: usize = 0;
static FLAG: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
static mut ASSERT_EQ: bool = false;

const TIMES: usize = 10;

fn main() {
    for i in 0..TIMES {
        unsafe {
            X = 0;
            FLAG.store(0, atomic::Ordering::SeqCst);
            ASSERT_EQ = false;
        }

        let thread1 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                X = 1;
                // FLAG.store(1, atomic::Ordering::Relaxed);
                FLAG.store(1, atomic::Ordering::Release);
            }
        });

        let thread2 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                // if FLAG.load(atomic::Ordering::Relaxed) == 1 {
                if FLAG.load(atomic::Ordering::Acquire) == 1 {
                    ASSERT_EQ = X == 1;
                } else { ASSERT_EQ = true; } // flag not observed: assertion holds vacuously
            }
        });

        thread1.join().unwrap();
        thread2.join().unwrap();
        
        println!("{i}: {:?}", unsafe { ASSERT_EQ });
    }
}

--
👇
night-cruise: You're not accounting for thread interleaving. ...

--
👇
lithbitren: Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture. ...
night-cruise 2023-03-01 10:56

You're not accounting for thread interleaving. If thread2 runs Flag.load(Acquire) -> assert_eq(X, 1) first, you're guaranteed to get False (because your code initializes it to False). But what we want is: when the order is Flag.store(1, Release) -> Flag.load(Acquire) -> assert_eq(X, 1), the result should be True.

--
👇
lithbitren: Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture. ...
lithbitren 2023-03-01 01:54

--
👇
night-cruise:

             thread1               |              thread2
      X = 1                        |        if Flag.load(Acquire) == 1:
      Flag.store(1, Release)       |            assert_eq!(X, 1)

That way Flag.store(1, Release) happens before Flag.load(Acquire): everything before the Flag.store is visible to everything after the Flag.load, so the assertion assert_eq!(X, 1) must succeed.


Many tutorials online do use this cross-thread example for Acquire/Release, but that's essentially reciting the standard library docs as scripture — it doesn't make them necessarily right.

Having atomic operations constrain execution order across threads is clearly problematic: if they could do that, wouldn't they just be locks?

For instance, if thread1 sleeps for 10 ms before executing X = 1, then no matter which Ordering variant you mark, the assertion in thread2 can never succeed.

The execution order that Ordering controls should mean the order within a single scope; atomic operations across threads and scopes shouldn't, in theory, affect one another.

Fleshing this example out into code shows that no matter how the Ordering variants are marked, the results for X come out scrambled.

use std::sync::atomic;
use std::thread;
use std::time::Duration;

static mut X: usize = 0;
static FLAG: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
static mut ASSERT_EQ: bool = false;

const TIMES: usize = 10;

fn main() {
    for i in 0..TIMES {
        unsafe {
            X = 0;
            FLAG.store(0, atomic::Ordering::SeqCst);
            ASSERT_EQ = false;
        }

        let thread1 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                X = 1;
                // FLAG.store(1, atomic::Ordering::Relaxed);
                FLAG.store(1, atomic::Ordering::Release);
            }
        });

        let thread2 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            unsafe {
                // if FLAG.load(atomic::Ordering::Relaxed) == 1 {
                if FLAG.load(atomic::Ordering::Acquire) == 1 {
                    ASSERT_EQ = X == 1;
                }
            }
        });

        thread1.join().unwrap();
        thread2.join().unwrap();
        
        println!("{i}: {:?}", unsafe { ASSERT_EQ });
    }
}

Run results:

F:\rust\ordering-test>cargo run --release
   Compiling ordering-test v0.1.0 (F:\rust\ordering-test)
    Finished release [optimized] target(s) in 6.22s
     Running `target\release\ordering-test.exe`
0: true
1: false
2: true
3: true
4: true
5: false
6: false
7: true
8: true
9: false

F:\rust\ordering-test>cargo run --release
    Finished release [optimized] target(s) in 0.01s
     Running `target\release\ordering-test.exe`
0: true
1: true
2: false
3: false
4: false
5: false
6: false
7: false
8: true
9: false

F:\rust\ordering-test>cargo run --release
    Finished release [optimized] target(s) in 0.01s
     Running `target\release\ordering-test.exe`
0: false
1: false
2: true
3: false
4: true
5: true
6: true
7: true
8: false
9: false

F:\rust\ordering-test>cargo run --release
   Compiling ordering-test v0.1.0 (F:\rust\ordering-test)
    Finished release [optimized] target(s) in 6.68s
     Running `target\release\ordering-test.exe`
0: true
1: false
2: false
3: false
4: false
5: true
6: true
7: false
8: true
9: false
1 2 · 24 comments in total, 2 pages