(1)有一个任务队列 tokio::sync::mpsc::channel,receiver用Arctokio::sync::Mutex包装
(2)SOCKET读写分离后,启动两个线程处理,写线程会从receiver获取任务
(3)读写线程用select! 包装在一起
(4)当SOCKET断开,读线程会先结束,写线程会被取消,但是写线程里面的Mutex Lock不会被释放
(5)当再次调用Mutex.lcok.await会堵塞
let (msg_sender, msg_receiver) = tokio::sync::mpsc::channel::<String>(10240);
let msg_receiver = Arc::new(tokio::sync::mutex::Mutex::new(msg_receiver)));
let read_task = tokio::spawn(async move {
read_from_socket(client_reader).await
});
let task_receiver = Arc::clone(&msg_receiver);
let write_task = tokio::spawn(async move {
write_to_socket(client_writer,task_receiver ).await
});
tokio::select! {
e = read_task =>{
error!("({}) error:{:?}", bus_addr, e);
},
e = write_task =>{
error!("({}) error:{:?}", bus_addr, e);
},
}
msg_receiver.lock().await;//deadlock
async fn write_to_socket(mut writer:OwnedWriteHalf,task_recviver:Arc<Mutex<mpsc::Receiver<String>) -> anyhow::Result<()> {
let task_receiver = Arc::clone(&socket_receiver);
let msg_receiver = socket_receiver.lock.await;
loop{
match msg_receiver.recv().await {
None => { return; }
Some(d) => { writer.send(d); }
};
}
}
1
共 6 条评论, 1 页
评论区
写评论感谢大佬回答~
--
👇
我心飞翔: 你的这个 select 等待其中任意一个协程结束,但另一个并为结束,可以 channel 通知结束,也可以不用 spawn 直接 select 那两个一步任务 read_from_socket 和 write_to_socket,这样select 一个结束,另一个 就会被取消掉。
谢谢大佬~理解了
--
👇
xiaoyaou:
这句话里
写线程会被取消是很有误导性的,因为spawn出来的新任务什么时候结束是它自己控制的,不受外部影响,也即:在对应的msg_sender被销毁后,write_to_socket里的msg_receiver.recv().await才会因为接收不到新值而退出循环,结束任务。这里要仔细的就是,虽然
select!会取消其他分支的future(直接drop不再poll),但是你这里的分支,只是tokio::spawn返回的桥接句柄JoinHandle,这里需要明白spawn的机制以及怎么跟spawn出去的新任务交互的,因为JoinHandle只是桥接当前任务和新任务(为了获取新任务执行结果)的连接工具,完全不能控制新任务的走向。所以片段里的
read_task和write_task两个future分支,只是在监听他们分别对应的那个独立Future任务的执行结果,取消这两个分支仅意味着放弃对独立任务的监听了,而独立任务依旧在正常运行。理解了上面这些,也就明白为什么你在
select!语句后面再加一个获取锁msg_receiver.lock().await后,任务就死锁了:因为当前的write_to_socket还在等待外面的msg_sender结束,然而外面却在等里面先释放锁才能销毁msg_sender(在作用域末尾自动drop),所以锁循环了。所以
msg_sender,等write_to_socket执行结束锁自动回收spawn创建独立的future任务,直接在当前的任务里select,前者相当于是select分支多线程(可能)并行轮询,后者则是仅单线程并发轮询,但是后者可以直接控制目标future是否取消/终止对了
JoinHandle也不是不能终止底层的独立future任务,只是要显式调用abort方法,默认drop销毁是让任务直接变成游离状态,独立的更彻底了这句话里
写线程会被取消是很有误导性的,因为spawn出来的新任务什么时候结束是它自己控制的,不受外部影响,也即:在对应的msg_sender被销毁后,write_to_socket里的msg_receiver.recv().await才会因为接收不到新值而退出循环,结束任务。这里要仔细的就是,虽然
select!会取消其他分支的future(直接drop不再poll),但是你这里的分支,只是tokio::spawn返回的桥接句柄JoinHandle,这里需要明白spawn的机制以及怎么跟spawn出去的新任务交互的,因为JoinHandle只是桥接当前任务和新任务(为了获取新任务执行结果)的连接工具,完全不能控制新任务的走向。所以片段里的
read_task和write_task两个future分支,只是在监听他们分别对应的那个独立Future任务的执行结果,取消这两个分支仅意味着放弃对独立任务的监听了,而独立任务依旧在正常运行。理解了上面这些,也就明白为什么你在
select!语句后面再加一个获取锁msg_receiver.lock().await后,任务就死锁了:因为当前的write_to_socket还在等待外面的msg_sender结束,然而外面却在等里面先释放锁才能销毁msg_sender(在作用域末尾自动drop),所以锁循环了。所以
msg_sender,等write_to_socket执行结束锁自动回收spawn创建独立的future任务,直接在当前的任务里select,前者相当于是select分支多线程(可能)并行轮询,后者则是仅单线程并发轮询,但是后者可以直接控制目标future是否取消/终止你的这个 select 等待其中任意一个协程结束,但另一个并为结束,可以 channel 通知结束,也可以不用 spawn 直接 select 那两个一步任务 read_from_socket 和 write_to_socket,这样select 一个结束,另一个 就会被取消掉。
求助,这种场景下怎么保证锁的释放。。