我定义了一个TaskPool的struct 来对异步操作做调度
/// Asynchronous task pool whose tasks are handed over through a channel.
///
/// The pool is generic over a single future type `T`: the channel sender is
/// typed as `Sender<T>`, so every future given to one pool must be that same
/// concrete type.
pub struct TaskPool<T>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
{
    /// Sending half of the task channel, wrapped in `Option` so it can be absent.
    pool: Option<Sender<T>>,
}
但是执行
// Reproduction of the E0308 error shown below: `TaskPool<T>` has a single
// type parameter, and the async block passed to `new` fixes `T` to that
// block's own anonymous type.
tokio_test::block_on(async {
            let task_pool = TaskPool::new(10, async {
                println!("hello world 1314");
            }).await;
            // This second async block is a different anonymous type from the
            // one above, so the compiler rejects it where `T` is expected.
            task_pool.send_task(async {
                println!("hello world");
            }).await;
            tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
        })
报错
error[E0308]: mismatched types
   --> src/lib.rs:134:39
    |
130 |               let task_pool = TaskPool::new(10, async {
    |  _____________________________________________________-
131 | |                 println!("hello world 1314");
132 | |             }).await;
    | |_____________- the expected generator
133 | 
134 |               task_pool.send_task(async {
    |  _______________________________________^
135 | |                 println!("hello world");
136 | |             }).await;
    | |_____________^ expected generator, found a different generator
    |
    = note: expected generator `[static generator@src/lib.rs:130:53: 132:14 _]`
               found generator `[static generator@src/lib.rs:134:39: 136:14 _]`
error: aborting due to previous error
For more information about this error, try `rustc --explain E0308`.
error: could not compile `tokio_utils`.
To learn more, run the command again with --verbose.
Process finished with exit code 101
这是因为
where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
这里的泛型参数 T 是静态分发的: 一个 TaskPool<T> 只能接受同一种具体类型的 Future, 而每个 async 块都是各自不同的匿名类型。这些 async 块要怎么做动态分发?
这样???
    /// Holder for type-erased, heap-allocated futures that resolve to `()`.
    struct DynFn
    {
        /// Boxed futures owned by this value; it starts out empty.
        funcs: Vec<Box<dyn std::future::Future<Output=()>>>
    }
    impl DynFn
    {
        /// Creates a `DynFn` with an empty list of futures.
        fn new() -> DynFn {
            DynFn {
                funcs: Vec::new()
            }
        }
        /// Drives the boxed future `f` to completion.
        ///
        /// `dyn Future` does not implement `Unpin`, so a plain
        /// `Box<dyn Future>` cannot be awaited directly. Converting the box
        /// into a `Pin<Box<dyn Future>>` pins the heap allocation in place,
        /// and the pinned box can be awaited.
        pub async fn run(&self, f: Box<dyn std::future::Future<Output=()>>)
        {
            std::pin::Pin::from(f).await;
        }
    }
    // Passes a boxed async block to `DynFn::run` and waits for it to finish.
    #[test]
    fn test_dyn_func() {
        tokio_test::block_on(async {
            let d = DynFn::new();
            // Futures are lazy: without `.await` the future returned by `run`
            // is dropped without being polled and the boxed block never runs.
            d.run(Box::new(async {
                println!("hello world");
            })).await;
        });
    }
以下是完整代码: pool.rs
use std::future::Future;
use tokio::sync::{mpsc, Mutex};
use tokio::task;
use tokio::sync::mpsc::{Sender, Receiver};
use crate::WaitGroup;
use std::sync::Arc;
/// Asynchronous task pool whose tasks are handed over through a channel.
///
/// The pool is generic over a single future type `T`: the closing future
/// passed to `new` and every task passed to `send_task` share that one type,
/// so they must all be the same concrete future type.
pub struct TaskPool<T>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
{
    /// Sending half of the task channel; `None` after `over` has taken and
    /// dropped it.
    pool: Option<Sender<T>>,
}
impl<T> TaskPool<T>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
{
    /// Initialisation Asynchronous task pools
    pub async fn new(size: usize, close_func: T) -> TaskPool<T> {
        // Control the number of concurrent
        let (limit_sender, limit_receiver): (Sender<bool>, Receiver<bool>) = mpsc::channel(size);
        // Pool
        let (pool, pool_receiver): (Sender<T>, Receiver<T>) = mpsc::channel(size);
        // CORE
        task::spawn(Self::core(limit_sender, limit_receiver, pool_receiver, close_func));
        TaskPool {
            pool: Some(pool)
        }
    }
    /// Issuance of specific tasks
    pub async fn send_task(&self, task: T) {
        if let Some(channel) = &self.pool {
            channel.send(task).await.unwrap_err();
        }
    }
    /// Missions issued
    pub async fn over(&mut self) {
        let r = self.pool.take();
        if let Some(rr) = r {
            drop(rr);
        }
    }
    /// core
    async fn core(limit_sender: Sender<bool>, limit_receiver: Receiver<bool>, mut pool_receiver: Receiver<T>, close_fn: T) {
        let wg = WaitGroup::new().await;
        // Restricted flow
        let limit_receiver = Arc::new(Mutex::new(limit_receiver));
        'lp:
        loop {
            tokio::select! {
                val = pool_receiver.recv() => {
                    match val {
                        Some(fun) => {
                            wg.add(1).await;
                            limit_sender.send(true).await.unwrap();
                            let limit_receiver = limit_receiver.clone();
                            let tg = wg.clone();
                            task::spawn(async move {
                                fun.await;
                                limit_receiver.lock().await.recv().await;
                                tg.done().await.unwrap();
                            });
                        }
                        None => {
                            break 'lp;
                        }
                    }
                }
            }
        }
        wg.wait().await;
        close_fn.await;
    }
}
	    
	    
		1
	    
	    
	    共 2 条评论, 1 页
	
	
    
评论区
写评论谢谢大佬回复
贴下完整代码