首先,我想在公司内推广使用rust。公司产品属于类似家庭路由器一样cpu/内存相对宝贵的linux设备,里面的服务程序都是C语言写的、有状态的服务。我想尝试用rust逐步替代C,并且尽量避免多线程以节省资源,于是自然相中了async/await使用协程实现。
以进程管理的服务为例
进程管理服务我设想是设计为单线程,为方便描述,我简化为两部分:
- 进程管理业务模块(实现进程控制状态管理的具体功能)
- 控制模块(提供API给外部进程使用,启停进程,查看状态等)
用Python代码简单表示如下
import asyncio
class App:
def __init__(self):
self.procman = ProcessManager()
self.api = ControlService(self.procman)
async def serve(self):
await asyncio.wait([
self.procman.serve(),
self.api.serve(),
])
class ControlService:
def __init__(self, procman):
self.procman = procman
async def serve(self):
while True:
print('[ControlService ] Waiting for user cmd...')
cmd, params = await next_user_request()
print("[ControlService ] Received user cmd: {}".format(cmd))
if cmd == "set state":
_ok = self.procman.set_state(params)
# ...
elif cmd == "get state":
_state = self.procman.get_state()
# ...
else:
pass
class ProcessManager:
def __init__(self):
# 为便于描述理解,我将内部状态简化为一个state属性代表
self._state = 0
def set_state(self, v):
# 这里set_state代表一个提供给外部调用的API, 会访问到内部状态
self._state = 0
return True
def get_state(self):
return self._state
async def serve(self):
# PM本身使用一个协程来实现内部功能,运行过程中也会改变内部state
while True:
self._state += 1
print("self._state = {}".format(self._state))
await asyncio.sleep(1)
async def next_user_request():
await asyncio.sleep(5)
return "set state", 0
if __name__ == '__main__':
app = App()
loop = asyncio.get_event_loop()
loop.run_until_complete(app.serve())
loop.close()
改用Rust实现,首先直接转换过来v1如下:
#![feature(async_await, await_macro, futures_api, pin)]
use futures::{join};
use futures::executor::{self};
struct ProcessManager {
state: i32
}
struct ControlService<'a> {
procman: &'a mut ProcessManager
}
impl ProcessManager {
async fn serve(&mut self) {
loop {
await!(async {
// 这里省略业务代码
self.state += 1
});
}
}
fn set_state(&mut self, state: i32) {
self.state = state
}
}
impl<'a> ControlService<'a> {
async fn serve(&'a mut self) {
loop {
let cmd = await!(async {
// 这里省略业务代码
"set state".to_owned()
});
match &cmd as &str {
"set state" => {
self.procman.set_state(0)
},
_ => ()
}
}
}
}
async fn serve() {
let mut procman = ProcessManager { state: 0 };
let mut api = ControlService { procman: &mut procman };
let fut0 = procman.serve();
let fut1 = api.serve();
join!(fut0, fut1);
}
fn main() {
executor::block_on(serve());
}
显然上面的代码是不能通过borrow checker的,于是自然地,我将状态塞到RefCell中:
#![feature(async_await, await_macro, futures_api)]
use futures::{join};
use futures::executor::{self};
use std::cell::RefCell;
struct ProcessManager {
state: RefCell<i32>
}
struct ControlService<'a> {
procman: &'a ProcessManager
}
impl ProcessManager {
async fn serve(&self) {
loop {
await!(async {
// 省略具体业务代码
*self.state.borrow_mut() += 1
});
}
}
fn set_state(&self, state: i32) {
*self.state.borrow_mut() = state
}
}
impl<'a> ControlService<'a> {
async fn serve(&'a self) {
loop {
let cmd = await!(async {
// 省略业务代码
"set state".to_owned()
});
match &cmd as &str {
"set state" => {
self.procman.set_state(0)
},
_ => ()
}
}
}
}
async fn serve() {
let procman = ProcessManager { state: RefCell::new(0) };
let api = ControlService { procman: &procman };
let fut0 = procman.serve();
let fut1 = api.serve();
join!(fut0, fut1);
}
fn main() {
executor::block_on(serve());
}
这下能通过编译,勉强实现功能。但是这里使用RefCell并不是一个好的pattern。第一感觉有以下问题:
- 随着代码越来越复杂RefCell会使得代码到处冗长,混乱
- ProcessManager的所有方法必须规避&mut self, 使得所有的实际mutating API都丢失了mutating语义
- async/await 中使用RefCell变量时,相比正常函数,很容易导致意外的的panic
impl ProcessManager {
async fn serve(&self) {
loop {
// 这里保持了借用state,将导致其他协程里面访问set_state的时候产生panic
// 相比正常函数,async函数中更容易不小心犯这种错误.
let state = self.state.borrow();
await!(do_something_with_state(&state));
*self.state.borrow_mut() += 1
}
}
那么Rust里面怎样更优雅地实现基于async/await的有状态服务?
1
共 4 条评论, 1 页
评论区
写评论谢谢,这个写法并没有解决任何问题啊,我实际程序里面state原本就是一个大的struct,只是为了方便描述,才简化为一个i32类型。 state是i32还是struct都不影响遇到的问题呢。
如果是我的话,我会把State和RefCell隔离开,RefCell是最外层的东西,把State独立为一个结构:
这样如果我要在单线程里跑就在用的地方RefCell包一下,如果后面要改成多线程的就用Arc<Mutex>包装。
确实存在这样的问题,API设计的时候也会考虑这些问题。 所以我的问题是Rust里面该怎样写呢?
跨await借用的写法本身就容易导致bug,相当于是多个task之间没有加锁就可以并发修改状态。task1改写了一半状态,或者根据当前状态算了一个中间值,然后await,其他线程去读写状态就会读到task1写了一半的状态。然后await返回,task1继续执行,但是之前算的中间值可能由于其他task的改写已经过期了,需要重新计算。