< 返回版块

Frioly 发表于 2022-12-01 11:35

Tags:tokio、异步、 mqtt

就是MQTT协议有一个订阅的功能,在客户端建立链接的时候会有一个tcpstream,在tokio中,这个tcpstream会一直执行next方法获取新的客户端数据,如果想主动推送消息给客户端的话是拿不到这个tcpstream的。

let listener = TcpListener::bind(&self.addr).await.unwrap();
        info!("starting server on {}",self.prot);
        loop {
            let (stream, addr) = listener.accept().await.unwrap();
            tokio::spawn(async move {
                let mut stream = MqttServerStream::new(stream);
                stream.start().await;
            });
        }

由于tokio::sync::Tcpstream没有实现Copy Trait。导致不能在loop之外再去缓存或使用Tcpstream

评论区

写评论
作者 Frioly 2022-12-02 08:54

Arc试过,用Arc<Mutex>也试过,由于stream需要一直读取数据,用Arc<Mutex>包装后代码不会报错,但是当把数据返回给客户端之后链接就会断开。目前还有一个思路是把stream对读写两端进行分离,然后两个线程去分别进行读和写操作,不过还没有验证。

👇
songzhi: 用Arc包起来,这里不能用朴素的Copy/Clone,因为你一复制之后就变成两个对象了,不符合逻辑,Arc只是个智能指针,再怎么Clone还是指向的同一个对象。

songzhi 2022-12-01 16:28

Arc包起来,这里不能用朴素的Copy/Clone,因为你一复制之后就变成两个对象了,不符合逻辑,Arc只是个智能指针,再怎么Clone还是指向的同一个对象。

1 共 2 条评论, 1 页