代码如下: 线程里面有个无限循环,等待中断指令 没有指令每隔3秒起个协程运行程序
tokio::spawn(async move {
loop {
//退出
let rx = wkc::WKC_CASE.0.subscribe();
let sig = &*rx.borrow();
info!("获取关闭消息 {:?} {:?}", sig, tcid);
if sig.to_string() == tcid.clone() {
info!("关闭发送 {:?}", sig);
break;
}
//发送消息
select! {
recv(ticker) -> ms => {
info!("开始发送进程 {:?} {:?}", pr.device, ms);
let data = msg_batch(pr.clone(), prc.clone()).unwrap();
let mm = dev_list.clone();
let to = topic.clone();
tokio::spawn(async move {
for i in mm {
info!("开始tokio发送进程 {:?}", i.device);
mqttjob::mqtt_pub(&i.cli.clone(), to.clone(), data.clone()).unwrap();
}
});
},
}
}
});
上面如果里面的spawn,不加await,需要在loop中断的时候,才会一起触发执行
加了await,会报错
future cannot be sent between threads safely
within `impl futures_util::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, std::string::String>`rustc
protocol.rs(242, 23): future is not `Send` as this value is used across an await
protocol.rs(224, 24): consider moving this into a `let` binding to create a shorter lived borrow
spawn.rs(127, 21): required by a bound in `tokio::spawn`
future cannot be sent between threads safely
within `impl futures_util::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut ()`rustc
protocol.rs(242, 23): future is not `Send` as this value is used across an await
protocol.rs(224, 24): consider moving this into a `let` binding to create a shorter lived borrow
spawn.rs(127, 21): required by a bound in `tokio::spawn`
请问如何处理?
1
共 3 条评论, 1 页
评论区
写评论在群里说的检查一下runtime初始化的线程数,除了单线程应该不会loop独占
https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=60f7d352bbaf2c75931966deda45eeeb
多谢大神。。。 用的是crossbeam_channel::select;
接收退出信号,我用了个全局的broadcast信道。
已解决。加了一对大括号就解决了,真是神奇啊。
虽然跑过了,但是表示还没搞懂。
看描述是个很简单的功能,但是代码没看懂,错误也没看懂。
话说为什么select!里只有一个
recv(ticker) -> ms
啊。这种功能难道不应该是写两个异步函数,一个是async fn spawn_task
,一个是async fn stop_sig
。然后第一个负责每三秒spawn一个异步任务,第二个负责接收信号(也可以直接写成future),接收到信号后退出函数。然后用select!
。类似这个官方案例么: