< 返回版块

heliping 发表于 2022-11-25 11:23

需求:

1.mqtt的消息发送,模拟一定数量的mqtt client发送消息,每个终端每3秒发送一条。期望每3秒,触发一个协程,遍历dev_list里面的client集合,发送消息,遍历完成后结束。

2.接收中断信号后,停止发送

如下代码实际产生的效果:

1.当终端数量少时,可以正常3s发送,模拟100个client时,发送的消息变为10s

2.终端多了后,中断的信号也没法接收到

请各位大神帮忙看看,这个tokio的协程该怎么实现?

 let mut ticker = time::interval(Duration::from_secs(3));
    tokio::spawn(async move {
        loop {
            let mut rx = wkc::WKC_CASE.0.subscribe();
            select! {
               ms = ticker.tick() => {
                    info!("开始发送进程 {:?} {:?}", pr.device, ms);
                    let data = msg_batch(pr.clone(), prc.clone()).unwrap();
                    let mm = dev_list.clone();
                    let to = topic.clone();
                    //发送消息
                    tokio::spawn(async move {
                        for i in mm {
                            info!("开始tokio发送进程 {:?}", i.device);
                            mqttjob::mqtt_pub(&i.cli.clone(), to.clone(), data.clone()).unwrap();
                        }
                    });
                },
                //退出
             _ = rx.changed() => {
                let sig = &*rx.borrow();
                info!("获取关闭消息 {:?} {:?}", sig, tcid);
                if sig.to_string() == tcid.clone() {
                    let mm = dev_list.clone();
                    info!("关闭发送 {:?}", sig);

                        for i in mm {
                            info!("关闭终端 {:?}", i.device);
                            i.cli.disconnect(None).unwrap();
                        }

                    break;
                }
             }

            }
        }
    });

日志显示接收的timer也是10s,但是看timer自己的记录确实是3s就触发了

2022-11-25 11:09:38 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755091, tv_nsec: 323415320 }
2022-11-25 11:09:48 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755094, tv_nsec: 323415320 }
2022-11-25 11:09:58 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755097, tv_nsec: 323415320 }
2022-11-25 11:10:09 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755100, tv_nsec: 323415320 }
2022-11-25 11:10:19 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755103, tv_nsec: 323415320 }
2022-11-25 11:10:29 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755106, tv_nsec: 323415320 }
2022-11-25 11:10:39 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755109, tv_nsec: 323415320 }
2022-11-25 11:10:49 | INFO  | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755112, tv_nsec: 323415320 }

评论区

写评论
作者 heliping 2022-11-26 09:37

多谢各位大神,原因是actix是单线程的runtime,不是程序的问题。

准备换web框架了。

wangbyby 2022-11-25 19:00

我不知道你的rx是什么类型, 所以写了个小demo,发送信号是mpsc

use tokio::sync::mpsc;
use tokio::time::{self, Duration};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Sig {
    Continue,
    Break,
}

#[tokio::main]
async fn main() {
    let mut ticker = time::interval(Duration::from_secs(1));

    let (tx, mut rx) = mpsc::channel(4);

    tokio::spawn(async move {
        let mut t = time::interval(Duration::from_secs(1));
        if let Err(_) = tx.send(Sig::Continue).await {
            println!("Send Err");
        }
        for _ in 0..10 {
            t.tick().await;
        }
        if let Err(_) = tx.send(Sig::Continue).await {
            println!("Send Err");
        }
        for _ in 0..2 {
            t.tick().await;
        }
        if let Err(_) = tx.send(Sig::Break).await {
            println!("Send Err");
        }
    });

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                println!("tick");
            },
            msg = rx.recv() => {
                println!("message is:{:?}",msg);
                if msg == Some(Sig::Break){
                    return;
                }
            }
        }
    }
}

laohanlinux 2022-11-25 17:18

n年前的代码,可以参考一下https://github.com/laohanlinux/consensus-rs/blob/master/src/consensus/pbft/core/timer.rs

dlhxzb 2022-11-25 15:22

1.看文档你不用WebSocket的话是不需要System的,可以替换为tokio::main
2.System也可以不用actix_web::main创建,可以手动创建调with_tokio_rt自行绑定tokio runtime
3.自己在需要的地方创建tokio runtime,并block_on,这些spawn都放到block_on下层

作者 heliping 2022-11-25 15:15

求大神救救孩子吧!

--
👇
dlhxzb: 起码还有三种方法

dlhxzb 2022-11-25 15:13

起码还有三种方法

作者 heliping 2022-11-25 15:06

github上面也看到了。。。

so。。。没法继续啦?!

Since you are using atcix::main macro. It means you are running on a single thread runtime. calling tokio::task::spawn would not make your task run on other threads.

--
👇
dlhxzb: actix_web::main
-> system::new
-> runtime::default_tokio_runtime
看到这里就知道它是个new_current_thread单线程啦

dlhxzb 2022-11-25 15:04

actix_web::main
-> system::new
-> runtime::default_tokio_runtime
看到这里就知道它是个new_current_thread单线程啦

dlhxzb 2022-11-25 14:54

actix_web::rt是个single-threaded runtime,线程数的问题最早在群里好像就说了

作者 heliping 2022-11-25 14:50

一样的效果,10s

  tokio::task::spawn_blocking(move || {
                        for i in mm {
                            info!("开始tokio发送进程 {:?}", i.device);
                            mqttjob::mqtt_pub(&i.cli.clone(), to.clone(), data.clone()).unwrap();
                        }
                    }).await.unwrap();

--
👇
dlhxzb: 试试spawn_blocking吧

作者 heliping 2022-11-25 14:26

整体是一个actix的框架

stackoverflow上面说用了actix_web::main就不需要tokio::main了

#[actix_web::main]
pub async fn server() -> std::io::Result<()> {
    //初始化数据库
    let host =  CONFIG_DATA["server"]["host"].as_str().unwrap();
    let port =  CONFIG_DATA["server"]["port"].as_u64().unwrap();
    match db::db_init().await {
        Err(e) => info!("{e}"),
        Ok(()) => (),
    };

但是主main是非异步的

fn main() {
    //初始化日志
    logs::logger::logger_init();
    //启动APP
    router::router::server().unwrap();
}

--
👇
dlhxzb: tokio runtime怎么初始化的?看着像线程池只有一个线程,两个spawn互斥执行

dlhxzb 2022-11-25 14:09

tokio runtime怎么初始化的?看着像线程池只有一个线程,两个spawn互斥执行

1 共 12 条评论, 1 页