把tokio-mini-redis键值数据处理这部分抽象出来成了一个并发的哈希表,可以稍微加深一下redis的理解。
思路上就是把每一个get/insert之类的操作抽象成枚举命令(Command),每次解析完消息后,就向核心协程发送命令,这个所谓的核心协程是单线程处理数据的,同时也发送一个回调通道,在tokio-mini-redis里用的是tokio官方的tokio::sync::oneshot
一次性通道的发送端,等待核心进行有序的返回结果。
对于一个简单mini-redis来说,不考虑其他权限功能、消息解析、淘汰策略,单单只是数据的crud,每秒可以承受远超过百万次的操作,作为内存级别的缓存是完全够用的。
本文意在抽象为一个并发的数据结构,所以用泛型简单封装了一下,其中为了避免繁杂的生命周期标记,泛型里去掉了引用而采用克隆的形式进行数据传递,所以键值都受到克隆Clone
、跨线程传输Sync
、静态生命周期'static
的约束,另外核心部分是哈希表,所以键必然会受到哈希和比较Hash + Eq
的约束。
当然,tokio-mini-redis直接使用了bytes::Bytes
作为存储类型,也不需要考虑复杂的泛型约束以及生命周期标注的问题。
方法实现上,主要实现了insert
、get
、remove
、len
等基础方法,同时也实现了元素快照items
的方法,基本上可以说,只要实现了以上五种方法,就可以实现几乎所有标准库里的其他方法了。
代码如下:
// 命令枚举,理论上每个命令都会对应一个结果
// 和标准库hashmap不同的是,所有命令都避开了引用形式
enum Command<K, V> {
Insert { key: K, value: V },
InsertResult { result: Option<V> },
Get { key: K },
GetResult { result: Option<V> },
Remove { key: K },
RemoveResult { result: Option<V> },
Items,
ItemsResult { result: Vec<(K, V)> },
Len,
LenResult { len: usize },
}
// 引入tokio的无限容量异步通道和一次性通道
use tokio::{
self,
sync::{
mpsc::{
unbounded_channel,
UnboundedSender
},
oneshot::{
channel,
Sender
},
}
};
// 核心数据结构,以通道发送端的形式来存储基础数据结构,整体需要实现Clone特性
#[derive(Clone)]
struct ChanHashMap<K, V> {
sender: UnboundedSender<(Command<K, V>, Sender<Command<K, V>>)>
}
// 哈希trait是需要单独引入的,其他trait已经自动引入可以直接使用
use std::collections::HashMap;
use std::hash::Hash;
// 为核心数据结构撰写方法,为泛型键值提供必要的约束
impl<K, V> ChanHashMap<K, V>
where K: Hash + Eq + Clone + Send + 'static,
V: Clone + Send + 'static
{
fn new() -> Self {
// 初始化单线程的哈希表作为核心存储数据结构
let mut database = HashMap::new();
// 初始化公共的输入输出异步通道,让其他命令可以通过此通道传输到主协程
let (in_sender, mut in_receiver) = unbounded_channel::<(Command<K, V>, Sender<Command<K, V>>)>();
// 启动一个协程(单线程)处理命令
tokio::spawn(async move {
// 其他协程通过传输命令的形式传进while循环中进行单线程处理
while let Some((cmd, out_sender)) = in_receiver.recv().await {
match cmd {
// 进行命令枚举匹配,并通过out_sender发送回原本的协程
Command::Insert { key, value } => {
let result = database.insert(key, value);
let _ = out_sender.send(Command::InsertResult{ result });
}
Command::Get { key } => {
let result = match database.get(&key) {
// std的hashmap.get获得是值引用,在并发情景下改为克隆传输,能够适用于绝大部分基础数据类型
Some(value) => Some(value.clone()),
None => None,
};
let _ = out_sender.send(Command::GetResult{ result });
}
Command::Remove { key } => {
let result = database.remove(&key);
let _ = out_sender.send(Command::RemoveResult{ result });
}
Command::Items => {
// 通过快照的形式获取全部键值对,并存成数组返回
let result = database.iter().map(|(k, v)| (k.clone(), v.clone())).collect::<Vec<_>>();
let _ = out_sender.send(Command::ItemsResult{ result });
}
Command::Len => {
let _ = out_sender.send(Command::LenResult{ len: database.len() });
}
_ => todo!()
}
}
});
// 把初始化的通道发送端装入基础数据结构中,方便多线程状态下克隆拷贝
Self { sender: in_sender }
}
async fn insert(&mut self, key: K, value: V) -> Option<V> {
// 初始化一次性通道
let (sender, receiver) = channel();
// 将命令和一次性通道的发送端发送到处理数据的主协程之中
let _ = self.sender.send((Command::Insert { key, value }, sender));
// 一次性通道的接收端等待处理结果并输出结果
match receiver.await.unwrap() {
Command::InsertResult{ result } => result,
_ => unreachable!()
}
}
async fn get(&mut self, key: K) -> Option<V> {
let (sender, receiver) = channel();
let _ = self.sender.send((Command::Get { key }, sender));
match receiver.await.unwrap() {
Command::GetResult{ result } => result,
_ => unreachable!()
}
}
async fn remove(&mut self, key: K) -> Option<V> {
let (sender, receiver) = channel();
let _ = self.sender.send((Command::Remove { key }, sender));
match receiver.await.unwrap() {
Command::RemoveResult{ result } => result,
_ => unreachable!()
}
}
async fn items(&mut self) -> Vec<(K, V)> {
let (sender, receiver) = channel();
let _ = self.sender.send((Command::Items , sender));
match receiver.await.unwrap() {
Command::ItemsResult{ result } => result,
_ => unreachable!()
}
}
async fn len(&mut self) -> usize {
let (sender, receiver) = channel();
let _ = self.sender.send((Command::Len, sender));
match receiver.await.unwrap() {
Command::LenResult{ len } => len,
_ => unreachable!()
}
}
}
单机百万次操作大约在370ms-500ms这样,如果将异步通道换成crossbeam
推荐的async-channel
性能大约还能优化20-30%,不过一次性通道却没有更好的替代品,第三方异步库的性能基本都不如tokio自带的oneshot
。
放在redis这个情境中,加上tcp-io以及RESP协议消息解析的性能损耗,最终每秒操作次数大约在10^5数量级也是比较符合认知的。
评论区
写评论还没有评论