< 返回版块

kaixinbaba 发表于 2020-12-25 16:13

tokio 相同的代码通过 main 或者 test 执行的结果不同,这是为什么

async fn mock_fn(addr: &str) -> ZKResult<String> {
    let mut server_list = Vec::new();
    server_list.push(addr.to_string());

    let socket = match TcpStream::connect(server_list.get(0).unwrap().as_str()).await {
        Ok(socket) => socket,
        Err(e) => return Ok(addr.to_string()),
    };

    let (mut reader, mut writer) = io::split(socket);

    tokio::spawn(async move {
        loop {
            info!("writing!!!");
            thread::sleep(Duration::from_millis(500));
        }
        Ok::<_, io::Error>(())
    });
    info!("after spawn");
    Ok(addr.to_string())
}

#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    info!("tokio");
    mock_fn("127.0.0.1:2181").await.unwrap();
    info!("after mock");
}


#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[tokio::test]
    async fn new_test() {
        pretty_env_logger::init();
        info!("tokio");
        mock_fn("127.0.0.1:2181").await.unwrap();
        thread::sleep(Duration::from_secs(10));
        info!("after mock");
    }
}

执行 main 的输出是:

 INFO  tokio_demo > tokio
 INFO  tokio_demo > after spawn
 INFO  tokio_demo > writing!!!
 INFO  tokio_demo > after mock
 INFO  tokio_demo > writing!!!
 INFO  tokio_demo > writing!!!
 INFO  tokio_demo > writing!!!
 INFO  tokio_demo > writing!!!
 INFO  tokio_demo > writing!!!
 INFO  tokio_demo > writing!!!
...

test 的输出:

 INFO  tokio_demo::tests > tokio
 INFO  tokio_demo        > after spawn
 INFO  tokio_demo::tests > after mock

我有两个问题

  • 用 test 执行的话 tokio::spawn 压根就没执行
  • 因为在 tokio::spawn 中使用了 loop,所以永远不会return,但是 main 函数结束的时候,程序却不会退出?这和 Java 里表现的不一样。。比较费解。。

然后我加了await关键字,代码中有注释

async fn mock_fn(addr: &str) -> ZKResult<String> {
    let mut server_list = Vec::new();
    server_list.push(addr.to_string());

    let socket = match TcpStream::connect(server_list.get(0).unwrap().as_str()).await {
        Ok(socket) => socket,
        Err(e) => return Ok(addr.to_string()),
    };

    let (mut reader, mut writer) = io::split(socket);

    tokio::spawn(async move {
        loop {
            info!("writing!!!");
            thread::sleep(Duration::from_millis(500));
        }
        Ok::<_, io::Error>(())
    }).await; // I add await here!!!!
    info!("after spawn");
    Ok(addr.to_string())
}

#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    info!("tokio");
    mock_fn("127.0.0.1:2181").await.unwrap();
    info!("after mock");
}


#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[tokio::test]
    async fn new_test() {
        pretty_env_logger::init();
        info!("tokio");
        mock_fn("127.0.0.1:2181").await.unwrap();
        thread::sleep(Duration::from_secs(10));
        info!("after mock");
    }
}

现在两个入口的输出是一样了

 INFO  tokio_demo::tests > tokio (or  INFO  tokio_demo > tokio)
 INFO  tokio_demo        > writing!!!
 INFO  tokio_demo        > writing!!!
 INFO  tokio_demo        > writing!!!
 INFO  tokio_demo        > writing!!!
 INFO  tokio_demo        > writing!!!

但是在 await 之后的代码永远也不会被执行,因为 spawn 中是一个 loop, 整个程序就阻塞在了 await 那里了。

我其实的目标很简单,就是想实现一个生产者消费者模型,主线程负责生产,子线程使用 loop 消费,两者通过 channel 来传递数据。差不多是这样,我的疑问就这些,就想知道使用 tokio 怎么实现。。难道 tokio spawn出来的根本不是线程?我一开始就理解错了。。所以来求助 感谢


Ext Link: https://github.com/tokio-rs/tokio/issues/3337

评论区

写评论
fakeshadow 2020-12-26 07:21

test可能是默认单线程,被你的sleep阻塞了。

qwertzxcvb 2020-12-25 18:06

因为你test使用的是 #[tokio::test] ?

作者 kaixinbaba 2020-12-25 16:17

其实代码中的 socket 和 reader、writer 根本没用到,我只是一步步排查问题而留下的,因为这个项目本来是想做一个 ZK 的客户端,大家可以忽略,重点是上面说的 tokio

1 共 3 条评论, 1 页