< 返回我的博客

viruscamp 发表于 2021-05-07 22:06

进一步的研究发现,Rust 的异步任务既不同于 Rust 的线程,也不同于 JS 的 Promise:它是可以被安全地取消并停止执行的。
技术细节:

  1. tokio::task::JoinHandle::abort 只借用 JoinHandle,调用 abort 之后仍然可以继续 JoinHandle.await(任务被取消时会得到 is_cancelled() 为 true 的 JoinError)
  2. async_std::task::JoinHandle::cancel 会消费 JoinHandle,调用之后无法再对它 await,我觉得这一点没有 tokio 处理得好
  3. 实际上,《进化的 Http Server : 五 异步优雅停机优化》中使用的 race,其内部也是通过取消异步任务实现的

tokio 版

use tokio::task::spawn;
use tokio::time::sleep;
use tokio::io::{Result, Error, ErrorKind, AsyncWriteExt, BufReader, AsyncBufReadExt, copy};
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel as channel;
use tokio::fs::File;
use tokio::net::{TcpListener, TcpStream};
use tokio::select;

/// Runs the tokio variant of the server.
///
/// Binds a TCP listener on `127.0.0.1:20083`, accepts connections in a
/// background task, and hands each connection to `handle_connection` in its
/// own task. When any handler reports `RequestResult::Quit`, the accept loop
/// is aborted and the program exits.
///
/// # Errors
///
/// Returns an error if binding the listener fails, or if the accept-loop task
/// ended abnormally for any reason other than being cancelled.
#[tokio::main]
async fn main() -> Result<()> {
    // Any connection handler can request a shutdown through this channel.
    let (kill_switch, mut kill_switch_receiver) = channel::<()>();

    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;
    let accept_loop = spawn(async move {
        // The peer address is not used, so it is discarded right away.
        while let Ok((stream, _)) = listener.accept().await {
            let kill_switch = kill_switch.clone();
            spawn(async move {
                if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                    // `send` only fails when the receiver has been dropped,
                    // which means shutdown is already in progress; there is
                    // nothing left to signal, so do not panic over it.
                    let _ = kill_switch.send(());
                }
            });
        }
    });
    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    // Wait for the first quit request.
    kill_switch_receiver.recv().await;

    // `abort` only borrows the handle, so it can still be awaited afterwards
    // to observe how the task actually ended.
    accept_loop.abort();
    match accept_loop.await {
        // Cancellation is the expected outcome of `abort`; anything else
        // (e.g. a panic inside the task) is reported to the caller.
        Err(e) if !e.is_cancelled() => Err(e.into()),
        _ => Ok(()),
    }
}

/// Outcome reported by `handle_connection` for a single connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestResult {
    /// The request needs no further action from `main`
    /// (presumably "handled normally" — the handler is not shown here).
    Ok,
    /// The client asked the server to stop; `main` reacts by triggering the
    /// kill switch.
    Quit,
}
// 省略 DispatchMessage RequestResult handle_connection

async-std 版

use async_std::task::spawn;
use async_std::task::sleep;
use async_std::io::{Result, Error, ErrorKind, BufReader, copy, prelude::*};
use std::time::Duration;
use async_std::channel::unbounded as channel;
use async_std::fs::File;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::{Future, FutureExt};

/// Runs the async-std variant of the server.
///
/// Binds a TCP listener on `127.0.0.1:20083`, accepts connections in a
/// background task, and hands each connection to `handle_connection` in its
/// own task. When any handler reports `RequestResult::Quit`, the accept loop
/// is cancelled and the program exits.
///
/// # Errors
///
/// Returns an error if binding the listener fails.
#[async_std::main]
async fn main() -> Result<()> {
    // Any connection handler can request a shutdown through this channel.
    let (kill_switch, kill_switch_receiver) = channel::<()>();

    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;
    let accept_loop = spawn(async move {
        // The peer address is not used, so it is discarded right away.
        while let Ok((stream, _)) = listener.accept().await {
            let kill_switch = kill_switch.clone();
            spawn(async move {
                if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                    // A failed send means the channel is already closed, i.e.
                    // shutdown is under way; deliberately ignored.
                    let _ = kill_switch.send(()).await;
                }
            });
        }
    });
    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    // Wait for the first quit request. An error here means the channel was
    // closed with nothing queued; either way the server shuts down next.
    let _ = kill_switch_receiver.recv().await;

    // `cancel` consumes the handle, so the task cannot be awaited separately
    // afterwards; awaiting the `cancel` future itself is what waits for the
    // accept loop to stop.
    accept_loop.cancel().await;
    Ok(())
}

/// Outcome reported by `handle_connection` for a single connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestResult {
    /// The request needs no further action from `main`
    /// (presumably "handled normally" — the handler is not shown here).
    Ok,
    /// The client asked the server to stop; `main` reacts by triggering the
    /// kill switch.
    Quit,
}
// 省略 DispatchMessage RequestResult handle_connection

评论区

写评论

还没有评论

1 共 0 条评论, 1 页