< 返回版块

heliping 发表于 2022-11-23 18:00

代码如下: 线程里面有个无限循环,等待中断指令 没有指令每隔3秒起个协程运行程序

tokio::spawn(async move {
        loop {
            //退出
            let rx = wkc::WKC_CASE.0.subscribe();
            let sig = &*rx.borrow();
            info!("获取关闭消息 {:?} {:?}", sig, tcid);
            if sig.to_string() == tcid.clone() {
                info!("关闭发送 {:?}", sig);
                break;
            }
            //发送消息
            select! {
               recv(ticker) -> ms  => {
                    info!("开始发送进程 {:?} {:?}", pr.device, ms);
                    let data = msg_batch(pr.clone(), prc.clone()).unwrap();
                    let mm = dev_list.clone();
                    let to = topic.clone();
                    tokio::spawn(async move {
                        for i in mm {
                            info!("开始tokio发送进程 {:?}", i.device);
                            mqttjob::mqtt_pub(&i.cli.clone(), to.clone(), data.clone()).unwrap();
                        }
                    });
                },
            }
        }
    });

上面如果里面的spawn,不加await,需要在loop中断的时候,才会一起触发执行

加了await,会报错


future cannot be sent between threads safely
within `impl futures_util::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, std::string::String>`rustc
protocol.rs(242, 23): future is not `Send` as this value is used across an await
protocol.rs(224, 24): consider moving this into a `let` binding to create a shorter lived borrow
spawn.rs(127, 21): required by a bound in `tokio::spawn`
future cannot be sent between threads safely
within `impl futures_util::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut ()`rustc
protocol.rs(242, 23): future is not `Send` as this value is used across an await
protocol.rs(224, 24): consider moving this into a `let` binding to create a shorter lived borrow
spawn.rs(127, 21): required by a bound in `tokio::spawn`

请问如何处理?

评论区

写评论
dlhxzb 2022-11-24 16:20

在群里说的检查一下runtime初始化的线程数,除了单线程应该不会loop独占
https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=60f7d352bbaf2c75931966deda45eeeb

作者 heliping 2022-11-24 09:01

多谢大神。。。 用的是crossbeam_channel::select;

接收退出信号,我用了个全局的broadcast信道。

已解决。加了一对大括号就解决了,真是神奇啊。

虽然跑过了,但是表示还没搞懂。

 let ticker = tick(Duration::from_secs(3));
    tokio::spawn(async move {
        loop {
            {
                //退出
                let rx = wkc::WKC_CASE.0.subscribe();
                let sig = &*rx.borrow();
                info!("获取关闭消息 {:?} {:?}", sig, tcid);
                if sig.to_string() == tcid.clone() {
                    info!("关闭发送 {:?}", sig);
                    break;
                }
            }
            //发送消息
            select! {
               recv(ticker) -> ms  => {
                    info!("开始发送进程 {:?} {:?}", pr.device, ms);
                    let data = msg_batch(pr.clone(), prc.clone()).unwrap();
                    let mm = dev_list.clone();
                    let to = topic.clone();
                    tokio::spawn(async move {
                        for i in mm {
                            info!("开始tokio发送进程 {:?}", i.device);
                            mqttjob::mqtt_pub(&i.cli.clone(), to.clone(), data.clone()).unwrap();
                        }
                    }).await.unwrap();
                    //tokio::task::yield_now().await;
                },
            }
        }
    });

eweca-d 2022-11-23 22:33

看描述是个很简单的功能,但是代码没看懂,错误也没看懂。

话说为什么select!里只有一个recv(ticker) -> ms啊。这种功能难道不应该是写两个异步函数,一个是async fn spawn_task,一个是async fn stop_sig。然后第一个负责每三秒spawn一个异步任务,第二个负责接收信号(也可以直接写成future),接收到信号后退出函数。然后用select!。类似这个官方案例么:

use tokio::time::{self, Duration};

async fn some_async_work() {
    // do work
}

#[tokio::main]
async fn main() {
    let sleep = time::sleep(Duration::from_millis(50));
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            _ = &mut sleep => {
                println!("operation timed out");
                break;
            }
            _ = some_async_work() => {
                println!("operation completed");
            }
        }
    }
1 共 3 条评论, 1 页