< 返回我的博客

lithbitren 发表于 2023-02-28 01:38

Tags:tokio,channel,redis

把tokio-mini-redis键值数据处理这部分抽象出来成了一个并发的哈希表,可以稍微加深一下redis的理解。

思路上就是把每一个get/insert之类的操作抽象成枚举命令(Command),每次解析完消息后,就向核心协程发送命令,这个所谓的核心协程是单线程处理数据的,同时也发送一个回调通道,在tokio-mini-redis里用的是tokio官方的tokio::sync::oneshot一次性通道的发送端,等待核心进行有序的返回结果。

对于一个简单mini-redis来说,不考虑其他权限功能、消息解析、淘汰策略,单单只是数据的crud,每秒可以承受远超过百万次的操作,作为内存级别的缓存是完全够用的。

本文意在抽象为一个并发的数据结构,所以用泛型简单封装了一下,其中为了避免繁杂的生命周期标记,泛型里去掉了引用而采用克隆的形式进行数据传递,所以键值都受到克隆Clone、跨线程传输Sync、静态生命周期'static的约束,另外核心部分是哈希表,所以键必然会受到哈希和比较Hash + Eq的约束。

当然,tokio-mini-redis直接使用了bytes::Bytes作为存储类型,也不需要考虑复杂的泛型约束以及生命周期标注的问题。

方法实现上,主要实现了insertgetremovelen等基础方法,同时也实现了元素快照items的方法,基本上可以说,只要实现了以上五种方法,就可以实现几乎所有标准库里的其他方法了。

代码如下:

// 命令枚举,理论上每个命令都会对应一个结果
// 和标准库hashmap不同的是,所有命令都避开了引用形式
enum Command<K, V> {
    Insert { key: K, value: V },
    InsertResult { result: Option<V> },
    Get { key: K },
    GetResult { result: Option<V> },
    Remove { key: K },
    RemoveResult { result: Option<V> },
    Items,
    ItemsResult { result: Vec<(K, V)> },
    Len,
    LenResult { len: usize },
}

// 引入tokio的无限容量异步通道和一次性通道
use tokio::{
    self,
    sync::{
        mpsc::{
            unbounded_channel,
            UnboundedSender
        },
        oneshot::{
            channel,
            Sender
        },
    }
};

// 核心数据结构,以通道发送端的形式来存储基础数据结构,整体需要实现Clone特性
#[derive(Clone)]
struct ChanHashMap<K, V> {
    sender: UnboundedSender<(Command<K, V>, Sender<Command<K, V>>)>
}

// 哈希trait是需要单独引入的,其他trait已经自动引入可以直接使用
use std::collections::HashMap;
use std::hash::Hash;

// 为核心数据结构撰写方法,为泛型键值提供必要的约束
impl<K, V> ChanHashMap<K, V> 
where K: Hash + Eq + Clone + Send + 'static,
    V: Clone + Send + 'static 
{
    fn new() -> Self {
        // 初始化单线程的哈希表作为核心存储数据结构
        let mut database = HashMap::new();
        // 初始化公共的输入输出异步通道,让其他命令可以通过此通道传输到主协程
        let (in_sender, mut in_receiver) = unbounded_channel::<(Command<K, V>, Sender<Command<K, V>>)>();

        // 启动一个协程(单线程)处理命令
        tokio::spawn(async move {
            // 其他协程通过传输命令的形式传进while循环中进行单线程处理
            while let Some((cmd, out_sender)) = in_receiver.recv().await {
                match cmd {
                    // 进行命令枚举匹配,并通过out_sender发送回原本的协程
                    Command::Insert { key, value } => {
                        let result = database.insert(key, value);
                        let _ = out_sender.send(Command::InsertResult{ result });
                    }
                    Command::Get { key } => {
                        let result = match database.get(&key) {
                            // std的hashmap.get获得是值引用,在并发情景下改为克隆传输,能够适用于绝大部分基础数据类型
                            Some(value) => Some(value.clone()),
                            None => None,
                        };
                        let _ = out_sender.send(Command::GetResult{ result });
                    }
                    Command::Remove { key } => {
                        let result = database.remove(&key);
                        let _ = out_sender.send(Command::RemoveResult{ result });
                    }
                    Command::Items => {
                        // 通过快照的形式获取全部键值对,并存成数组返回
                        let result = database.iter().map(|(k, v)| (k.clone(), v.clone())).collect::<Vec<_>>();
                        let _ = out_sender.send(Command::ItemsResult{ result });
                    }
                    Command::Len => {
                        let _ = out_sender.send(Command::LenResult{ len: database.len() });
                    }
                    _ => todo!()
                }
            }
        });

        // 把初始化的通道发送端装入基础数据结构中,方便多线程状态下克隆拷贝
        Self { sender: in_sender }
    }

    async fn insert(&mut self, key: K, value: V) -> Option<V> {
        // 初始化一次性通道
        let (sender, receiver) = channel();
        // 将命令和一次性通道的发送端发送到处理数据的主协程之中
        let _ = self.sender.send((Command::Insert { key, value }, sender));
        // 一次性通道的接收端等待处理结果并输出结果
        match receiver.await.unwrap() {
            Command::InsertResult{ result } => result,
            _ => unreachable!()
        }
    }

    async fn get(&mut self, key: K) -> Option<V> {
        let (sender, receiver) = channel();
        let _ = self.sender.send((Command::Get { key }, sender));
        match receiver.await.unwrap() {
            Command::GetResult{ result } => result,
            _ => unreachable!()
        }
    }

    async fn remove(&mut self, key: K) -> Option<V> {
        let (sender, receiver) = channel();
        let _ = self.sender.send((Command::Remove { key }, sender));
        match receiver.await.unwrap() {
            Command::RemoveResult{ result } => result,
            _ => unreachable!()
        }
    }

    async fn items(&mut self) -> Vec<(K, V)> {
        let (sender, receiver) = channel();
        let _ = self.sender.send((Command::Items , sender));
        match receiver.await.unwrap() {
            Command::ItemsResult{ result } => result,
            _ => unreachable!()
        }
    }

    async fn len(&mut self) -> usize {
        let (sender, receiver) = channel();
        let _ = self.sender.send((Command::Len, sender));
        match receiver.await.unwrap() {
            Command::LenResult{ len } => len,
            _ => unreachable!()
        }
    }
}

单机百万次操作大约在370ms-500ms这样,如果将异步通道换成crossbeam推荐的async-channel性能大约还能优化20-30%,不过一次性通道却没有更好的替代品,第三方异步库的性能基本都不如tokio自带的oneshot

放在redis这个情境中,加上tcp-io以及RESP协议消息解析的性能损耗,最终每秒操作次数大约在10^5数量级也是比较符合认知的。

评论区

写评论

还没有评论

1 共 0 条评论, 1 页