我在订阅到mqtt消息后,把消息交给actix actor处理,处理完后,又通知另外一个actor回复一条mqtt消息。 只要publish消息,就会出现下面的错误,导致后来再来的消息就无法处理回复了。
错误信息:
[2022-04-28T01:03:06Z INFO mqtt_test::client::mqtt_client] [mqtt]Successfully connected
[2022-04-28T01:03:06Z INFO mqtt_test::client::mqtt_client] [subscribe]topic: /test/488ad2965e86/request, qos: 0
[2022-04-28T01:03:08Z INFO mqtt_test::client::mqtt_client] [recv]topic: /test/488ad2965e86/request, qos: 0, payload: {"hello":"world"}
[2022-04-28T01:03:08Z INFO mqtt_test::client::mqtt_client] [pub]topic: /test/488ad2965e86/reply, qos: 0, payload: {"code": 200}
thread 'main' panicked at 'cannot execute `LocalPool` executor from within another executor: EnterError', C:\Users\win10\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\futures-executor-0.3.21\src\local_pool.rs:81:26
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
error: process didn't exit successfully: `target\debug\mqtt-test.exe` (exit code: 0xc000013a, STATUS_CONTROL_C_EXIT)
代码: https://github.com/tjz101/mqtt-test.git
另外在请教一个问题:我开始用actor是想可以异步处理消息,后来发现这样做还是会阻塞下一条消息进来。 是不是要用thread?
请大佬指点一下,我把代码上传到github了。
拜谢!
MQTT信息附上:tcp://broker-cn.emqx.io:1883 使用的是emqx提供的免费测试服务器,客户端id,用户密码,随意写一个就可以了。
1
共 3 条评论, 1 页
评论区
写评论我把
RecvActor::parse_msg
单独提取出来放到一个struct
中,然后将这个struct
用Arc::new(Mutex::new())
包装起来。就可以在 thread 中使用了。--
👇
tjz101: 我按你说的进行代码修改。已经不在出现这个错误。非常感谢你。
再请教一个问题:当我处理mqtt消息,需要等待很长时间才能处理完,如果我用thread来实现,无法在线程中通过 self 来调用
RecvActor::parse_msg
。 如果代码是同步的,在前一条消息没处理完之前,不能处理下一条消息。使用 thread 错误如下:
thread代码片段:https://github.com/tjz101/mqtt-test/blob/master/src/actor/recv_actor.rs
26行模拟业务处理等待时间
48-51行通过thread来调用
RecvActor::parse_msg
--
👇
Grobycn: 这个错误主要是递归调用了
block_on
。 看了你的代码,RecvActor::parse_msg
和MqttClient::publish
两个方法没有异步代码,完全可以把async
去掉, 然后去掉block_on
。我按你说的进行代码修改。已经不在出现这个错误。非常感谢你。
再请教一个问题:当我处理mqtt消息,需要等待很长时间才能处理完,如果我用thread来实现,无法在线程中通过 self 来调用
RecvActor::parse_msg
。 如果代码是同步的,在前一条消息没处理完之前,不能处理下一条消息。使用 thread 错误如下:
thread代码片段:https://github.com/tjz101/mqtt-test/blob/master/src/actor/recv_actor.rs
26行模拟业务处理等待时间
48-51行通过thread来调用
RecvActor::parse_msg
--
👇
Grobycn: 这个错误主要是递归调用了
block_on
。 看了你的代码,RecvActor::parse_msg
和MqttClient::publish
两个方法没有异步代码,完全可以把async
去掉, 然后去掉block_on
。这个错误主要是递归调用了
block_on
。 看了你的代码,RecvActor::parse_msg
和MqttClient::publish
两个方法没有异步代码,完全可以把async
去掉, 然后去掉block_on
。