我在订阅到mqtt消息后,把消息交给actix actor处理,处理完后,又通知另外一个actor回复一条mqtt消息。 只要publish消息,就会出现下面的错误,导致后来再来的消息就无法处理回复了。
错误信息:
[2022-04-28T01:03:06Z INFO mqtt_test::client::mqtt_client] [mqtt]Successfully connected
[2022-04-28T01:03:06Z INFO mqtt_test::client::mqtt_client] [subscribe]topic: /test/488ad2965e86/request, qos: 0
[2022-04-28T01:03:08Z INFO mqtt_test::client::mqtt_client] [recv]topic: /test/488ad2965e86/request, qos: 0, payload: {"hello":"world"}
[2022-04-28T01:03:08Z INFO mqtt_test::client::mqtt_client] [pub]topic: /test/488ad2965e86/reply, qos: 0, payload: {"code": 200}
thread 'main' panicked at 'cannot execute `LocalPool` executor from within another executor: EnterError', C:\Users\win10\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\futures-executor-0.3.21\src\local_pool.rs:81:26
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
error: process didn't exit successfully: `target\debug\mqtt-test.exe` (exit code: 0xc000013a, STATUS_CONTROL_C_EXIT)
代码: https://github.com/tjz101/mqtt-test.git
另外在请教一个问题:我开始用actor是想可以异步处理消息,后来发现这样做还是会阻塞下一条消息进来。 是不是要用thread?
请大佬指点一下,我把代码上传到github了。
拜谢!
MQTT信息附上:tcp://broker-cn.emqx.io:1883 使用的是emqx提供的免费测试服务器,客户端id,用户密码,随意写一个就可以了。
1
共 3 条评论, 1 页
评论区
写评论我把
RecvActor::parse_msg
单独提取出来放到一个struct
中,然后将这个struct
用Arc::new(Mutex::new())
包装起来。就可以在 thread 中使用了。--
👇
tjz101: 我按你说的进行代码修改。已经不在出现这个错误。非常感谢你。
再请教一个问题:当我处理mqtt消息,需要等待很长时间才能处理完,如果我用thread来实现,无法在线程中通过 self 来调用
RecvActor::parse_msg
。 如果代码是同步的,在前一条消息没处理完之前,不能处理下一条消息。使用 thread 错误如下:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement --> src\actor\recv_actor.rs:48:23 | 43 | fn handle(&mut self, msg: MqttMsg, ctx: &mut Self::Context) -> Self::Result { | --------- this data with an anonymous lifetime `'_`... ... 48 | thread::spawn(move ||{ | _______________________^ 49 | | // info!("{}", msg.1); 50 | | self.parse_msg(msg.1); 51 | | }); | |_________^ ...is used here... | note: ...and is required to live as long as `'static` here --> src\actor\recv_actor.rs:48:9 | 48 | thread::spawn(move ||{ | ^^^^^^^^^^^^^ note: `'static` lifetime requirement introduced by this bound --> C:\Users\win10\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:646:15 | 646 | F: Send + 'static, | ^^^^^^^
thread代码片段:https://github.com/tjz101/mqtt-test/blob/master/src/actor/recv_actor.rs
26行模拟业务处理等待时间
48-51行通过thread来调用
RecvActor::parse_msg
--
👇
Grobycn: 这个错误主要是递归调用了
block_on
。 看了你的代码,RecvActor::parse_msg
和MqttClient::publish
两个方法没有异步代码,完全可以把async
去掉, 然后去掉block_on
。我按你说的进行代码修改。已经不在出现这个错误。非常感谢你。
再请教一个问题:当我处理mqtt消息,需要等待很长时间才能处理完,如果我用thread来实现,无法在线程中通过 self 来调用
RecvActor::parse_msg
。 如果代码是同步的,在前一条消息没处理完之前,不能处理下一条消息。使用 thread 错误如下:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement --> src\actor\recv_actor.rs:48:23 | 43 | fn handle(&mut self, msg: MqttMsg, ctx: &mut Self::Context) -> Self::Result { | --------- this data with an anonymous lifetime `'_`... ... 48 | thread::spawn(move ||{ | _______________________^ 49 | | // info!("{}", msg.1); 50 | | self.parse_msg(msg.1); 51 | | }); | |_________^ ...is used here... | note: ...and is required to live as long as `'static` here --> src\actor\recv_actor.rs:48:9 | 48 | thread::spawn(move ||{ | ^^^^^^^^^^^^^ note: `'static` lifetime requirement introduced by this bound --> C:\Users\win10\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:646:15 | 646 | F: Send + 'static, | ^^^^^^^
thread代码片段:https://github.com/tjz101/mqtt-test/blob/master/src/actor/recv_actor.rs
26行模拟业务处理等待时间
48-51行通过thread来调用
RecvActor::parse_msg
--
👇
Grobycn: 这个错误主要是递归调用了
block_on
。 看了你的代码,RecvActor::parse_msg
和MqttClient::publish
两个方法没有异步代码,完全可以把async
去掉, 然后去掉block_on
。这个错误主要是递归调用了
block_on
。 看了你的代码,RecvActor::parse_msg
和MqttClient::publish
两个方法没有异步代码,完全可以把async
去掉, 然后去掉block_on
。