< 返回版块

洋芋 发表于 2020-02-20 10:29

Tags:rust,每周一知

这是一篇博文翻译,略有删减,整理代码方便统一阅读,Github链接:https://github.com/lesterli/rust-practice/tree/master/head-first/async-primer。

原文在2月11号的【Rust日报】中给大家推荐过, 原文链接: https://omarabid.com/async-rust

本文并不全面介绍Rust异步主题。如果对新的async/await关键字Futures感到疑惑,并且对Tokio的用途很感兴趣,那么到最后应该会不再毫无头绪。

Rust异步技术是Rust领域的新热点,它被誉为Rust的重要里程碑,特别适合开发高性能网络应用程序的人们。

让我们从头开始。

什么是异步?

关于Async,我给一个简短的版本:如果有一个处理器,想同时执行(类似)两项任务,将如何做?解决方案是先运行第一个任务,然后切换并运行第二个任务,然后再切换回去,依此类推,直到完成两个任务。

如果想给人以计算机同时运行两个任务的感觉(即多任务处理),则此功能很有用。另一个用例是IO操作。当程序等待网络响应时,CPU处于空闲状态。这是切换到另一个任务的理想时间。

那么我们如何编写异步代码?

首先,让我们从一些同步代码开始。

同步代码

让我们做一个简单的程序,该程序读取两个文件:file1.txtfile2.txt。我们从file1.txt开始,然后移至file2.txt

我们将程序分为两个文件:main.rsfile.rsfile.rs有一个函数:read_file,在main.rs中,用每个文件的路径为参数调用此函数。参见下面代码:

// sync-example/src/file.rs

use std::fs::File;
use std::io::{self, Read};

pub fn read_file(path: &str) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut buffer = String::new();
    file.read_to_string(&mut buffer)?;
    Ok(buffer)
}
// sync-example/src/main.rs

use std::io;

mod file;

fn main() -> io::Result<()> {
    println!("program started");

    let file1 = file::read_file("src/file1.txt")?;
    println!("processed file 1");

    let file2 = file::read_file("src/file2.txt")?;
    println!("processed file 2");

    dbg!(&file1);
    dbg!(&file2);

    Ok(())
}

使用cargo run编译并运行程序。该程序应该毫无意外地运行,但是请确保已在src文件夹中放置了两个文件(file1.txtfile2.txt)。

program started
processed file 1
processed file 2
[src/main.rs:14] &file1 = "file1"
[src/main.rs:15] &file2 = "file2"

到目前为止,一切都很好。如果需要在处理file2.txt之前先处理file1.txt,那么这是唯一的方法。但是有时不必关心每个文件的处理顺序。理想情况下,希望尽快处理文件。

在这种情况下,我们可以利用多线程。

多线程方法

为此,我们为每个函数调用运行一个单独的线程。由于我们使用的是多线程代码,并且如果要访问线程外部的文件内容,则必须使用Rust提供的同步原语之一。

这将如何影响代码:file.rs将保持不变,因此这已经是一件好事了。在main.rs中,我们需要初始化两个RwLock;这些将稍后在线程中用于存储文件内容。

然后,我们运行一个无限循环,尝试读取这两个变量的内容。如果这些变量不为空,则我们知道文件处理(或读取)已完成。 (这意味着文件不应为空;否则,我们的程序将错误地保持等待状态。另一种方法是使用Option<String>并检查Option是否为None)。

此代码需要crate lazy_static

// multi-example/src/main.rs

use std::io;
use std::sync::RwLock;
use std::thread;

use lazy_static::lazy_static;

mod file;

// A sync primitive that allows to read/write from variables between threads.
// we declare the variables here, this requires the lazy_static crate
lazy_static! {
    static ref FILE1: RwLock<String> = RwLock::new(String::from(""));
    static ref FILE2: RwLock<String> = RwLock::new(String::from(""));
}

fn main() -> io::Result<()> {
    println!("program started");

    let thread_1 = thread::spawn(|| {
        let mut w1 = FILE1.write().unwrap();
        *w1 = file::read_file("src/file1.txt").unwrap();
        println!("read file 1");
    });

    println!("Launched Thread 1");

    let thread_2 = thread::spawn(|| {
        let mut w2 = FILE2.write().unwrap();
        *w2 = file::read_file("src/file2.txt").unwrap();
        println!("read file 2");
    });

    println!("Launched Thread 2");

    let mut rf1: bool = false;
    let mut rf2: bool = false;

    loop {
    	// read()
        let r1 = FILE1.read().unwrap();
        let r2 = FILE2.read().unwrap();

        if *r1 != String::from("") && rf1 == false {
            println!("completed file 1");
            rf1 = true;
        }

        if *r2 != String::from("") && rf2 == false {
            println!("completed file 2");
            rf2 = true;
        }
    }

    Ok(())
}

有趣的是,如果我们有一个非常大的file1.txt,我们将得到一个奇怪的输出。首先处理第二个文件(读取文件2);但在我们的循环内部,该程序似乎阻塞并等待第一个文件。

program started
Launched Thread 1
Launched Thread 2
read file 2
read file 1
completed file 1
completed file 2

多线程可能有点棘手,因为我们必须考虑可能阻塞的原子操作。我们使用read函数来解锁我们的变量,并且文档对这种行为发出警告。

使用共享的读取访问权限锁定此rwlock,阻塞当前线程,直到可以获取它为止。

幸运的是,有一个try_read函数,如果无法获取锁,则返回Err

尝试使用共享的读取访问权限获取此rwlock

如果此时不能授予访问权限,则返回Err。 否则,将返回RAII保护,当该保护被删除时,该保护将释放共享访问。

在第二次尝试中,我们使用try_read并忽略返回的Errs,因为它们应该表示我们的锁正忙。这有助于将程序移至下一个变量,并处理先准备好的变量。

// multi-example/src/main.rs

...
    loop {
    	// try_read()
        let r1 = FILE1.try_read();
        let r2 = FILE2.try_read();

        match r1 {
            Ok(v) => {
                if *v != String::from("") && rf1 == false {
                    println!("completed file 1");
                    rf1 = true;
                }
            }
            // If rwlock can't be acquired, ignore the error
            Err(_) => {}
        }

        match r2 {
            Ok(v) => {
                if *v != String::from("") && rf2 == false {
                    println!("completed file 2");
                    rf2 = true;
                }
            }
            // If rwlock can't be acquired, ignore the error
            Err(_) => {}
        }
    }
...

现在执行方式有所不同。如果file1.txtfile2.txt大得多,则应首先处理第二个文件。

program started
Launched Thread 1
Launched Thread 2
read file 2
completed file 2
read file 1
completed file 1

多线程的局限性

如果我们已经有多线程,为什么我们需要异步?有两个主要优点:性能和简单性。产生线程很昂贵;从以上内容可以得出结论,编写多线程代码可能会变得非常复杂。

异步,关键字

Rust的重点是使编写Async代码尽可能简单。只需要在函数声明之前添加async/await关键字即可使代码异步:函数声明前async,解析异步函数await

这听起来很不错。试一试吧。

use std::fs::File;
use std::io::{self, Read};

pub async fn read_file(path: &str) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut buffer = String::new();
    file.read_to_string(&mut buffer)?;
    Ok(buffer)
}
use std::io;

mod file;

fn main() -> io::Result<()> {
    let r1 = file::read_file("src/file1.txt");
    let r2 = file::read_file("src/file2.txt");

    let f1 = r1.await;
    let f2 = r2.await;

    dbg!(f1);
    dbg!(f2);

    Ok(())
}

但是这不能通过编译,await仅在异步块或函数中可用。如果我们尝试运行此代码,则编译器将引发此错误。

error[E0728]: `await` is only allowed inside `async` functions and blocks
 --> src/main.rs:9:14
  |
5 | fn main() -> io::Result<()> {
  |    ---- this is not `async`
...
9 |     let f1 = r1.await;
  |              ^^^^^^^^ only allowed inside `async` functions and blocks

我们可以使main函数异步吗?不幸的是,事情并非如此简单。我们得到另一个错误。

error[E0277]: `main` has invalid return type `impl std::future::Future`
 --> src/main.rs:5:20
  |
5 | async fn main() -> io::Result<()> {
  |                    ^^^^^^^^^^^^^^ `main` can only return types that implement `std::process::Termination`
  |
  = help: consider using `()`, or a `Result`

但是,错误消息有点令人着迷。似乎async关键字使我们的函数返回Future而不是声明的类型。

异步函数的返回类型是Future(确切地说是实现Future特性的闭包)。

await呢?await在整个Future中循环直至完成。但是,还有另外一个谜团:Rust无法自解析Future。我们需要一个执行器来运行此异步代码。

什么是执行器?

如果回顾一下我们的多线程示例,会注意到我们使用循环来检测何时处理文件。这很简单:无限循环直到变量中包含某些内容,然后执行某些操作。如果读取两个文件,我们可以通过跳出循环来改善这一点。

一个异步执行器是循环。默认情况下,Rust没有任何内置的执行程序。有许多异步运行时;async-stdTokio是最受欢迎的。运行时的工作是轮询异步函数(Future),直到它们最终返回一个值。

一个简单的执行器

crate futures有一个非常基本的执行器,并且具有将两个Future连接的函数。让我们试一试。

以下代码使用crate futures版本0.3.4。

// async-example/src/main.rs

use futures::executor::block_on;
use futures::join;
use std::io;

mod file;

fn main() -> io::Result<()> {

    println!("Program started");

    // Block on the final future
    block_on(load_files());

    Ok(())
}

async fn load_files() {
    // Join the two futures together
    join!(load_file_1(), load_file_2());
}

async fn load_file_1() {
    let r1 = file::read_file("src/file1.txt").await;
    println!("file 1 size: {}", r1.unwrap().len());
}

async fn load_file_2() {
    let r2 = file::read_file("src/file2.txt").await;
    println!("file 2 size: {}", r2.unwrap().len());
}

为了验证异步性,将一堆数据转储到file1.txt中。

Program started
file 1 size: 5399
file 2 size: 5

不幸的是,这看起来(确实)第一个文件函数再次阻塞了。

那么异步到底是什么?

与多线程类似,异步编程中也有一些陷阱和问题。事实是,async关键字不会神奇地使代码异步;它只是使函数返回Future。仍然必须繁重地安排代码执行时间。

这意味着函数必须迅速返回尚未准备就绪的状态,而不是被困在进行计算的过程中。在我们的情况下,阻塞是特定在File::Openfile.read_to_string处发生的。这两个函数不是异步的,因此会阻止执行。

我们需要创建这两个函数的异步版本。幸运的是,一些使用async-std的人做了工作,将Rust中的std库重写为异步版本。

使用async-std的文件IO

我们唯一要做的更改是将我们的std导入替换为async_std

对于以下示例,我们使用crate async-std版本1.5.0。

// async-example/src/file.rs

// We use async_std instead of std, it's that simple.
use async_std::io;
use async_std::fs::File;
use async_std::prelude::*;

pub async fn read_file(path: &str) -> io::Result<String> {
    let mut file = File::open(path).await?;
    let mut buffer = String::new();
    file.read_to_string(&mut buffer).await?;
    Ok(buffer)
}

main.rs中的代码保持不变;该程序仍使用crate futures中的block_on执行程序。

编译并运行程序。(确保有一个大的file1.txt

Program started
file 2 size: 5
file 1 size: 5399

最后!程序首先快速处理file2.txt,然后移至file1.txt

让我们回顾一下到目前为止所学到的东西:

  • async使我们的函数返回Future
  • 运行我们的Future需要一个运行时。
  • 运行时检查Future是否准备就绪;并在就绪时返回其值。

总结

在这篇文章中,我们介绍了同步代码,多线程代码,Rust中的一些异步术语,async-std库和简单的Future实现。实际上,这是一个"轻量级"的介绍,为简洁起见,省略了许多细节。

评论区

写评论
phper-chen 2020-02-20 11:12

很实用

1 共 1 条评论, 1 页