< 返回版块

Warrenren 发表于 2022-05-30 17:23

以下是RUST标准库mpsc的queue代码分析,
代码路径:library/std/src/mpsc/mpsc_queue.rs
几乎是见到的最简单的一个支持多线程写,单线程读的队列数据结构了。

//以下是简单的FIFO的队列实现

pub enum PopResult<T> {
    //返回队列成员
    Data(T),
    //队列为空
    Empty,
    //队列的一致性出错
    Inconsistent,
}

//节点结构
struct Node<T> {
    //next指针,利用原子指针实现多线程的Sync,值得牢记
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

///  能够被多个线程操作的队列
pub struct Queue<T> {
    //利用原子指针操作实现多线程的Sync,极大简化了代码
    head: AtomicPtr<Node<T>>,
    //从后面的代码看,这里实际上是队列的头部,这个Queue的代码搞得奇怪
    tail: UnsafeCell<*mut Node<T>>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Node<T> {
    unsafe fn new(v: Option<T>) -> *mut Node<T> {
        //申请堆内存后,将堆内存的指针提取出来
        Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
    }
}

impl<T> Queue<T> {
    pub fn new() -> Queue<T> {
        let stub = unsafe { Node::new(None) };
        //生成一个空元素的节点列表
        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
    }

    //在头部
    pub fn push(&self, t: T) {
        unsafe {
            let n = Node::new(Some(t));
            //换成C的话,就是head->next = n; head = n
            //对于空队列来说,是tail = head; head->next = n; head = n; 
            //现在tail实际上是队列头部,head是尾部。tail的next是第一个有意义的成员 
            let prev = self.head.swap(n, Ordering::AcqRel);
            //要考虑在两个赋值中间加入了其他线程的操作是否会出问题,
            //这里面有一个复杂的分析,
            //假设原队列为head, 有两个线程分别插入新节点n,m
            //当n先执行,而m在这个代码位置插入,则m插入前prev_n = pre_head, head = n
            //m插入后,prev_m = n, head = m。如果n先执行下面的语句,执行完后 
            // pre_head->next = n, n->next = null,然后m执行完下面语句
            // pre_head->next = n, n->next = m, head = m,队列是正确的。
            // 如果m先执行,执行完后 pre_head->next = null, n->next = m, head = m;
            // 然后n执行,执行完成后 pre_head->next = n, n->next = m, head =m, 队列是正确的。
            // 换成多个线程实际上也一样是正确的。这个地方处理十分巧妙,这是系统级编程语言的魅
            //力, 当然,实际上是裸指针编程的魅力            
            (*prev).next.store(n, Ordering::Release);
            
        }
    }

    //仅有一个线程在pop
    pub fn pop(&self) -> PopResult<T> {
        unsafe {
            //tail实际上是队列头,value是None
            let tail = *self.tail.get();
            //tail的next是第一个有意义的成员
            let next = (*tail).next.load(Ordering::Acquire);

            //next如果为空,说明队列是空队列
            if !next.is_null() {
                //此处原tail会被drop,tail被赋成next
                //因为push只可能改变next,所以这里不会有线程冲突问题
                //这个语句完成后,队列是完整及一致的 
                *self.tail.get() = next;
                assert!((*tail).value.is_none());
                assert!((*next).value.is_some());
                //将value的所有权转移出来,*next的value又重新置为None
                //当tail == head的时候 就又都是stub了
                let ret = (*next).value.take().unwrap();
                //恢复Box,以便以后释放堆内存
                let _: Box<Node<T>> = Box::from_raw(tail);
                return Data(ret);
            }

            //判断是否出错
            if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            //空队列的stub也要释放
            let mut cur = *self.tail.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                //恢复Box并消费掉,释放堆内存
                let _: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}

以上摘自《深入rust标准库》,该书已经正式出版,恳请支持

评论区

写评论

还没有评论

1 共 0 条评论, 1 页