< 返回我的博客

viruscamp 发表于 2021-05-02 19:35

进化的 Http Server : 四 异步实现真正的优雅停机 既然能够真正地停止 listener,那就不需要双循环了,优化如下。tokio 和 async-std 两个版本都有。

技术细节:

  1. 这里 kill_switch 用了 unbounded_channel 而不是 tokio::sync::oneshot::channel,是因为 oneshot 用起来太麻烦:oneshot::Sender 的 send 会消耗自身且不能 clone,而每个连接任务都需要持有一份发送端,大概要写成 Arc<Mutex<Option<oneshot::Sender<()>>>>,读者可以改下试试。
  2. FutureExt::race 相当于 js 的 Promise.race,而 tokio::select! 更加底层,功能也更强
use tokio::task::spawn;
use tokio::time::sleep;
use tokio::io::{Result, Error, ErrorKind, AsyncWriteExt, BufReader, AsyncBufReadExt, copy};
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel as channel;
use tokio::fs::File;
use tokio::net::{TcpListener, TcpStream};
use tokio::select;

/// Entry point of the tokio-based HTTP server with graceful shutdown.
///
/// Binds a listener on `127.0.0.1:20083` and runs the accept loop in a
/// spawned task. Each accepted connection is handled in its own task; when a
/// handler returns `RequestResult::Quit` it signals the kill switch, which
/// makes the accept-loop task finish and `main` return.
///
/// # Errors
///
/// Returns an error if binding the listener fails or if the accept-loop task
/// cannot be joined.
#[tokio::main]
async fn main() -> Result<()> {
    // The sender is cloned into every connection task; the single receiver
    // is raced against the accept loop below.
    let (kill_switch, mut kill_switch_receiver) = channel::<()>();

    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;
    let accept_loop = spawn(async move {
        // Whichever branch completes first ends this task. `listener` is
        // owned by this task, so it is dropped as soon as the task finishes.
        select! {
            _ = async {
                loop {
                    let stream = match listener.accept().await {
                        Ok((stream, _addr)) => stream,
                        Err(err) => {
                            // Stop accepting, as before, but say why instead
                            // of shutting down silently.
                            eprintln!("accept failed, stopping listener: {}", err);
                            break;
                        }
                    };
                    let kill_switch = kill_switch.clone();
                    spawn(async move {
                        if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                            // `send` only fails when the receiver is gone,
                            // i.e. shutdown is already under way (for example
                            // a second quit request raced with the first).
                            // Nothing is left to notify, so ignore the error
                            // rather than panic inside the connection task.
                            let _ = kill_switch.send(());
                        }
                    });
                }
            } => {}
            _ = kill_switch_receiver.recv() => {}
        }
    });
    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    // A join failure (the accept-loop task panicked or was cancelled) is
    // propagated to the caller through `?`.
    accept_loop.await?;
    Ok(())
}
// 省略 DispatchMessage RequestResult handle_connection

同功能的 async-std 版

use async_std::task::spawn;
use async_std::task::sleep;
use async_std::io::{Result, Error, ErrorKind, BufReader, copy, prelude::*};
use std::time::Duration;
use async_std::channel::unbounded as channel;
use async_std::fs::File;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::{Future, FutureExt};

/// Entry point of the async-std-based HTTP server with graceful shutdown.
///
/// Binds a listener on `127.0.0.1:20083` and spawns a task that races the
/// accept loop against the kill-switch receiver. Each accepted connection is
/// handled in its own task; when a handler returns `RequestResult::Quit` it
/// signals the kill switch, the race completes, and `main` returns.
///
/// # Errors
///
/// Returns an error if binding the listener fails.
#[async_std::main]
async fn main() -> Result<()> {
    // The sender is cloned into every connection task; the single receiver
    // is the second contender of the race below.
    let (kill_switch, kill_switch_receiver) = channel::<()>();

    let local_host = "127.0.0.1";
    let port = 20083;
    let listener = TcpListener::bind((local_host, port)).await?;
    let accept_loop = spawn(FutureExt::race(
        // First contender: accept connections until `accept` fails. This
        // future owns `listener`.
        async move {
            loop {
                let stream = match listener.accept().await {
                    Ok((stream, _addr)) => stream,
                    Err(err) => {
                        // Stop accepting, as before, but say why instead of
                        // shutting down silently.
                        eprintln!("accept failed, stopping listener: {}", err);
                        break;
                    }
                };
                let kill_switch = kill_switch.clone();
                spawn(async move {
                    if let Ok(RequestResult::Quit) = handle_connection(stream).await {
                        // `send` only fails when the channel is closed, i.e.
                        // shutdown is already under way. Nothing is left to
                        // notify, so the result is deliberately ignored.
                        let _ = kill_switch.send(()).await;
                    }
                });
            }
        },
        // Second contender: completes once a quit signal arrives (or the
        // channel closes), which ends the race.
        async move {
            let _ = kill_switch_receiver.recv().await;
        })
    );
    println!("server started at http://{}:{}/ serving files in {:?}", local_host, port, std::env::current_dir().unwrap_or_default());

    accept_loop.await;
    Ok(())
}
// 省略 DispatchMessage RequestResult handle_connection

评论区

写评论

还没有评论

1 共 0 条评论, 1 页