< 返回我的博客

sxfworks 发表于 2021-02-20 20:23

Tags:look-free,async,ring,buffer,ring buffer,异步,环形buffer

Async lock-free Ringbuffer

AsyncRingBuffer是一个异步的、single-producer-single-consumer、固定容量的环形buffer, 可以被使用过在两个位于不同线程之间的异步任务上,它的特点主要如下:

  • lock-free
  • spsc
  • 固定容量支持,buffer底层内存支持回收复用
  • 为读端实现AsyncRead及AsyncBufRead等trait
  • 为写端实现AsyncWrite等trait

项目地址

github: https://github.com/sxfworks/fixed_ring_buffer

如何使用

在项目Cargo.toml文件的dependencies依赖项下面添加如下内容:

[dependencies]
fixed_ring_buffer = "0.1.0"

一个使用async ring buffer实现的完整例子:

use std::sync::Arc;
use bytes::BufMut;
use async_std::task;
use std::{thread};
use futures::io::{AsyncWriteExt, AsyncReadExt};

use fixed_ring_buffer::async_ring_buffer::{RingBufferReader, RingBufferWriter, RingBuffer};

fn main() {
    let ring_buffer = Arc::new(RingBuffer::new(32960));

    let mut reader = RingBufferReader::new(ring_buffer.clone());
    let mut writer = RingBufferWriter::new(ring_buffer.clone());

    let t1 = thread::spawn(move ||{
        let handle = task::spawn(async move {
        let mut length = 0 as usize;
        let mut contents: Vec<u8> = Vec::with_capacity(16096);
        contents.resize(16096, 0);
        loop {
            match reader.read(&mut contents).await {
            Ok(size) => {
                if size > 0 {
                length += size;
                } else {
                break;
                }
            },
            Err(e) => {
                panic!("read err = {}", e);
            },
            }
        }

        println!("length = {}", length);
        });

        task::block_on(handle);
    });

    let t2 = thread::spawn(move ||{
        let handle = task::spawn(async move {
        let mut length = 0 as usize;
        let mut contents: Vec<u8> = Vec::new();
        contents.put("xxxxxnnnnnmmmmmmmmsssss".as_bytes());
        for i in 0..102400 as usize {
            match writer.write_all(&mut contents).await {
            Ok(()) => {
                length += contents.len();
            },
            Err(e) => {
                panic!("write err = {} index = {}", e, i);
            },
            }
        }
        println!("length = {}", length);
        });

        task::block_on(handle);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

评论区

写评论

还没有评论

1 共 0 条评论, 1 页