< 返回版块

WorldLink 发表于 2021-01-20 10:43

Tags:tokio,异步,闭包,动态分发

我定义了一个TaskPool的struct 来对异步操作做调度

/// Handle to an asynchronous task pool.
///
/// `T` is the one concrete future type this pool accepts; every task sent
/// through the pool must have exactly this type.
pub struct TaskPool<T>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
{
    /// Sending half of the channel that carries tasks of type `T`.
    pool: Option<Sender<T>>,
}

但是执行

tokio_test::block_on(async {
            // The first `async` block fixes the type parameter `T` of the
            // returned `TaskPool<T>` to this block's anonymous type.
            let task_pool = TaskPool::new(10, async {
                println!("hello world 1314");
            }).await;

            // This second `async` block has a different anonymous type, so it
            // does not match the `T` chosen above; this is the call the
            // E0308 "expected generator, found a different generator" points at.
            task_pool.send_task(async {
                println!("hello world");
            }).await;

            tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
        })

报错

error[E0308]: mismatched types
   --> src/lib.rs:134:39
    |
130 |               let task_pool = TaskPool::new(10, async {
    |  _____________________________________________________-
131 | |                 println!("hello world 1314");
132 | |             }).await;
    | |_____________- the expected generator
133 | 
134 |               task_pool.send_task(async {
    |  _______________________________________^
135 | |                 println!("hello world");
136 | |             }).await;
    | |_____________^ expected generator, found a different generator
    |
    = note: expected generator `[static generator@src/lib.rs:130:53: 132:14 _]`
               found generator `[static generator@src/lib.rs:134:39: 136:14 _]`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0308`.
error: could not compile `tokio_utils`.

To learn more, run the command again with --verbose.

Process finished with exit code 101

这是因为

where
        T: Future + Send + 'static,
        T::Output: Send + 'static,

这个是静态分发的, 这个异步闭包 怎么动态分发?
这样???

    /// Attempt at dynamic dispatch: stores futures behind `Box<dyn Future>` so
    /// that different `async` blocks can share one element type.
    struct DynFn
    {
        // NOTE(review): a plain `Box<dyn Future>` is not `Unpin`, so these
        // elements cannot be `.await`ed directly (see the error noted in `run`).
        // `Pin<Box<dyn Future<Output = ()>>>`, built with `Box::pin`, can be.
        funcs: Vec<Box<dyn std::future::Future<Output=()>>>
    }

    impl DynFn
    {
        /// Creates a `DynFn` whose `funcs` list is empty.
        fn new() -> DynFn {
            DynFn {
                funcs: Vec::new()
            }
        }

        /// Awaits the boxed future `f`.
        ///
        /// As written this is rejected by the compiler with the `Unpin` error
        /// quoted on the `.await` line below.
        pub async fn run(&self, f: Box<dyn std::future::Future<Output=()>>)
        {
            f.await;  //  the trait `std::marker::Unpin` is not implemented for `dyn std::future::Future<Output = ()>`
        }
    }

    /// Passes a boxed `async` block to `DynFn::run` inside a `tokio_test` runtime.
    #[test]
    fn test_dyn_func() {
        tokio_test::block_on(async {
            let d = DynFn::new();
            // NOTE(review): `run` is an `async fn` and the future it returns is
            // not `.await`ed here, so the boxed block would never be polled
            // even once `run` itself compiles.
            d.run(Box::new(async {
                println!("hello world");
            }));
        });
    }

以下是该函数的完整代码: pool.rs

use std::future::Future;
use tokio::sync::{mpsc, Mutex};
use tokio::task;
use tokio::sync::mpsc::{Sender, Receiver};
use crate::WaitGroup;
use std::sync::Arc;

/// Asynchronous task pool handle.
///
/// `T` is the single concrete future type the pool accepts: both the tasks
/// passed to `send_task` and the `close_func` given to `new` must be of this
/// exact type.
pub struct TaskPool<T>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
{
    /// Sending half of the task queue; becomes `None` once `over` has taken
    /// and dropped it.
    pool: Option<Sender<T>>,
}

impl<T> TaskPool<T>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
{
    /// Initialisation Asynchronous task pools
    pub async fn new(size: usize, close_func: T) -> TaskPool<T> {
        // Control the number of concurrent
        let (limit_sender, limit_receiver): (Sender<bool>, Receiver<bool>) = mpsc::channel(size);

        // Pool
        let (pool, pool_receiver): (Sender<T>, Receiver<T>) = mpsc::channel(size);

        // CORE
        task::spawn(Self::core(limit_sender, limit_receiver, pool_receiver, close_func));

        TaskPool {
            pool: Some(pool)
        }
    }

    /// Issuance of specific tasks
    pub async fn send_task(&self, task: T) {
        if let Some(channel) = &self.pool {
            channel.send(task).await.unwrap_err();
        }
    }

    /// Missions issued
    pub async fn over(&mut self) {
        let r = self.pool.take();
        if let Some(rr) = r {
            drop(rr);
        }
    }

    /// core
    async fn core(limit_sender: Sender<bool>, limit_receiver: Receiver<bool>, mut pool_receiver: Receiver<T>, close_fn: T) {
        let wg = WaitGroup::new().await;

        // Restricted flow
        let limit_receiver = Arc::new(Mutex::new(limit_receiver));

        'lp:
        loop {
            tokio::select! {
                val = pool_receiver.recv() => {
                    match val {
                        Some(fun) => {
                            wg.add(1).await;
                            limit_sender.send(true).await.unwrap();

                            let limit_receiver = limit_receiver.clone();
                            let tg = wg.clone();
                            task::spawn(async move {
                                fun.await;

                                limit_receiver.lock().await.recv().await;
                                tg.done().await.unwrap();
                            });
                        }
                        None => {
                            break 'lp;
                        }
                    }
                }
            }
        }

        wg.wait().await;
        close_fn.await;
    }
}

评论区

写评论
作者 WorldLink 2021-01-20 18:19

谢谢大佬回复

贴下完整代码

/// A list of heap-pinned, type-erased futures that each resolve to `()`.
struct DynFn {
    /// Queued futures; distinct `async` blocks can live side by side here
    /// because each one is stored behind `Pin<Box<dyn Future>>`.
    funcs: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>>,
}

impl DynFn {
    /// Builds a `DynFn` whose `funcs` list is empty.
    fn new() -> Self {
        Self { funcs: vec![] }
    }
}


/// Queues two different `async` blocks in one `DynFn` and awaits them in
/// insertion order.
#[async_std::main]
async fn main() {
    println!("Hello, world!");

    let mut pending = DynFn::new();
    // `Box::pin` erases each block's anonymous type, so both fit in `funcs`.
    pending.funcs.push(Box::pin(async {
        println!("ok1");
    }));
    pending.funcs.push(Box::pin(async {
        println!("ok2");
    }));

    for fut in pending.funcs {
        fut.await;
    }
}

93996817 2021-01-20 16:06
    // Same demonstration driven by `async_std::task::block_on`: two distinct
    // `async` blocks are pinned, boxed and awaited in insertion order.
    async_std::task::block_on(async {
        let mut queue = DynFn::new();
        queue.funcs.push(Box::pin(async {
            println!("ok1");
        }));
        queue.funcs.push(Box::pin(async {
            println!("ok2");
        }));
        for fut in queue.funcs {
            fut.await;
        }
    });
    /*  TomRust
     Running `target\debug\test001.exe`
     ok1
     ok2
    */
       
1 共 2 条评论, 1 页