< 返回版块

coco 发表于 2022-04-28 09:10

Tags:mqtt,actor,actix

我在订阅到mqtt消息后,把消息交给actix actor处理,处理完后,又通知另外一个actor回复一条mqtt消息。 只要publish消息,就会出现下面的错误,导致后来再来的消息就无法处理回复了。

错误信息:

[2022-04-28T01:03:06Z INFO  mqtt_test::client::mqtt_client] [mqtt]Successfully connected
[2022-04-28T01:03:06Z INFO  mqtt_test::client::mqtt_client] [subscribe]topic: /test/488ad2965e86/request, qos: 0
[2022-04-28T01:03:08Z INFO  mqtt_test::client::mqtt_client] [recv]topic: /test/488ad2965e86/request, qos: 0, payload: {"hello":"world"}
[2022-04-28T01:03:08Z INFO  mqtt_test::client::mqtt_client] [pub]topic: /test/488ad2965e86/reply, qos: 0, payload: {"code": 200}
thread 'main' panicked at 'cannot execute `LocalPool` executor from within another executor: EnterError', C:\Users\win10\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\futures-executor-0.3.21\src\local_pool.rs:81:26
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
error: process didn't exit successfully: `target\debug\mqtt-test.exe` (exit code: 0xc000013a, STATUS_CONTROL_C_EXIT)

代码: https://github.com/tjz101/mqtt-test.git

另外在请教一个问题:我开始用actor是想可以异步处理消息,后来发现这样做还是会阻塞下一条消息进来。 是不是要用thread?

请大佬指点一下,我把代码上传到github了。

拜谢!

MQTT信息附上:tcp://broker-cn.emqx.io:1883 使用的是emqx提供的免费测试服务器,客户端id,用户密码,随意写一个就可以了。

评论区

写评论
作者 coco 2022-04-28 14:03

我把 RecvActor::parse_msg 单独提取出来放到一个 struct 中,然后将这个 structArc::new(Mutex::new()) 包装起来。就可以在 thread 中使用了。

--
👇
tjz101: 我按你说的进行代码修改。已经不在出现这个错误。非常感谢你。

再请教一个问题:当我处理mqtt消息,需要等待很长时间才能处理完,如果我用thread来实现,无法在线程中通过 self 来调用 RecvActor::parse_msg。 如果代码是同步的,在前一条消息没处理完之前,不能处理下一条消息。

使用 thread 错误如下:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src\actor\recv_actor.rs:48:23
    |
43  |       fn handle(&mut self, msg: MqttMsg, ctx: &mut Self::Context) -> Self::Result {
    |                 --------- this data with an anonymous lifetime `'_`...
...
48  |           thread::spawn(move ||{
    |  _______________________^
49  | |             // info!("{}", msg.1);
50  | |             self.parse_msg(msg.1);
51  | |         });
    | |_________^ ...is used here...
    |
note: ...and is required to live as long as `'static` here
   --> src\actor\recv_actor.rs:48:9
    |
48  |         thread::spawn(move ||{
    |         ^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
   --> C:\Users\win10\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:646:15
    |
646 |     F: Send + 'static,

    |               ^^^^^^^

thread代码片段:https://github.com/tjz101/mqtt-test/blob/master/src/actor/recv_actor.rs

26行模拟业务处理等待时间

48-51行通过thread来调用 RecvActor::parse_msg

--
👇
Grobycn: 这个错误主要是递归调用了 block_on。 看了你的代码,RecvActor::parse_msgMqttClient::publish 两个方法没有异步代码,完全可以把 async 去掉, 然后去掉 block_on

作者 coco 2022-04-28 11:12

我按你说的进行代码修改。已经不在出现这个错误。非常感谢你。

再请教一个问题:当我处理mqtt消息,需要等待很长时间才能处理完,如果我用thread来实现,无法在线程中通过 self 来调用 RecvActor::parse_msg。 如果代码是同步的,在前一条消息没处理完之前,不能处理下一条消息。

使用 thread 错误如下:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src\actor\recv_actor.rs:48:23
    |
43  |       fn handle(&mut self, msg: MqttMsg, ctx: &mut Self::Context) -> Self::Result {
    |                 --------- this data with an anonymous lifetime `'_`...
...
48  |           thread::spawn(move ||{
    |  _______________________^
49  | |             // info!("{}", msg.1);
50  | |             self.parse_msg(msg.1);
51  | |         });
    | |_________^ ...is used here...
    |
note: ...and is required to live as long as `'static` here
   --> src\actor\recv_actor.rs:48:9
    |
48  |         thread::spawn(move ||{
    |         ^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
   --> C:\Users\win10\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:646:15
    |
646 |     F: Send + 'static,

    |               ^^^^^^^

thread代码片段:https://github.com/tjz101/mqtt-test/blob/master/src/actor/recv_actor.rs

26行模拟业务处理等待时间

48-51行通过thread来调用 RecvActor::parse_msg

--
👇
Grobycn: 这个错误主要是递归调用了 block_on。 看了你的代码,RecvActor::parse_msgMqttClient::publish 两个方法没有异步代码,完全可以把 async 去掉, 然后去掉 block_on

Grobycn 2022-04-28 10:34

这个错误主要是递归调用了 block_on。 看了你的代码,RecvActor::parse_msgMqttClient::publish 两个方法没有异步代码,完全可以把 async 去掉, 然后去掉 block_on

1 共 3 条评论, 1 页