Mirrored at: https://zhuanlan.zhihu.com/p/37760452
Through its distinctive compile-time checks, Rust rules out a large class of hard-to-debug concurrency bugs.
This article explores the ways in which Rust can share data between threads.
As a simple running example, we will share a single AtomicUsize counter among several threads.
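Before diving in, a quick note on the atomic API used throughout: fetch_add performs the addition atomically and returns the value the counter held *before* the addition, which is why each thread in the examples below prints a number. A minimal single-threaded illustration:

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let counter = AtomicUsize::new(0);
    // fetch_add returns the previous value, then stores old + 1.
    assert_eq!(counter.fetch_add(1, Ordering::SeqCst), 0);
    assert_eq!(counter.fetch_add(1, Ordering::SeqCst), 1);
    assert_eq!(counter.load(Ordering::SeqCst), 2);
}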
Approach 1: the traditional way, using Arc. An Arc carries a reference count; clone() hands each thread its own Arc handle, which is then moved into that thread. Through these cloned Arcs, all of the threads point at the same underlying data via ptr:
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let val = Arc::new(AtomicUsize::new(0));
    let mut guards = vec![];
    for _ in 0..8 {
        let val = val.clone();
        guards.push(
            thread::spawn(move || {
                let v = val.fetch_add(1, Ordering::SeqCst);
                println!("{:?}", v);
            })
        );
    }
    for guard in guards {
        guard.join().unwrap();
    }
    println!("over");
}
Looking at the standard-library source, we can see that every Arc maintains the reference count through the strong AtomicUsize field, and that the data behind ptr is freed once strong drops to 0:
// Arc::new:
let x: Box<_> = box ArcInner {
    strong: atomic::AtomicUsize::new(1),
    weak: atomic::AtomicUsize::new(1),
    data,
};
Arc { ptr: Box::into_raw_non_null(x), phantom: PhantomData }

// Arc::clone:
fn clone(&self) -> Arc<T> {
    let old_size = self.inner().strong.fetch_add(1, Relaxed);
    if old_size > MAX_REFCOUNT {
        unsafe {
            abort();
        }
    }
    Arc { ptr: self.ptr, phantom: PhantomData }
}

// Arc::drop:
fn drop(&mut self) {
    if self.inner().strong.fetch_sub(1, Release) != 1 {
        return;
    }
    atomic::fence(Acquire);
    unsafe {
        self.drop_slow();
    }
}
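The effect of this counting can be observed directly from safe code via Arc::strong_count; a small illustration:

use std::sync::Arc;

fn main() {
    let a = Arc::new(5);
    assert_eq!(Arc::strong_count(&a), 1);   // strong starts at 1 in Arc::new

    let b = a.clone();                      // clone(): strong.fetch_add(1, Relaxed)
    assert_eq!(Arc::strong_count(&a), 2);

    drop(b);                                // drop(): strong.fetch_sub(1, Release)
    assert_eq!(Arc::strong_count(&a), 1);
}                                           // `a` is dropped here; strong reaches 0 and the data is freed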
To deepen our understanding, we can even imitate a similar Arc ourselves; the code is as follows:
#![feature(box_syntax)]
#![feature(box_into_raw_non_null)]
#![feature(allocator_api)]
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic;
use std::thread;
use std::ops::Deref;
use std::ptr::NonNull;
use std::ptr;
use std::heap::{Heap, Alloc, Layout};

pub struct Arc2<T> {
    ptr: NonNull<ArcInner2<T>>,
}

unsafe impl<T> Send for Arc2<T> {}
unsafe impl<T> Sync for Arc2<T> {}

pub struct ArcInner2<T> {
    strong: AtomicUsize,
    data: T,
}

impl<T> Arc2<T> {
    pub fn new(data: T) -> Arc2<T> {
        // Allocate the counter and the payload together, starting at strong = 1.
        let x: Box<_> = box ArcInner2 {
            strong: AtomicUsize::new(1),
            data,
        };
        Arc2 { ptr: Box::into_raw_non_null(x) }
    }
}

impl<T> Arc2<T> {
    pub fn inner(&self) -> &ArcInner2<T> {
        unsafe { self.ptr.as_ref() }
    }

    unsafe fn drop_slow(&mut self) {
        let ptr = self.ptr.as_ptr();
        // Destroy the data at this time, even though we may not free the box
        // allocation itself (there may still be weak pointers lying around).
        ptr::drop_in_place(&mut self.ptr.as_mut().data);
        // if self.inner().weak.fetch_sub(1, AtomicOrdering::Release) == 1 {
        atomic::fence(AtomicOrdering::Acquire);
        Heap.dealloc(ptr as *mut u8, Layout::for_value(&*ptr))
        // }
    }
}

impl<T> Clone for Arc2<T> {
    fn clone(&self) -> Arc2<T> {
        // Bump the reference count; the pointer itself is simply copied.
        let old_size = self.inner().strong.fetch_add(1, AtomicOrdering::Relaxed);
        if old_size > 1024 {
            panic!("NO");
        }
        Arc2 { ptr: self.ptr }
    }
}

impl<T> Deref for Arc2<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.inner().data
    }
}

impl<T> Drop for Arc2<T> {
    fn drop(&mut self) {
        // Only the handle that brings the count down to zero frees the allocation.
        if self.inner().strong.fetch_sub(1, AtomicOrdering::Release) != 1 {
            return;
        }
        atomic::fence(AtomicOrdering::Acquire);
        unsafe {
            self.drop_slow();
        }
    }
}

fn main() {
    let val = Arc2::new(AtomicUsize::new(0));
    let mut guards = vec![];
    for _ in 0..8 {
        let val = val.clone();
        guards.push(
            thread::spawn(move || {
                let v = val.fetch_add(1, AtomicOrdering::SeqCst);
                println!("{:?}", v);
            })
        );
    }
    for guard in guards {
        guard.join().unwrap();
    }
    println!("over");
}
Apart from the reference-counting algorithm itself, the key to this code is these two lines:
unsafe impl<T> Send for Arc2<T> {}
unsafe impl<T> Sync for Arc2<T> {}
If we delete them, the program no longer compiles, failing with an error along the lines of:
`std::ptr::NonNull<ArcInner2<std::sync::atomic::AtomicUsize>>` cannot be sent between threads safely
This looks odd at first: we wrote the Send and Sync impls for Arc2, yet the error complains not about Arc2 but about std::ptr::NonNull. The reason is that Send and Sync are auto traits, derived structurally from a type's fields, and the only field of Arc2 is a NonNull, which is not Send.
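One caveat before moving on: our blanket impls declare every Arc2<T> to be Send and Sync, which is good enough for this AtomicUsize experiment but unsound for arbitrary T. The real standard-library Arc only makes that promise when the payload itself is thread-safe, roughly:

unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}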
Approach 2: picking up where the first approach left off, we find that NonNull has no Send or Sync implementation:
/// `NonNull` pointers are not `Send` because the data they reference may be aliased.
// NB: This impl is unnecessary, but should provide better error messages.
#[stable(feature = "nonnull", since = "1.25.0")]
impl<T: ?Sized> !Send for NonNull<T> { }

/// `NonNull` pointers are not `Sync` because the data they reference may be aliased.
// NB: This impl is unnecessary, but should provide better error messages.
#[stable(feature = "nonnull", since = "1.25.0")]
impl<T: ?Sized> !Sync for NonNull<T> { }
But another, very similar structure, Unique, does implement Send and Sync:
/// `Unique` pointers are `Send` if `T` is `Send` because the data they
/// reference is unaliased. Note that this aliasing invariant is
/// unenforced by the type system; the abstraction using the
/// `Unique` must enforce it.
#[unstable(feature = "ptr_internals", issue = "0")]
unsafe impl<T: Send + ?Sized> Send for Unique<T> { }

/// `Unique` pointers are `Sync` if `T` is `Sync` because the data they
/// reference is unaliased. Note that this aliasing invariant is
/// unenforced by the type system; the abstraction using the
/// `Unique` must enforce it.
#[unstable(feature = "ptr_internals", issue = "0")]
unsafe impl<T: Sync + ?Sized> Sync for Unique<T> { }
We can also see that NonNull and Unique convert into one another:
#[unstable(feature = "box_into_raw_non_null", issue = "47336")]
#[inline]
pub fn into_raw_non_null(b: Box) -> NonNull {
Box::into_unique(b).into()
}
#[unstable(feature = "ptr_internals", issue = "0", reason = "use into_raw_non_null instead")]
#[inline]
pub fn into_unique(b: Box) -> Unique {
let unique = b.0;
mem::forget(b);
unique
}
So we can try to share the data by using a Unique directly....
It turns out that this works; the code is as follows:
#![feature(ptr_internals)]
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;

fn main() {
    let val = Box::into_unique(Box::new(AtomicUsize::new(0)));
    let mut guards = vec![];
    for _ in 0..8 {
        guards.push(
            thread::spawn(move || {
                // Unique<AtomicUsize> is Copy, so each closure captures its own
                // copy of the pointer; they all point at the same AtomicUsize.
                let v = unsafe { (&*val.as_ptr()) }.fetch_add(1, AtomicOrdering::SeqCst);
                println!("{:?}", v);
            })
        );
    }
    for guard in guards {
        guard.join().unwrap();
    }
    unsafe {
        // Reclaim the allocation by turning the raw pointer back into a Box.
        Box::from_raw(val.as_ptr());
    }
    println!("over");
}
Since we have given up automatic memory reclamation, Box::from_raw(val.as_ptr()) is used to reclaim the data once all the other threads have finished. Note that the loop only compiles because Unique<T> is Copy and is Send when T is Send, so each move closure captures its own copy of the pointer rather than moving val itself.
Approach 3: this last approach looks the most tricky; it is also the one used by crossbeam and scoped-threadpool.
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;

trait FnMove {
    fn call(self: Box<Self>);
}

impl<F: FnOnce()> FnMove for F {
    fn call(self: Box<Self>) { (*self)() }
}

fn main() {
    let val = AtomicUsize::new(0);
    let mut guards = vec![];
    for _ in 0..8 {
        // Erase the closure's borrow of `val`: transmute the short-lived trait
        // object into a `Send + 'static` one so it can be handed to thread::spawn.
        let closure: Box<FnMove + Send + 'static> = unsafe {
            std::mem::transmute(Box::new(|| {
                let v = val.fetch_add(1, AtomicOrdering::SeqCst);
                println!("{:?}", v);
            }) as Box<FnMove>)
        };
        guards.push(
            thread::spawn(move || closure.call())
        );
    }
    for guard in guards {
        guard.join().unwrap();
    }
    println!("over");
}
The idea is turned around: instead of wrapping the shared variable itself, we force-cast the closure that the thread will execute into Send + 'static, after which the closure can operate on the shared variable directly! This is only sound because every thread is joined before val goes out of scope; the transmute gives the compiler no way to check that for us.
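For comparison, this is roughly what the trick looks like once it is packaged up behind a safe API. A sketch of the same counter using crossbeam's scoped threads (the spelling follows the crossbeam 0.8 API; older versions differ slightly): the scope joins every spawned thread before it returns, which is exactly the guarantee that makes erasing the lifetime sound.

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let val = AtomicUsize::new(0);
    // All threads spawned inside the scope are joined before `scope` returns,
    // so they may freely borrow `val` from this stack frame.
    crossbeam::scope(|s| {
        for _ in 0..8 {
            s.spawn(|_| {
                let v = val.fetch_add(1, Ordering::SeqCst);
                println!("{:?}", v);
            });
        }
    })
    .unwrap();
    println!("over");
}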