< 返回版块

somewheve 发表于 2020-04-02 18:03

问题描述

我尝试使用actix-web提供的ChatServer来实现从rabbitmq中订阅数据,然后实现一个分发到websocket客户端的效果 然后我为此编写了一个函数。 叫做start_mq

// I rename the structure name 
impl RealtimeMq {
    fn send_message(&self, room: &str, message: &str, skip_id: usize) {
        if let Some(sessions) = self.rooms.get(room) {
            for id in sessions {
                if *id != skip_id {
                    if let Some(addr) = self.sessions.get(id) {
                        let _ = addr.do_send(Message(message.to_owned()));
                    }
                }
            }
        }
    }
    fn start_mq(&self) {
        let mq_config = QAEventMQ {
            amqp: CONFIG.AMQP.clone(),
            exchange: CONFIG.EXCHANGE.clone(),
            model: CONFIG.MODEL.clone(),
            routing_key: CONFIG.ROUTING_KEY.clone(),
        };
        let sender = self.sender.clone();
        let receiver = self.receiver.clone();
        thread::spawn(move || {
            QAEventMQ::consume(mq_config, sender)
        });
        thread::spawn(move || {
            loop {
                match receiver.recv() {
                    Ok(msg) => {
                        let data: Value = serde_json::from_str(&msg).unwrap();
                        let code = data["code"].as_str().unwrap();
                        println!("{:?}", data);
                        self.send_message(code, msg.as_str(), 0)
                    }
                    Err(e) => {
                        println!("read error ")
                    }
                }
            }
        });
    }
}

然后我在 default() 里面执行 start_mq 但是崩掉了

错误信息

error[E0277]: `(dyn actix::address::channel::Sender<realtime::Message> + 'static)` cannot be shared between threads safely
   --> src\realtime.rs:126:9
    |
126 |         thread::spawn(move || {
    |         ^^^^^^^^^^^^^ `(dyn actix::address::channel::Sender<realtime::Message> + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn actix::address::channel::Sender<realtime::Message> + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn actix::address::channel::Sender<realtime::Message> + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn actix::address::channel::Sender<realtime::Message> + 'static)>`
    = note: required because it appears within the type `actix::address::Recipient<realtime::Message>`
    = note: required because it appears within the type `(usize, actix::address::Recipient<realtime::Message>)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `hashbrown::raw::RawTable<(usize, actix::address::Recipient<realtime::Message>)>`
    = note: required because it appears within the type `hashbrown::map::HashMap<usize, actix::address::Recipient<realtime::Message>, std::collections::hash_map::RandomState>`
    = note: required because it appears within the type `std::collections::HashMap<usize, actix::address::Recipient<realtime::Message>>`
    = note: required because it appears within the type `realtime::RealtimeMq`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&realtime::RealtimeMq`
    = note: required because it appears within the type `[closure@src\realtime.rs:126:23: 140:10 receiver:crossbeam_channel::channel::Receiver<std::string::String>, self:&realtime::RealtimeMq]`

error: aborting due to previous error

这个是我拿来改的项目demo addr

我该咋做 才能修复它 ?

评论区

写评论
Earthson 2020-05-11 11:16

你的consume函数是什么形式的?

thread::spawn应该不要求实现Sync,Sender是实现了Send的,被clone之后可以被move到其它线程,这里应该没啥问题。我怀疑是consume函数的定义可能有点问题,导致你依赖了一个Sender的引用(这导致需要Sender实现Sync)。

Ryan-Git 2020-04-06 23:24

你需要给 realtime::Message 实现 Send

作者 somewheve 2020-04-03 11:07

大佬能给个联系方式私聊吗 , 这个沟通排版我看不懂 对以下内容的回复:


还有一种错误可能, 你需要确认你的actix::address::channel 是什么类型的, 比如spsc/mpsc/mpmc等。 channel的模式类型直接决定了你能否将channel::Sender在多线程中传递共享。

yujinliang 2020-04-03 09:00

对以下内容的回复:


还有一种错误可能, 你需要确认你的actix::address::channel 是什么类型的, 比如spsc/mpsc/mpmc等。 channel的模式类型直接决定了你能否将channel::Sender在多线程中传递共享。

yujinliang 2020-04-03 08:59

对以下内容的回复:


还有一种错误可能, 你需要确认你的actix::address::channel 是什么类型的, 比如spsc/mpsc/mpmc等。 channel的模式类型直接决定了你能否将channel::Sender在多线程中传递。

yujinliang 2020-04-03 08:55

关键错误信息: = help: the trait std::marker::Sync is not implemented for (dyn actix::address::channel::Sender<realtime::Message> + 'static)

我猜realtime::Message是自定义类型吧, 你需要implement Trait Sync for your own class.

在Rust中只有implement 了Trait Sync的类型才可以在多线程中传递!当然你的自定义类型只有满足Trait Sync的要求才可以implement Trait Sync, 详情请看: https://doc.rust-lang.org/std/marker/trait.Sync.html

典型的错误: 自定义类型中、某个指针/引用 指向某些当前线程独有的东西,比如线程栈; 亦或是虽然指向堆,但是可能出现dangle pointer之类的错误! 最好是自定义类型中没有raw pointer/reference, 完全值传递, 当然这只是理想期许!

Arc/Mutex/Atomic*等可以保护堆指针跨线程引用。

作者 somewheve 2020-04-02 21:13

有大佬能回我吗

作者 somewheve 2020-04-02 18:03

那个是官方demo, 我拿来稍微改了下

1 共 8 条评论, 1 页