< 返回版块

rascalrr 发表于 2022-10-25 13:35

Tags:回调函数,异步

use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

#[tokio::main]
async fn main() {
    let mut foo = Foo::new();

    let cb = callback;
    foo.set_callback(cb);
    foo.init_in_self().await;

    //foo.init_with_callback(callback).await;

    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn callback(data: i32) {
    println!("callback {}", data);
}

pub struct Foo {
    cb: Box<dyn Fn(i32) + Send + 'static>,    
}

async fn produce_loop(tx: &UnboundedSender<i32>) {
    let mut count = 0;
    loop {
        tx.send(count);
        count += 1;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn consumer_loop(rx: &mut UnboundedReceiver<i32>, cb: impl Fn(i32) + Send) {
    loop {
        let data = rx.recv().await.unwrap();
        println!("consume {}", data);
        cb(data);
    }
}

impl Foo {
    fn inner_cb(data: i32) {}
    pub fn new() -> Self {
        Foo {
            cb: Box::new(Self::inner_cb),            
        }
    }
    pub fn set_callback(&mut self, cb: impl Fn(i32) + Send + 'static) {
        self.cb = Box::new(cb);
    }

    pub async fn init_with_callback(&self, cb: impl Fn(i32) + Send + 'static) {
        let (tx, mut rx) = unbounded_channel::<i32>();
        tokio::task::spawn(async move { consumer_loop(&mut rx, cb).await });
        tokio::task::spawn(async move { produce_loop(&tx).await });
    }

    pub async fn init_in_self(&self) {
        let (tx, mut rx) = unbounded_channel::<i32>();
        let cb = Arc::new(self.cb);                
        tokio::task::spawn(async move { consumer_loop(&mut rx, *cb).await });
        tokio::task::spawn(async move { produce_loop(&tx).await });
    }
}

直接调用 init_with_callback,函数指针传进去是可以的。 但如果想先通过 set_callback设置回调函数,再在 init_in_self中调用的话,会报错

 ^^ has type `Arc<Box<dyn Fn(i32) + Send>>` which is not `Send`

该怎么做才是适合rust的写法呢?

评论区

写评论
作者 rascalrr 2022-12-01 17:16

@zkx 非常感谢。 梳理的思路非常清晰,容易理解。

zkx 2022-11-02 22:04

根据你的代码推测并梳理了下思路,可以发现对 cb 的要求如下:

  1. cb 是一个接收 i32 的函数指针,因此类型为:dyn Fn(i32)
  2. 你的函数需要 move 到异步任务中,但这样一来 struct 也就跟着销毁了,因此需要某种智能指针,能够实现 struct 和异步任务中都能指向 cb,多线程中有 Arc 这个选择,同时还能讲 trait obj 包起来:Arc<dyn Fn(i32)>
  3. Arc 在多线程中,要求内部 T 实现 Send 与 Sync,因此 cb 类型为 Arc<dyn Fn(i32) + Send + Sync>
  4. 根据需要设置回调函数来看,回调会涉及初始化与未初始化两种状态,而 Option 的 Some(...) 与 None 可以满足条件,且相对优雅,因此 cb 为 Option<Arc<dyn Fn(i32) + Send + Sync>>

综上,你的数据结构定义可以为:

type Cb = dyn Fn(i32) + Send + Sync;

pub struct Foo {
    cb: Option<Arc<Cb>>,
}

完整代码如下:

use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

#[tokio::main]
async fn main() {
    let mut foo = Foo::new();

    let cb = callback;
    foo.set_callback(Arc::new(cb));
    foo.init_in_self().await;

    //foo.init_with_callback(callback).await;

    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn callback(data: i32) {
    println!("callback {}", data);
}

type Cb = dyn Fn(i32) + Send + Sync;

pub struct Foo {
    cb: Option<Arc<Cb>>,
}

async fn produce_loop(tx: &UnboundedSender<i32>) {
    let mut count = 0;
    loop {
        tx.send(count).unwrap();
        count += 1;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn consumer_loop(rx: &mut UnboundedReceiver<i32>, cb: Arc<Cb>) {
    loop {
        let data = rx.recv().await.unwrap();
        println!("consume {}", data);
        cb(data);
    }
}

impl Foo {
    pub fn new() -> Self {
        Foo { cb: None }
    }

    pub fn set_callback(&mut self, cb: Arc<Cb>) {
        self.cb = Some(cb);
    }

    pub async fn init_with_callback(&self, cb: Arc<Cb>) {
        let (tx, mut rx) = unbounded_channel::<i32>();
        tokio::task::spawn(async move { consumer_loop(&mut rx, cb).await });
        tokio::task::spawn(async move { produce_loop(&tx).await });
    }

    pub async fn init_in_self(&self) {
        let (tx, mut rx) = unbounded_channel::<i32>();
        let cb = self.cb.as_ref().unwrap().clone();
        tokio::task::spawn(async move { consumer_loop(&mut rx, cb).await });
        tokio::task::spawn(async move { produce_loop(&tx).await });
    }
}
zylthinking 2022-10-27 18:06

去掉 Arc 就行了吧

作者 rascalrr 2022-10-26 15:33

感谢,用 std::mem::replace 是有效的,只是感觉上不那么自然。

--
👇
elsejj: ``` pub async fn init_in_self(&self) { let (tx, mut rx) = unbounded_channel::(); // 如果把 Box 理解为一个指针,rust 是不允许它指向的对象移走,因为移走了之后,当前的 Box 实际就不知道指向哪里了,也就是 self.cb,Arc 在这里没有意义,除非 cb 的定义不是 Box 而是 Arc,这样 consumer_loop 也就要接受一个 Arc 参数,而不是 impl Fn(i32) + Send let cb = Arc::new(self.cb);
tokio::task::spawn(async move { consumer_loop(&mut rx, *cb).await }); tokio::task::spawn(async move { produce_loop(&tx).await }); }


可以看看这样的改动

// 定义一个默认的 cb fn default_callback(data: i32) {

}

pub async fn init_in_self(&mut self) { let (tx, mut rx) = unbounded_channel::(); // 将 self.Box 重新设置,得到的 cb 就跟 self 没关系了 let cb = std::mem::replace(&mut self.cb, Box::new(default_callback)); tokio::task::spawn(async move { consumer_loop(&mut rx, cb).await }); tokio::task::spawn(async move { produce_loop(&tx).await }); }

elsejj 2022-10-26 13:27
    pub async fn init_in_self(&self) {
        let (tx, mut rx) = unbounded_channel::<i32>();
        // 如果把 Box 理解为一个指针,rust 是不允许它指向的对象移走,因为移走了之后,当前的 Box 实际就不知道指向哪里了,也就是 self.cb,Arc 在这里没有意义,除非 cb 的定义不是 Box 而是 Arc,这样 consumer_loop 也就要接受一个 Arc 参数,而不是 impl Fn(i32) + Send
        let cb = Arc::new(self.cb);                
        tokio::task::spawn(async move { consumer_loop(&mut rx, *cb).await });
        tokio::task::spawn(async move { produce_loop(&tx).await });
    }

可以看看这样的改动

// 定义一个默认的 cb
fn default_callback(data: i32) {
    
}

pub async fn init_in_self(&mut self) {
    let (tx, mut rx) = unbounded_channel::<i32>();
    // 将 self.Box 重新设置,得到的 cb 就跟 self 没关系了
    let cb = std::mem::replace(&mut self.cb, Box::new(default_callback));
    tokio::task::spawn(async move { consumer_loop(&mut rx, cb).await });
    tokio::task::spawn(async move { produce_loop(&tx).await });
}

作者 rascalrr 2022-10-25 17:29

谢谢答复~ init_in_self 这个函数就是在分成两个写的。没用Option是因为用inner_cb用于占位。 按理说 Arc<Box> 只要T是Send的情况下,Arc也是Send的,但我的例子却报了 is not Send 错误。

--
👇
Easonzero: 只是把set_cb/spawn分成两个函数的话, 感觉把Box套个Option就行了

如果希望spawn能够重复执行多次, 那么就把Box换成Arc

Easonzero 2022-10-25 14:16

只是把set_cb/spawn分成两个函数的话, 感觉把Box套个Option就行了

如果希望spawn能够重复执行多次, 那么就把Box换成Arc

1 共 7 条评论, 1 页