< 返回版块

Kevin Wang 发表于 2018-12-31 18:07

Tags:async,await

首先,我想在公司内推广使用rust。公司产品属于类似家庭路由器一样cpu/内存相对宝贵的linux设备,里面的服务程序都是C语言写的、有状态的服务。我想尝试用rust逐步替代C,并且尽量避免多线程以节省资源,于是自然相中了async/await使用协程实现。

以进程管理的服务为例

进程管理服务我设想是设计为单线程,为方便描述,我简化为两部分:

  • 进程管理业务模块(实现进程控制状态管理的具体功能)
  • 控制模块(提供API给外部进程使用,启停进程,查看状态等)

用Python代码简单表示如下

import asyncio

class App:
    def __init__(self):
        self.procman = ProcessManager()
        self.api = ControlService(self.procman)

    async def serve(self):
        await asyncio.wait([
            self.procman.serve(),
            self.api.serve(),
        ])


class ControlService:
    def __init__(self, procman):
        self.procman = procman

    async def serve(self):
        while True:
            print('[ControlService   ] Waiting for user cmd...')
            cmd, params = await next_user_request()
            print("[ControlService   ] Received user cmd: {}".format(cmd))
            if cmd == "set state":
                _ok = self.procman.set_state(params)
                # ...
            elif cmd == "get state":
                _state = self.procman.get_state()
                # ...
            else:
                pass


class ProcessManager:
    def __init__(self):
        # 为便于描述理解,我将内部状态简化为一个state属性代表
        self._state = 0

    def set_state(self, v):
        # 这里set_state代表一个提供给外部调用的API, 会访问到内部状态
        self._state = 0
        return True

    def get_state(self):
        return self._state

    async def serve(self):
        # PM本身使用一个协程来实现内部功能,运行过程中也会改变内部state
        while True:
            self._state += 1
            print("self._state = {}".format(self._state))
            await asyncio.sleep(1)


async def next_user_request():
    await asyncio.sleep(5)
    return "set state", 0


if __name__ == '__main__':
    app = App()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(app.serve())
    loop.close()

改用Rust实现,首先直接转换过来v1如下:

#![feature(async_await, await_macro, futures_api, pin)]

use futures::{join};
use futures::executor::{self};

struct ProcessManager {
    state: i32
}

struct ControlService<'a> {
    procman: &'a mut ProcessManager
}

impl ProcessManager {
    async fn serve(&mut self) {
        loop {
            await!(async {
                // 这里省略业务代码
                self.state += 1
            });
        }
    }

    fn set_state(&mut self, state: i32) {
        self.state = state
    }
}

impl<'a> ControlService<'a> {
    async fn serve(&'a mut self) {
        loop {
            let cmd = await!(async {
                // 这里省略业务代码
                "set state".to_owned()
            });
            match &cmd as &str {
                "set state" => {
                    self.procman.set_state(0)
                },
                _ => ()
            }
        }
    }
}

async fn serve() {
    let mut procman = ProcessManager { state: 0 };
    let mut api = ControlService { procman: &mut procman };
    let fut0 = procman.serve();
    let fut1 = api.serve();
    join!(fut0, fut1);
}

fn main() {
    executor::block_on(serve());
}

显然上面的代码是不能通过borrow checker的,于是自然地,我将状态塞到RefCell中:

#![feature(async_await, await_macro, futures_api)]

use futures::{join};
use futures::executor::{self};
use std::cell::RefCell;

struct ProcessManager {
    state: RefCell<i32>
}

struct ControlService<'a> {
    procman: &'a ProcessManager
}

impl ProcessManager {
    async fn serve(&self) {
        loop {
            await!(async {
                // 省略具体业务代码
                *self.state.borrow_mut() += 1
            });
        }
    }

    fn set_state(&self, state: i32) {
        *self.state.borrow_mut() = state
    }
}

impl<'a> ControlService<'a> {
    async fn serve(&'a self) {
        loop {
            let cmd = await!(async {
                // 省略业务代码
                "set state".to_owned()
            });
            match &cmd as &str {
                "set state" => {
                    self.procman.set_state(0)
                },
                _ => ()
            }
        }
    }
}

async fn serve() {
    let procman = ProcessManager { state: RefCell::new(0) };
    let api = ControlService { procman: &procman };
    let fut0 = procman.serve();
    let fut1 = api.serve();
    join!(fut0, fut1);
}

fn main() {
    executor::block_on(serve());
}

这下能通过编译,勉强实现功能。但是这里使用RefCell并不是一个好的pattern。第一感觉有以下问题:

  • 随着代码越来越复杂RefCell会使得代码到处冗长,混乱
  • ProcessManager的所有方法必须规避&mut self, 使得所有的实际mutating API都丢失了mutating语义
  • async/await 中使用RefCell变量时,相比正常函数,很容易导致意外的的panic
impl ProcessManager {
    async fn serve(&self) {
        loop {
            // 这里保持了借用state,将导致其他协程里面访问set_state的时候产生panic
            // 相比正常函数,async函数中更容易不小心犯这种错误.
            let state = self.state.borrow();
            await!(do_something_with_state(&state));
            *self.state.borrow_mut() += 1
        }
    }
    

那么Rust里面怎样更优雅地实现基于async/await的有状态服务?

评论区

写评论
作者 Kevin Wang 2019-01-02 14:47

谢谢,这个写法并没有解决任何问题啊,我实际程序里面state原本就是一个大的struct,只是为了方便描述,才简化为一个i32类型。 state是i32还是struct都不影响遇到的问题呢。

@laizy 如果是我的话,我会把State和RefCell隔离开,RefCell是最外层的东西,把State独立为一个结构: struct AppState{ counter: i32, //... } impl AppState{
fn set_state(&mut self, state: i32) { self.counter= counter } }

这样如果我要在单线程里跑就在用的地方RefCell包一下,如果后面要改成多线程的就用Arc包装。

@Kevin Wang 确实存在这样的问题,API设计的时候也会考虑这些问题。 所以我的问题是Rust里面该怎样写呢?

@laizy 跨await借用的写法本身就容易导致bug,相当于是多个task之间没有加锁就可以并发修改状态。task1改写了一半状态,或者根据当前状态算了一个中间值,然后await,其他线程去读写状态就会读到task1写了一半的状态。然后await返回,task1继续执行,但是之前算的中间值可能由于其他task的改写已经过期了,需要重新计算。

laizy 2019-01-02 13:17

如果是我的话,我会把State和RefCell隔离开,RefCell是最外层的东西,把State独立为一个结构:

struct AppState{
    counter: i32,
    //...
}
impl AppState{  
    fn set_state(&mut self, state: i32) {
        self.counter= counter
    }
}

这样如果我要在单线程里跑就在用的地方RefCell包一下,如果后面要改成多线程的就用Arc<Mutex>包装。

@Kevin Wang 确实存在这样的问题,API设计的时候也会考虑这些问题。 所以我的问题是Rust里面该怎样写呢?

@laizy 跨await借用的写法本身就容易导致bug,相当于是多个task之间没有加锁就可以并发修改状态。task1改写了一半状态,或者根据当前状态算了一个中间值,然后await,其他线程去读写状态就会读到task1写了一半的状态。然后await返回,task1继续执行,但是之前算的中间值可能由于其他task的改写已经过期了,需要重新计算。

作者 Kevin Wang 2019-01-02 09:58

确实存在这样的问题,API设计的时候也会考虑这些问题。 所以我的问题是Rust里面该怎样写呢?

@laizy 跨await借用的写法本身就容易导致bug,相当于是多个task之间没有加锁就可以并发修改状态。task1改写了一半状态,或者根据当前状态算了一个中间值,然后await,其他线程去读写状态就会读到task1写了一半的状态。然后await返回,task1继续执行,但是之前算的中间值可能由于其他task的改写已经过期了,需要重新计算。

laizy 2019-01-01 23:26

跨await借用的写法本身就容易导致bug,相当于是多个task之间没有加锁就可以并发修改状态。task1改写了一半状态,或者根据当前状态算了一个中间值,然后await,其他线程去读写状态就会读到task1写了一半的状态。然后await返回,task1继续执行,但是之前算的中间值可能由于其他task的改写已经过期了,需要重新计算。

1 共 4 条评论, 1 页