< 返回我的博客

viruscamp 发表于 2021-05-01 23:19

进化的 Http Server : 一 多线程 我们说过:

标准库的 TcpListener 是没有什么正常的手段停止的
accept_loop 以及 listener 是主线程退出时杀掉的,不算 listener 的正常停止

异步的 TcpListener 是可以正常退出的。请 diff 下面的程序和 进化的 Http Server : 二 猴子都会写异步 使用 tokio 的程序。 技术细节:

  1. 加一个 channel kill_switch 对这种只发一次的,tokio 的 oneshot 语义更清晰,当然 tokio 的 bounded_channel unbounded_channel 也可以用
  2. accept_loop 内用 select! 处理多个异步事件
  3. 主线程结束前 用 kill_switch 发消息给 accept_loop 让其停止, accept_loop.await 类似于线程的 join 等待异步任务退出。
use tokio::task::spawn;
use tokio::time::sleep;
use tokio::io::{Result, Error, ErrorKind, AsyncWriteExt, BufReader, AsyncBufReadExt, copy};
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel as channel;
use tokio::fs::File;
use tokio::net::{TcpListener, TcpStream};
use tokio::select;

#[tokio::main]
async fn main() -> Result<()> {
    let (dispatch_sender, mut dispatch_receiver) = channel::<DispatchMessage>();
    let (kill_switch, kill_switch_receiver) = tokio::sync::oneshot::channel::<()>();

    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;
    let dispatch_sender1 = dispatch_sender.clone();
    let accept_loop = spawn(async move {
        select! {
            _ = async {
                while let Ok((stream, addr)) = listener.accept().await {
                    dispatch_sender1.send(DispatchMessage::Connected(stream)).unwrap();
                }
            } => {}
            _ = kill_switch_receiver => {}
        }
    });
    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    while let Some(dispatch_message) = dispatch_receiver.recv().await {
        match dispatch_message {
            DispatchMessage::Connected(stream) => {
                let dispatch_sender = dispatch_sender.clone();
                spawn(async move {
                    if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                        dispatch_sender.send(DispatchMessage::Quit).unwrap();
                    }
                });
            }
            DispatchMessage::Quit => { break; }
        }
    }

    kill_switch.send(()).unwrap();
    accept_loop.await?;
    Ok(())
}
// 省略 DispatchMessage RequestResult handle_connection

评论区

写评论

还没有评论

1 共 0 条评论, 1 页