我定义了一个TaskPool的struct 来对异步操作做调度
pub struct TaskPool<T>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
/// Pool
pool: Option<Sender<T>>,
}
但是执行
tokio_test::block_on(async {
let task_pool = TaskPool::new(10, async {
println!("hello world 1314");
}).await;
task_pool.send_task(async {
println!("hello world");
}).await;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
})
报错
error[E0308]: mismatched types
--> src/lib.rs:134:39
|
130 | let task_pool = TaskPool::new(10, async {
| _____________________________________________________-
131 | | println!("hello world 1314");
132 | | }).await;
| |_____________- the expected generator
133 |
134 | task_pool.send_task(async {
| _______________________________________^
135 | | println!("hello world");
136 | | }).await;
| |_____________^ expected generator, found a different generator
|
= note: expected generator `[static generator@src/lib.rs:130:53: 132:14 _]`
found generator `[static generator@src/lib.rs:134:39: 136:14 _]`
error: aborting due to previous error
For more information about this error, try `rustc --explain E0308`.
error: could not compile `tokio_utils`.
To learn more, run the command again with --verbose.
Process finished with exit code 101
这是因为
where
T: Future + Send + 'static,
T::Output: Send + 'static,
这个是静态分发的, 这个异步闭包 怎么动态分发?
这样???
struct DynFn
{
funcs: Vec<Box<dyn std::future::Future<Output=()>>>
}
impl DynFn
{
fn new() -> DynFn {
DynFn {
funcs: Vec::new()
}
}
pub async fn run(&self, f: Box<dyn std::future::Future<Output=()>>)
{
f.await; // the trait `std::marker::Unpin` is not implemented for `dyn std::future::Future<Output = ()>`
}
}
#[test]
fn test_dyn_func() {
tokio_test::block_on(async {
let d = DynFn::new();
d.run(Box::new(async {
println!("hello world");
}));
});
}
一下是该函数的完整代码: pool.rs
use std::future::Future;
use tokio::sync::{mpsc, Mutex};
use tokio::task;
use tokio::sync::mpsc::{Sender, Receiver};
use crate::WaitGroup;
use std::sync::Arc;
/// Asynchronous task pools
pub struct TaskPool<T>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
/// Pool
pool: Option<Sender<T>>,
}
impl<T> TaskPool<T>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
/// Initialisation Asynchronous task pools
pub async fn new(size: usize, close_func: T) -> TaskPool<T> {
// Control the number of concurrent
let (limit_sender, limit_receiver): (Sender<bool>, Receiver<bool>) = mpsc::channel(size);
// Pool
let (pool, pool_receiver): (Sender<T>, Receiver<T>) = mpsc::channel(size);
// CORE
task::spawn(Self::core(limit_sender, limit_receiver, pool_receiver, close_func));
TaskPool {
pool: Some(pool)
}
}
/// Issuance of specific tasks
pub async fn send_task(&self, task: T) {
if let Some(channel) = &self.pool {
channel.send(task).await.unwrap_err();
}
}
/// Missions issued
pub async fn over(&mut self) {
let r = self.pool.take();
if let Some(rr) = r {
drop(rr);
}
}
/// core
async fn core(limit_sender: Sender<bool>, limit_receiver: Receiver<bool>, mut pool_receiver: Receiver<T>, close_fn: T) {
let wg = WaitGroup::new().await;
// Restricted flow
let limit_receiver = Arc::new(Mutex::new(limit_receiver));
'lp:
loop {
tokio::select! {
val = pool_receiver.recv() => {
match val {
Some(fun) => {
wg.add(1).await;
limit_sender.send(true).await.unwrap();
let limit_receiver = limit_receiver.clone();
let tg = wg.clone();
task::spawn(async move {
fun.await;
limit_receiver.lock().await.recv().await;
tg.done().await.unwrap();
});
}
None => {
break 'lp;
}
}
}
}
}
wg.wait().await;
close_fn.await;
}
}
1
共 2 条评论, 1 页
评论区
写评论谢谢大佬回复
贴下完整代码