问题描述
我尝试使用actix-web提供的ChatServer来实现从rabbitmq中订阅数据,然后实现一个分发到websocket客户端的效果
然后我为此编写了一个函数。 叫做start_mq
// I rename the structure name
impl RealtimeMq {
fn send_message(&self, room: &str, message: &str, skip_id: usize) {
if let Some(sessions) = self.rooms.get(room) {
for id in sessions {
if *id != skip_id {
if let Some(addr) = self.sessions.get(id) {
let _ = addr.do_send(Message(message.to_owned()));
}
}
}
}
}
fn start_mq(&self) {
let mq_config = QAEventMQ {
amqp: CONFIG.AMQP.clone(),
exchange: CONFIG.EXCHANGE.clone(),
model: CONFIG.MODEL.clone(),
routing_key: CONFIG.ROUTING_KEY.clone(),
};
let sender = self.sender.clone();
let receiver = self.receiver.clone();
thread::spawn(move || {
QAEventMQ::consume(mq_config, sender)
});
thread::spawn(move || {
loop {
match receiver.recv() {
Ok(msg) => {
let data: Value = serde_json::from_str(&msg).unwrap();
let code = data["code"].as_str().unwrap();
println!("{:?}", data);
self.send_message(code, msg.as_str(), 0)
}
Err(e) => {
println!("read error ")
}
}
}
});
}
}
然后我在 default()
里面执行 start_mq
但是崩掉了
错误信息
error[E0277]: `(dyn actix::address::channel::Sender<realtime::Message> + 'static)` cannot be shared between threads safely
--> src\realtime.rs:126:9
|
126 | thread::spawn(move || {
| ^^^^^^^^^^^^^ `(dyn actix::address::channel::Sender<realtime::Message> + 'static)` cannot be shared between threads safely
|
= help: the trait `std::marker::Sync` is not implemented for `(dyn actix::address::channel::Sender<realtime::Message> + 'static)`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn actix::address::channel::Sender<realtime::Message> + 'static)>`
= note: required because it appears within the type `std::boxed::Box<(dyn actix::address::channel::Sender<realtime::Message> + 'static)>`
= note: required because it appears within the type `actix::address::Recipient<realtime::Message>`
= note: required because it appears within the type `(usize, actix::address::Recipient<realtime::Message>)`
= note: required because of the requirements on the impl of `std::marker::Sync` for `hashbrown::raw::RawTable<(usize, actix::address::Recipient<realtime::Message>)>`
= note: required because it appears within the type `hashbrown::map::HashMap<usize, actix::address::Recipient<realtime::Message>, std::collections::hash_map::RandomState>`
= note: required because it appears within the type `std::collections::HashMap<usize, actix::address::Recipient<realtime::Message>>`
= note: required because it appears within the type `realtime::RealtimeMq`
= note: required because of the requirements on the impl of `std::marker::Send` for `&realtime::RealtimeMq`
= note: required because it appears within the type `[closure@src\realtime.rs:126:23: 140:10 receiver:crossbeam_channel::channel::Receiver<std::string::String>, self:&realtime::RealtimeMq]`
error: aborting due to previous error
这个是我拿来改的项目demo addr
我该咋做 才能修复它 ?
1
共 8 条评论, 1 页
评论区
写评论你的consume函数是什么形式的?
thread::spawn应该不要求实现Sync,Sender是实现了Send的,被clone之后可以被move到其它线程,这里应该没啥问题。我怀疑是consume函数的定义可能有点问题,导致你依赖了一个Sender的引用(这导致需要Sender实现Sync)。
你需要给 realtime::Message 实现 Send
大佬能给个联系方式私聊吗 , 这个沟通排版我看不懂 对以下内容的回复:
还有一种错误可能, 你需要确认你的actix::address::channel 是什么类型的, 比如spsc/mpsc/mpmc等。 channel的模式类型直接决定了你能否将channel::Sender在多线程中传递共享。
对以下内容的回复:
还有一种错误可能, 你需要确认你的actix::address::channel 是什么类型的, 比如spsc/mpsc/mpmc等。 channel的模式类型直接决定了你能否将channel::Sender在多线程中传递共享。
对以下内容的回复:
还有一种错误可能, 你需要确认你的actix::address::channel 是什么类型的, 比如spsc/mpsc/mpmc等。 channel的模式类型直接决定了你能否将channel::Sender在多线程中传递。
关键错误信息: = help: the trait
std::marker::Sync
is not implemented for(dyn actix::address::channel::Sender<realtime::Message> + 'static)
我猜realtime::Message是自定义类型吧, 你需要implement Trait Sync for your own class.
在Rust中只有implement 了Trait Sync的类型才可以在多线程中传递!当然你的自定义类型只有满足Trait Sync的要求才可以implement Trait Sync, 详情请看: https://doc.rust-lang.org/std/marker/trait.Sync.html
典型的错误: 自定义类型中、某个指针/引用 指向某些当前线程独有的东西,比如线程栈; 亦或是虽然指向堆,但是可能出现dangle pointer之类的错误! 最好是自定义类型中没有raw pointer/reference, 完全值传递, 当然这只是理想期许!
Arc/Mutex/Atomic*等可以保护堆指针跨线程引用。
有大佬能回我吗
那个是官方demo, 我拿来稍微改了下