Async lock-free Ringbuffer
AsyncRingBuffer是一个异步的、single-producer-single-consumer、固定容量的环形buffer, 可以被使用过在两个位于不同线程之间的异步任务上,它的特点主要如下:
- lock-free
- spsc
- 固定容量支持,buffer底层内存支持回收复用
- 为读端实现AsyncRead及AsyncBufRead等trait
- 为写端实现AsyncWrite等trait
项目地址
github: https://github.com/sxfworks/fixed_ring_buffer
如何使用
在项目Cargo.toml文件的dependencies依赖项下面添加如下内容:
[dependencies]
fixed_ring_buffer = "0.1.0"
一个使用async ring buffer实现的完整例子:
use std::sync::Arc;
use bytes::BufMut;
use async_std::task;
use std::{thread};
use futures::io::{AsyncWriteExt, AsyncReadExt};
use fixed_ring_buffer::async_ring_buffer::{RingBufferReader, RingBufferWriter, RingBuffer};
fn main() {
let ring_buffer = Arc::new(RingBuffer::new(32960));
let mut reader = RingBufferReader::new(ring_buffer.clone());
let mut writer = RingBufferWriter::new(ring_buffer.clone());
let t1 = thread::spawn(move ||{
let handle = task::spawn(async move {
let mut length = 0 as usize;
let mut contents: Vec<u8> = Vec::with_capacity(16096);
contents.resize(16096, 0);
loop {
match reader.read(&mut contents).await {
Ok(size) => {
if size > 0 {
length += size;
} else {
break;
}
},
Err(e) => {
panic!("read err = {}", e);
},
}
}
println!("length = {}", length);
});
task::block_on(handle);
});
let t2 = thread::spawn(move ||{
let handle = task::spawn(async move {
let mut length = 0 as usize;
let mut contents: Vec<u8> = Vec::new();
contents.put("xxxxxnnnnnmmmmmmmmsssss".as_bytes());
for i in 0..102400 as usize {
match writer.write_all(&mut contents).await {
Ok(()) => {
length += contents.len();
},
Err(e) => {
panic!("write err = {} index = {}", e, i);
},
}
}
println!("length = {}", length);
});
task::block_on(handle);
});
t1.join().unwrap();
t2.join().unwrap();
}
1
共 0 条评论, 1 页
评论区
写评论还没有评论