需求:
1.mqtt的消息发送,模拟一定数量的mqtt client发送消息,每个终端每3秒发送一条。期望每3秒,触发一个协程,遍历dev_list里面的client集合,发送消息,遍历完成后结束。
2.接收中断信号后,停止发送
如下代码实际产生的效果:
1.当终端数量少时,可以正常3s发送,模拟100个client时,发送的消息变为10s
2.终端多了后,中断的信号也没法接收到
请各位大神帮忙看看,这个tokio的协程该怎么实现?
let mut ticker = time::interval(Duration::from_secs(3));
tokio::spawn(async move {
loop {
let mut rx = wkc::WKC_CASE.0.subscribe();
select! {
ms = ticker.tick() => {
info!("开始发送进程 {:?} {:?}", pr.device, ms);
let data = msg_batch(pr.clone(), prc.clone()).unwrap();
let mm = dev_list.clone();
let to = topic.clone();
//发送消息
tokio::spawn(async move {
for i in mm {
info!("开始tokio发送进程 {:?}", i.device);
mqttjob::mqtt_pub(&i.cli.clone(), to.clone(), data.clone()).unwrap();
}
});
},
//退出
_ = rx.changed() => {
let sig = &*rx.borrow();
info!("获取关闭消息 {:?} {:?}", sig, tcid);
if sig.to_string() == tcid.clone() {
let mm = dev_list.clone();
info!("关闭发送 {:?}", sig);
for i in mm {
info!("关闭终端 {:?}", i.device);
i.cli.disconnect(None).unwrap();
}
break;
}
}
}
}
});
日志显示接收的timer也是10s,但是看timer自己的记录确实是3s就触发了
2022-11-25 11:09:38 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755091, tv_nsec: 323415320 }
2022-11-25 11:09:48 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755094, tv_nsec: 323415320 }
2022-11-25 11:09:58 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755097, tv_nsec: 323415320 }
2022-11-25 11:10:09 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755100, tv_nsec: 323415320 }
2022-11-25 11:10:19 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755103, tv_nsec: 323415320 }
2022-11-25 11:10:29 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755106, tv_nsec: 323415320 }
2022-11-25 11:10:39 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755109, tv_nsec: 323415320 }
2022-11-25 11:10:49 | INFO | src/services/protocol.rs:224 — 开始发送进程 "2134213188" Instant { tv_sec: 14755112, tv_nsec: 323415320 }
1
共 12 条评论, 1 页
评论区
写评论多谢各位大神,原因是actix是单线程的runtime,不是程序的问题。
准备换web框架了。
我不知道你的rx是什么类型, 所以写了个小demo,发送信号是mpsc
n年前的代码,可以参考一下https://github.com/laohanlinux/consensus-rs/blob/master/src/consensus/pbft/core/timer.rs
1.看文档你不用WebSocket的话是不需要System的,可以替换为tokio::main
2.System也可以不用actix_web::main创建,可以手动创建调with_tokio_rt自行绑定tokio runtime
3.自己在需要的地方创建tokio runtime,并block_on,这些spawn都放到block_on下层
求大神救救孩子吧!
--
👇
dlhxzb: 起码还有三种方法
起码还有三种方法
github上面也看到了。。。
so。。。没法继续啦?!
--
👇
dlhxzb: actix_web::main
-> system::new
-> runtime::default_tokio_runtime
看到这里就知道它是个new_current_thread单线程啦
actix_web::main
-> system::new
-> runtime::default_tokio_runtime
看到这里就知道它是个new_current_thread单线程啦
actix_web::rt是个single-threaded runtime,线程数的问题最早在群里好像就说了
一样的效果,10s
--
👇
dlhxzb: 试试spawn_blocking吧
整体是一个actix的框架
stackoverflow上面说用了actix_web::main就不需要tokio::main了
但是主main是非异步的
--
👇
dlhxzb: tokio runtime怎么初始化的?看着像线程池只有一个线程,两个spawn互斥执行
tokio runtime怎么初始化的?看着像线程池只有一个线程,两个spawn互斥执行