< 返回版块

lithbitren 发表于 2020-10-28 15:41

Tags:异步,协程,async

Python 的 asyncio 里面可以通过asyncio.get_running_loop().run_in_executor()来让一些不占用cpu的普通异步事件变成不影响协程调度的异步事件,比如说一些多线程的sleep等待或其他进程的计算之类的,亦或者是老牌同步请求库requests库也可以直接转成协程,Rust怎么实现这种功能呢?

比如说类似这样的Python脚本怎么翻译成Rust(Python有GIL,才另起一个进程模拟服务器,Rust用线程也可以)

import multiprocessing
import threading
import asyncio
import time

n = 12
    
# 新建进程启动模拟服务器和启动协程客户端
def main():
    host, listener = multiprocessing.Pipe()
    multiprocessing.Process(target=server, args=(listener, )).start()
    time.sleep(1)
    asyncio.run(client(host))

# 客户端同时发出n个协程异步请求
async def client(host):
    t = time.time()
    print('start')
    loop = asyncio.get_running_loop()
    responses = await asyncio.gather(*(
        request(index, host, loop)
        for index in range(n)
    ))
    print(f'end: {time.time() - t: .3f}s')
    print(responses)
    
# 通过run_in_executor把多进程的异步事件转成协程异步事件
async def request(index, host, loop):
    t = time.time()
    print('client start', index)
    response = await loop.run_in_executor(None, connect, index, host)
    print('client end', index, f'{time.time() - t:.3f}s')
    return response
    
# 用多进程管道来模拟请求
def connect(index, host):
    sender, receiver = multiprocessing.Pipe()
    host.send((index, sender))
    return receiver.recv()

# 模拟服务器异步监听n个请求
def server(listener):
    print('open server')
    method = threading.Thread
    # method = multiprocessing.Process
    multi = []
    for _ in range(n):
        multi.append(method(target=api, args=listener.recv()))
        multi[-1].start()
    for m in multi:
        m.join()
    print('close server')

# 模拟服务器处理请求
def api(index, sender):
    print('server start', index)
    time.sleep(1)
    # for _ in range(10 ** 7):
    #     pass
    sender.send(f'response for {index}')
    print('server end', index)

if __name__ == '__main__':
    main()

评论区

写评论
93996817 2020-10-28 17:07

贴下使用 tokio的例子 供参考

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let now = Instant::now();

    let mut futs = FuturesUnordered::new();
    futs.push(tokio::spawn(task("task1", now.clone())));  //任务1
    futs.push(tokio::spawn(task("task2", now.clone())));  //任务2
    futs.push(tokio::spawn(task("task3", now.clone())));  //任务3 
    //************其它任务 */
    while let Some(_handled) = futs.next().await {}
    Ok(())
}

async fn task(label: &str, now: std::time::Instant) -> Result<(), Box<dyn Error + Send + Sync>> {
    // Simulate network delay using Tokio async delay for 2 seconds
    println!(
        "OS Thread {:?} - {} started: {:?}",
        std::thread::current().id(),
        label,
        now.elapsed(),
    );
    tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;

    // Write to server - server will echo this back to us with 8 second delay
    let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
    stream.write_all(label.as_bytes()).await?;
    println!(
        "OS Thread {:?} - {} written: {:?}",
        std::thread::current().id(),
        label,
        now.elapsed()
    );

    // Read 5 chars we expect (to avoid dealing with EOF, etc.)
    let mut buffer = [0; 5];
    stream.read_exact(&mut buffer).await?;
    stream.shutdown(std::net::Shutdown::Both)?;
    println!(
        "OS Thread {:?} - {} read: {:?}",
        std::thread::current().id(),
        label,
        now.elapsed()
    );

    // Simulate computation work by sleeping actual thread for 4 seconds
    sleep(std::time::Duration::from_secs(4));
    println!(
        "OS Thread {:?} - {} finished: {:?}",
        std::thread::current().id(),
        std::str::from_utf8(&buffer)?,
        now.elapsed()
    );
    Ok(())
}
1 共 1 条评论, 1 页