进一步的研究发现:Rust 的异步任务不同于 Rust 的线程,也不同于 JS 的 Promise,它是可以安全地取消并停止执行的。
技术细节:
- tokio::task::JoinHandle.abort 调用之后仍然可以继续 JoinHandle.await(被取消的任务会得到一个 is_cancelled() 为 true 的错误)
- async_std::task::JoinHandle.cancel 消费了 JoinHandle,之后无法再 await,我觉得这一点没有 tokio 处理得好
- 实际上,《进化的 Http Server : 五 异步优雅停机优化》中的 race 内部也是取消异步任务
tokio 版
use tokio::task::spawn;
use tokio::time::sleep;
use tokio::io::{Result, Error, ErrorKind, AsyncWriteExt, BufReader, AsyncBufReadExt, copy};
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel as channel;
use tokio::fs::File;
use tokio::net::{TcpListener, TcpStream};
use tokio::select;
/// Entry point of the tokio-based server.
///
/// Binds a TCP listener on `127.0.0.1:20083`, runs the accept loop in its own
/// task, and blocks until some connection handler reports
/// `RequestResult::Quit` through the kill-switch channel. The accept loop is
/// then aborted and awaited so that its termination is observed before `main`
/// returns.
///
/// # Errors
///
/// Returns an error if binding the listener fails, or if the accept-loop task
/// ended for any reason other than being cancelled.
#[tokio::main]
async fn main() -> Result<()> {
    let (kill_switch, mut kill_switch_receiver) = channel::<()>();
    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;

    let accept_loop = spawn(async move {
        // The peer address is not used, hence the `_addr` binding.
        while let Ok((stream, _addr)) = listener.accept().await {
            let kill_switch = kill_switch.clone();
            spawn(async move {
                if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                    // A failed send only means the receiver is already gone
                    // (shutdown is under way), so there is nothing left to
                    // signal and no reason to panic inside this task.
                    let _ = kill_switch.send(());
                }
            });
        }
    });

    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    // Wait for the first quit signal (or for every sender to be dropped).
    kill_switch_receiver.recv().await;

    // `abort` only borrows the handle, so it can still be awaited afterwards.
    accept_loop.abort();
    match accept_loop.await {
        // Anything other than the cancellation we just requested is a real failure.
        Err(e) if !e.is_cancelled() => Err(e),
        _ => Ok(()),
    }?;
    Ok(())
}
/// Outcome reported by a connection handler after serving one connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestResult {
    /// The request was handled and no shutdown was requested.
    Ok,
    /// The request asked the server to shut down; `main` reacts by firing the kill switch.
    Quit,
}
// 省略 DispatchMessage RequestResult handle_connection
async-std 版
use async_std::task::spawn;
use async_std::task::sleep;
use async_std::io::{Result, Error, ErrorKind, BufReader, copy, prelude::*};
use std::time::Duration;
use async_std::channel::unbounded as channel;
use async_std::fs::File;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::{Future, FutureExt};
/// Entry point of the async-std-based server.
///
/// Binds a TCP listener on `127.0.0.1:20083`, runs the accept loop in its own
/// task, and blocks until some connection handler reports
/// `RequestResult::Quit` through the kill-switch channel. The accept loop is
/// then cancelled.
///
/// # Errors
///
/// Returns an error if binding the listener fails.
#[async_std::main]
async fn main() -> Result<()> {
    let (kill_switch, kill_switch_receiver) = channel::<()>();
    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;

    let accept_loop = spawn(async move {
        // The peer address is not used, hence the `_addr` binding.
        while let Ok((stream, _addr)) = listener.accept().await {
            let kill_switch = kill_switch.clone();
            spawn(async move {
                if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                    // A failed send only means the receiver is already gone
                    // (shutdown is under way); deliberately ignored.
                    let _ = kill_switch.send(()).await;
                }
            });
        }
    });

    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    // Either a quit signal arrived or the channel was closed; in both cases
    // the server shuts down, so the result itself is not inspected.
    let _ = kill_switch_receiver.recv().await;

    // `cancel` consumes the handle, so the task cannot be awaited afterwards;
    // awaiting the future returned by `cancel` is what waits for the task to stop.
    accept_loop.cancel().await;
    Ok(())
}
/// Outcome reported by a connection handler after serving one connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestResult {
    /// The request was handled and no shutdown was requested.
    Ok,
    /// The request asked the server to shut down; `main` reacts by firing the kill switch.
    Quit,
}
// 省略 DispatchMessage RequestResult handle_connection
1
共 0 条评论, 1 页
评论区
写评论还没有评论