< 返回我的博客

viruscamp 发表于 2021-05-09 22:57

在学习 rust 的 async 过程中,发现同步的io操作只剩下 println! 宏了,那么就试着换成异步操作。 tokio 没有替代,而 async-std 有。
写个程序如下:

use async_std::task::spawn;
use async_std::io::prelude::*;

#[async_std::main]
async fn main() {
    async_std::println!("main {}", 1).await;

    async {
        async_std::println!("spawn {}", 1).await;
    }.await;

    // compile error
    spawn(async {
        async_std::println!("spawn {}", 1).await;
    });
}

竟然爆了,而且编译错误极为复杂,很难理解:

error[E0277]: `core::fmt::Opaque` cannot be shared between threads safely                                                                                
  --> async-format\src/async-println-1-fail.rs:13:5
   |
13 |     spawn(async {
   |     ^^^^^ `core::fmt::Opaque` cannot be shared between threads safely
   | 
  ::: D:\rust\cargo\registry\src\crates.rustcc.com-a21e0f92747beca3\async-std-1.9.0\src\task\spawn.rs:28:29
   |
28 |     F: Future<Output = T> + Send + 'static,
   |                             ---- required by this bound in `async_std::task::spawn`
   |
   = help: the trait `Sync` is not implemented for `core::fmt::Opaque`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&core::fmt::Opaque`
   = note: required because it appears within the type `ArgumentV1<'_>`
   = note: required because it appears within the type `[ArgumentV1<'_>; 1]`
   = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15> {ResumeTy, &'r str, [&'s str; 1], &'t0 [&'t1 str], &'t2 [&'t3 str; 1], i32, &'t4 i32, (&'t5 i32,), [ArgumentV1<'t6>; 1], &'t7 [ArgumentV1<'t8>], &'t9 [ArgumentV1<'t10>; 1], Arguments<'t11>, impl Future, (), [ArgumentV1<'t13>; 0], &'t14 [ArgumentV1<'t15>; 0]}`
   = note: required because it appears within the type `[static generator@D:\rust\cargo\registry\src\crates.rustcc.com-a21e0f92747beca3\async-std-1.9.0\src\macros.rs:89:29: 92:6 for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15> {ResumeTy, &'r str, [&'s str; 
1], &'t0 [&'t1 str], &'t2 [&'t3 str; 1], i32, &'t4 i32, (&'t5 i32,), [ArgumentV1<'t6>; 1], &'t7 [ArgumentV1<'t8>], &'t9 [ArgumentV1<'t10>; 1], Arguments<'t11>, impl Future, (), [ArgumentV1<'t13>; 0], &'t14 [ArgumentV1<'t15>; 0]}]`
   = note: required because it appears within the type `{ResumeTy, [static generator@D:\rust\cargo\registry\src\crates.rustcc.com-a21e0f92747beca3\async-std-1.9.0\src\macros.rs:89:29: 92:6 for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15> {ResumeTy, &'r str, [&'s str; 1], &'t0 [&'t1 str], &'t2 [&'t3 str; 1], i32, &'t4 i32, (&'t5 i32,), [ArgumentV1<'t6>; 1], &'t7 [ArgumentV1<'t8>], &'t9 [ArgumentV1<'t10>; 1], Arguments<'t11>, impl Future, (), [ArgumentV1<'t13>; 0], &'t14 [ArgumentV1<'t15>; 0]}], impl Future, ()}`
   = note: required because it appears within the type `[static generator@async-format\src/async-println-1-fail.rs:13:17: 15:6 {ResumeTy, [static generator@D:\rust\cargo\registry\src\crates.rustcc.com-a21e0f92747beca3\async-std-1.9.0\src\macros.rs:89:29: 92:6 for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15> {ResumeTy, &'r str, [&'s str; 1], &'t0 [&'t1 str], &'t2 [&'t3 str; 1], i32, &'t4 i32, (&'t5 i32,), [ArgumentV1<'t6>; 1], &'t7 [ArgumentV1<'t8>], &'t9 [ArgumentV1<'t10>; 1], Arguments<'t11>, impl Future, (), [ArgumentV1<'t13>; 0], &'t14 [ArgumentV1<'t15>; 0]}], impl Future, ()}]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@async-format\src/async-println-1-fail.rs:13:17: 15:6 
{ResumeTy, [static generator@D:\rust\cargo\registry\src\crates.rustcc.com-a21e0f92747beca3\async-std-1.9.0\src\macros.rs:89:29: 92:6 for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15> {ResumeTy, &'r str, [&'s str; 1], &'t0 [&'t1 str], &'t2 [&'t3 str; 1], i32, &'t4 i32, (&'t5 i32,), [ArgumentV1<'t6>; 1], &'t7 [ArgumentV1<'t8>], &'t9 [ArgumentV1<'t10>; 1], Arguments<'t11>, impl Future, (), [ArgumentV1<'t13>; 0], &'t14 [ArgumentV1<'t15>; 0]}], impl Future, ()}]>`
   = note: required because it appears within the type `impl Future`

搜索后,发现问题应该就类似于 nightly-09-11 引入的 async 块 Send 约束问题
手动展开async_std::println!宏,简化下,得到下面代码:

use async_std::task::spawn;
use async_std::io::prelude::*;

#[async_std::main]
async fn main() {
    spawn(async {
        let mut stdout = async_std::io::stdout();
        let mut string = String::new();

        let fmt = format_args!("spawn {}", 2);
        std::fmt::write(&mut string, fmt).unwrap();

        stdout.write_all(string.as_bytes()).await;
    });
}

这个编译错误就容易理解了,而且跟前一个编译错误本质上一样。

error: future cannot be sent between threads safely
  --> async-format\src/async-println-2-fail.rs:6:5
   |
6  |     spawn(async {
   |     ^^^^^ future created by async block is not `Send`
   | 
  ::: D:\rust\cargo\registry\src\crates.rustcc.com-a21e0f92747beca3\async-std-1.9.0\src\task\spawn.rs:28:29
   |
28 |     F: Future<Output = T> + Send + 'static,
   |                             ---- required by this bound in `async_std::task::spawn`
   |
   = help: within `[ArgumentV1<'_>]`, the trait `Sync` is not implemented for `core::fmt::Opaque`
note: future is not `Send` as this value is used across an await
  --> async-format\src/async-println-2-fail.rs:13:9
   |
10 |         let fmt = format_args!("spawn {}", 2);
   |             --- has type `Arguments<'_>` which is not `Send`
...
13 |         stdout.write_all(string.as_bytes()).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `fmt` maybe used later
14 |     });
   |     - `fmt` is later dropped here
  1. async_std::task::spawn(f: F) where F: Future<Output = T> + Send + 'static 需要它的参数实现 Send 因为它的多线程执行器可能把异步任务放到不同线程执行
    1. 有一个 async_std::task::spawn_local(f: F) where F: Future<Output = T> + 'static 但 rust 没有函数重载和偏特化,人工判断用哪个就很麻烦
  2. rustc 把 async 块编译成状态机,一个.await就是一个状态。碰到.await时 async 块内还存活的变量会被保存进状态,只要有一个此时存活的变量是!Send,最终 async 块就是!Send
  3. 展开宏并简化后的代码编译错误很清楚的告诉我:note: future is not `Send` as this value is used across an await 变量 fmt 就是跨.await!Send,导致 async 块 !Send
    1. 我觉得 rust 的 NLL 应该可以在 13行之前干掉 fmt ,但这并没有发生。
  4. 展开宏前的代码错误简单的说就是:有多个!Send的临时变量,类型大概是ArgumentV1<'_> ,被保存进 async 块导致它!Send。本质上还是临时变量跨.await,但编译错误里面没有提到。
  5. 我们写 async 块时,可能需要额外的精力来保证临时变量不会跨.await。即使是没有Send问题的临时变量也会增大 async 块占用的内存空间。
  6. 导致错误的临时变量 Arguments<'_> 或者 ArgumentV1<'_> 是哪来的。
    println!, format! 等有格式化功能的宏,最终会调用format_args!, 它的直接返回类型是core::fmt::Arguments<'a>。但这个宏是compiler built-in没有直接的源码。
    这个宏先在栈上创建了ArgumentV1<'_>,然后创建Arguments<'_>里面包括ArgumentV1<'_>的借用。
    你可以拿到let fmt:Arguments<'_> = format_args!("spawn {}", 2);,但你不能用,因为ArgumentV1<'_> 被 drop 了。这解释了 3.i 。
    这个坑让你很难发现原因和改正错误,我建议尽量不要手动传递和保存Arguments<'_>
  7. 最后谈谈块表达式中临时变量的drop时机,我觉得这是个巨坑。
    同步情况下会碰到,而且很难发现
// 将 mpsc 包装成多个消费者
let receiver: Arc<Mutex<mpsc::Reciever>>;

// 不可能让出 receiver 的锁,没有编译错误,不算死锁
while let Ok(job) = receiver.lock().unwrap().recv() {
    Worker::execute_job(id, job);
}

// 改好,可以让出锁了
while let Ok(job) = { let x = receiver.lock().unwrap().recv(); x } {
    Worker::execute_job(id, job);
}

异步情况下,常见的就是我们今天碰到的问题了。看看常见的解决方案:

// 编译错误还算清楚:
// first, await occurs here, with `format_args!("spawn {}", 4)` maybe used later...
// has type `[ArgumentV1<'_>; 1]` which is not `Send`
spawn(async {
    async_std::io::stdout().write_fmt(format_args!("spawn {}", 4)).await;
}).await;

// 这里非常坑:两个临时变量, stdout 要跨 .await, format_args 不能跨 .await
spawn(async {
    let mut stdout = async_std::io::stdout();
    { let x = stdout.write_fmt(format_args!("spawn {}", 4)); x }.await;
}).await;

评论区

写评论
作者 viruscamp 2021-05-12 10:44

我得撤回上一个评论,那样只能保证Arguments<'_>不跨.await, _print 函数外面创建的 [ArgumentV1<'a>] 还是跨.await 了。

macro_rules! println {
    ($($arg:tt)*) => (async {
        let mut stdout = async_std::io::stdout(); // make stdout live longer than .await
        if let Err(e) = {
            let x = writeln!(&mut stdout, $($arg)*);
            // drop Arguments<'_> and [ArgumentV1<'a>] to make them live shorter than .await
            x
        }.await {
            panic!("failed printing to stdout: {}", e);
        }
    });
}
作者 viruscamp 2021-05-11 21:56

现在我们就能改 async_std::println!Send 导致不能spawn的问题了。官方 issue 和我的评论 Not able to await on write! in spawned async task #516

For async-std-1.9.0, macro async_std::write is Send now. But print println eprint eprintln are still !Send.
I have got a solution, please change async_std::io::stdio::_print and async_std::io::stdio::_eprint as:

pub async fn _print(args: std::fmt::Arguments<'_>) {
    let mut stdout = async_std::io::stdout(); // make stdout live longer than .await
    if let Err(e) = {
        let x = stdout.write_fmt(args); // force drop args:Arguments<'_> and [ArgumentV1<'a>] to make them live shorter than .await
        x
    }.await {
        panic!("failed printing to stdout: {}", e);
    }
}

It depends on that async_std::io::write::write_fmt::WriteFmtFuture does not hold an Arguments<'_>.

1 共 2 条评论, 1 页