Posted by Pslydhh on 2018-06-08 13:37

Tags: threads, concurrency, sharing

Also published at: https://zhuanlan.zhihu.com/p/37760452

Rust's distinctive compile-time checks rule out, to a large extent, hard-to-diagnose concurrency bugs.

This post explores the ways Rust can share data between threads.

We will use a simple example built around an AtomicUsize to put each of them into practice.

Approach 1: the traditional way, using Arc. An Arc carries a reference count; clone() produces one handle per thread, which is then moved into that thread. Through their cloned Arcs, these threads all point at the same underlying data, ptr:


use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
	let val = Arc::new(AtomicUsize::new(0));
	let mut guards = vec![];
	for _ in 0..8 {
		// Each thread receives its own cloned Arc handle; they all point at the same AtomicUsize.
		let val = val.clone();
		guards.push(
			thread::spawn(move || {
				let v = val.fetch_add(1, Ordering::SeqCst);
				println!("{:?}", v);
			}) 
		);
	}
	
	for guard in guards {
		guard.join().unwrap();
	}
	println!("over");
}

From the source we can see that every Arc maintains the reference count through the `strong` AtomicUsize, so that when `strong` drops to 0 the data pointed to by `ptr` is reclaimed:


        let x: Box<_> = box ArcInner {
            strong: atomic::AtomicUsize::new(1),
            weak: atomic::AtomicUsize::new(1),
            data,
        };
        Arc { ptr: Box::into_raw_non_null(x), phantom: PhantomData }

        fn clone(&self) -> Arc<T> {
            let old_size = self.inner().strong.fetch_add(1, Relaxed);
            if old_size > MAX_REFCOUNT {
                unsafe {
                    abort();
                }
            }
            Arc { ptr: self.ptr, phantom: PhantomData }
        }

	fn drop(&mut self) {
		if self.inner().strong.fetch_sub(1, Release) != 1 {
			return;
		}

		atomic::fence(Acquire);

		unsafe {
			self.drop_slow();
		}
	}
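
The effect of these counters can be observed directly through the stable Arc::strong_count API. A small, self-contained sketch (independent of the snippets above):


use std::sync::Arc;

fn main() {
	let a = Arc::new(0usize);
	assert_eq!(Arc::strong_count(&a), 1);

	// clone() does fetch_add(1, Relaxed) on `strong`.
	let b = a.clone();
	assert_eq!(Arc::strong_count(&a), 2);

	// drop() does fetch_sub(1, Release) on `strong`.
	drop(b);
	assert_eq!(Arc::strong_count(&a), 1);

	// When `a` itself is dropped, `strong` reaches 0 and the inner allocation is freed.
}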

To deepen our understanding, we can even write a similar Arc of our own. The code is as follows:


#![feature(box_syntax)]
#![feature(box_into_raw_non_null)]
#![feature(allocator_api)]

use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic;
use std::thread;
use std::ops::Deref;
use std::ptr::NonNull; 
use std::ptr;
use std::heap::{Heap, Alloc, Layout};

pub struct Arc2<T> {
	ptr: NonNull<ArcInner2<T>>,
}

unsafe impl<T: Send + Sync> Send for Arc2<T> {}
unsafe impl<T: Send + Sync> Sync for Arc2<T> {}

pub struct ArcInner2<T> {
	strong: AtomicUsize,
	data: T,
}

impl<T> Arc2<T> {
	pub fn new(data: T) -> Arc2<T> {
		let x: Box<_> = box ArcInner2 {
			strong: AtomicUsize::new(1),
			data,
		};
		Arc2 { ptr: Box::into_raw_non_null(x) }
	}
}

impl<T> Arc2<T> {
	pub fn inner(&self) -> &ArcInner2<T> {
		unsafe { self.ptr.as_ref() }
	}
	
	unsafe fn drop_slow(&mut self) {
	    let ptr = self.ptr.as_ptr();
	
	    // Destroy the data at this time, even though we may not free the box
	    // allocation itself (there may still be weak pointers lying around).
	    ptr::drop_in_place(&mut self.ptr.as_mut().data);
	
//	    if self.inner().weak.fetch_sub(1, AtomicOrdering::Release) == 1 {
	        atomic::fence(AtomicOrdering::Acquire);
	        Heap.dealloc(ptr as *mut u8, Layout::for_value(&*ptr))
//	    }
	}
}    

impl<T> Clone for Arc2<T> {
	fn clone(&self) -> Arc2<T> {
		let old_size = self.inner().strong.fetch_add(1, AtomicOrdering::Relaxed);
		if old_size > 1024 {
			panic!("NO");
		}
		Arc2 { ptr: self.ptr }
	}
}

impl<T> Deref for Arc2<T> {
	type Target = T;

	fn deref(&self) -> &T {
		&self.inner().data
	}
}

impl<T> Drop for Arc2<T> {
    fn drop(&mut self) {
        if self.inner().strong.fetch_sub(1, AtomicOrdering::Release) != 1 {
            return;
        }
        atomic::fence(AtomicOrdering::Acquire);
        unsafe {
            self.drop_slow();
        }
    }
}

fn main() {
	let val = Arc2::new(AtomicUsize::new(0));
	let mut guards = vec![];
	for _ in 0..8 {
		let val = val.clone();
		guards.push(
			thread::spawn(move || {
				let v = val.fetch_add(1, AtomicOrdering::SeqCst);
				println!("{:?}", v);
			})
		);
	}
	
	for guard in guards {
		guard.join().unwrap();
	}
	println!("over");
}

Apart from the reference-counting algorithm itself, the key here lies in these two lines:


unsafe impl<T: Send + Sync> Send for Arc2<T> {}
unsafe impl<T: Send + Sync> Sync for Arc2<T> {}

Remove them, and compilation fails with an error like:

`std::ptr::NonNull<ArcInner2<std::sync::atomic::AtomicUsize>>` cannot be sent between threads safely

This looks rather odd: our Send and Sync impls were written for Arc2, yet the error names not Arc2 but std::ptr::NonNull. The reason is that Send and Sync are auto traits, derived field by field, so the compiler points at the field that fails the requirement.
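
A minimal sketch of this behavior (Holder and the assert helpers are illustrative names, not taken from this post's code):


use std::ptr::NonNull;

// A wrapper around a raw, non-null pointer, just as Arc2 wraps NonNull<ArcInner2<T>>.
struct Holder {
	ptr: NonNull<u8>,
}

// Comment these two lines out and the asserts below fail because
// `NonNull<u8>` cannot be sent between threads safely: the same
// shape of error we saw for Arc2.
unsafe impl Send for Holder {}
unsafe impl Sync for Holder {}

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
	assert_send::<Holder>();
	assert_sync::<Holder>();
	let h = Holder { ptr: NonNull::dangling() };
	let _ = h.ptr;
	println!("Holder is Send + Sync");
}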

Approach 2: following on from the first approach, we find that NonNull has no Send or Sync implementation:


/// `NonNull` pointers are not `Send` because the data they reference may be aliased.
// NB: This impl is unnecessary, but should provide better error messages.
#[stable(feature = "nonnull", since = "1.25.0")]
impl<T: ?Sized> !Send for NonNull<T> { }

/// `NonNull` pointers are not `Sync` because the data they reference may be aliased.
// NB: This impl is unnecessary, but should provide better error messages.
#[stable(feature = "nonnull", since = "1.25.0")]
impl<T: ?Sized> !Sync for NonNull<T> { }

But another, very similar structure, Unique, does implement Send/Sync:


/// `Unique` pointers are `Send` if `T` is `Send` because the data they
/// reference is unaliased. Note that this aliasing invariant is
/// unenforced by the type system; the abstraction using the
/// `Unique` must enforce it.
#[unstable(feature = "ptr_internals", issue = "0")]
unsafe impl<T: Send + ?Sized> Send for Unique<T> { }

/// `Unique` pointers are `Sync` if `T` is `Sync` because the data they
/// reference is unaliased. Note that this aliasing invariant is
/// unenforced by the type system; the abstraction using the
/// `Unique` must enforce it.
#[unstable(feature = "ptr_internals", issue = "0")]
unsafe impl<T: Sync + ?Sized> Sync for Unique<T> { }

At the same time, we can see that a NonNull and a Unique can be converted into one another:


    #[unstable(feature = "box_into_raw_non_null", issue = "47336")]
    #[inline]
    pub fn into_raw_non_null(b: Box<T>) -> NonNull<T> {
        Box::into_unique(b).into()
    }

    #[unstable(feature = "ptr_internals", issue = "0", reason = "use into_raw_non_null instead")]
    #[inline]
    pub fn into_unique(b: Box<T>) -> Unique<T> {
        let unique = b.0;
        mem::forget(b);
        unique
    }

So we can try to use Unique directly to share the data...

And it works. The code is as follows:


#![feature(ptr_internals)]

use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;

fn main() {
	// Unique<T> is Copy, so each thread's `move` closure captures its own copy of the pointer.
	let val = Box::into_unique(Box::new(AtomicUsize::new(0)));
	let mut guards = vec![];
	for _ in 0..8 {
		guards.push(
			thread::spawn(move || {
				let v = unsafe { &*val.as_ptr() }.fetch_add(1, AtomicOrdering::SeqCst);
				println!("{:?}", v);
			})
		);
	}

	for guard in guards {
		guard.join().unwrap();
	}
	
	// Rebuild the Box so the allocation is freed when it goes out of scope.
	unsafe {
		Box::from_raw(val.as_ptr());
	}
	println!("over");
}

Since we have given up automatic memory reclamation, Box::from_raw(val.as_ptr()) is used to reclaim the data once all of the other threads have finished.
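
If we would rather not depend on remembering that final call, the reclamation can be wrapped in a small RAII guard. A sketch of the idea; PtrGuard is an illustrative name, not part of the original code:


use std::sync::atomic::AtomicUsize;

// Frees a Box-allocated value when the guard is dropped.
struct PtrGuard<T> {
	ptr: *mut T,
}

impl<T> Drop for PtrGuard<T> {
	fn drop(&mut self) {
		// Rebuild the Box so its destructor runs and the allocation is freed.
		unsafe { drop(Box::from_raw(self.ptr)); }
	}
}

fn main() {
	let raw = Box::into_raw(Box::new(AtomicUsize::new(0)));
	let _guard = PtrGuard { ptr: raw };
	// ... hand `raw` out to threads and join them here ...
	// `_guard` frees the allocation when it goes out of scope, even on an early return or panic.
}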

Approach 3: this last approach looks the trickiest; it is also the one used by crossbeam and scoped-threadpool.


use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;

// An FnOnce that can be called through a trait object (the same idea as the old FnBox).
trait FnMove {
	fn call(self: Box<Self>);
}

impl<F: FnOnce()> FnMove for F {
	fn call(self: Box<Self>) { (*self)() }
}

// Box the closure behind its real (non-'static) lifetime 'a, so the coercion to a trait
// object is allowed; crossbeam's scoped threads do the same internally with a named lifetime.
fn erase<'a, F: FnOnce() + Send + 'a>(f: F) -> Box<FnMove + Send + 'a> {
	Box::new(f)
}

fn main() {
	let val = AtomicUsize::new(0);
	let mut guards = vec![];
	for _ in 0..8 {
		// Transmute the fat pointer: Box<FnMove + Send + 'a> becomes Box<FnMove + Send + 'static>,
		// so that thread::spawn will accept a closure that borrows the local `val`.
		let closure: Box<FnMove + Send> = unsafe {
			std::mem::transmute(erase(|| {
				let v = val.fetch_add(1, AtomicOrdering::SeqCst);
				println!("{:?}", v);
			}))
		};

		guards.push(
			thread::spawn(move || closure.call())
		);
	}

	for guard in guards {
		guard.join().unwrap();
	}
	println!("over");
}

Here the thinking is reversed: instead of changing how the shared variable is handed out, we force-cast the closure that the thread will run into Send + 'static, and after that the closure can operate on the shared variable directly! This is only sound because every thread is joined before `val` goes out of scope, which is exactly the guarantee that crossbeam's and scoped-threadpool's scoped APIs enforce for you.
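
For comparison, here is the same example written against the safe wrapper the standard library gained later (std::thread::scope, stable since Rust 1.63, so well after this post); it offers the same capability, threads that borrow local data, behind a sound API:


use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
	let val = AtomicUsize::new(0);
	thread::scope(|s| {
		for _ in 0..8 {
			s.spawn(|| {
				let v = val.fetch_add(1, Ordering::SeqCst);
				println!("{:?}", v);
			});
		}
	}); // every scoped thread is joined here, before `val` is dropped
	println!("over");
}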

Also published at: https://zhuanlan.zhihu.com/p/37760452
