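I have a question:
In our open-source project libp2p-rs, we define ReadEx/WriteEx to replace AsyncRead/AsyncWrite and simplify I/O, i.e. we use async/await to avoid hand-writing poll_xxx methods. However, we ran into trouble implementing a split method similar to the one in AsyncReadExt. Following AsyncReadExt::split(), we also use BiLock to split T into ReadHalf/WriteHalf, and then implement AsyncRead::poll_read for ReadHalf. Because T::read2() is an async GenFuture, when it returns Pending we need to box it and store it, so that it can be polled again once the task is woken. Sample code written along these lines is shown below, but it fails to compile: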
use std::io;
use futures::io::Cursor;
use futures::prelude::*;
use futures::lock::BiLock;
use futures::{AsyncReadExt, AsyncWriteExt};
use async_trait::async_trait;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Read Trait for async/await
///
#[async_trait]
pub trait ReadEx: Send {
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error>;
}
/// Write Trait for async/await
///
#[async_trait]
pub trait WriteEx: Send {
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error>;
}
#[async_trait]
impl<T: AsyncRead + Unpin + Send> ReadEx for T {
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let n = AsyncReadExt::read(self, buf).await?;
        Ok(n)
    }
}
#[async_trait]
impl<T: AsyncWrite + Unpin + Send> WriteEx for T {
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        AsyncWriteExt::write(self, buf).await
    }
}
// Problematic: how to mark ReadHalf with proper lifetime??
pub struct ReadHalf<T> {
    handle: BiLock<T>,
    fut: Option<Pin<Box<dyn Future<Output=io::Result<usize>> + Send + Unpin + 'static>>>,
}
impl<T> ReadHalf<T> {
    pub fn new(lock: BiLock<T>) -> Self {
        ReadHalf {
            handle: lock,
            fut: None,
        }
    }
}
impl<T: ReadEx + Unpin> AsyncRead for ReadHalf<T> {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        let mut lock = futures::ready!(self.handle.poll_lock(cx));
        let t = &mut *lock;
        if self.fut.is_none() {
            self.fut = Some(Box::pin(t.read2(buf)));
        }
        let mut fut = self.fut.take().expect("must not be none");
        match fut.as_mut().poll(cx) {
            Poll::Pending => {
                self.fut = Some(fut);
                Poll::Pending
            },
            Poll::Ready(ret) => {
                Poll::Ready(ret)
            }
        }
    }
}
pub trait ReadExt2: ReadEx {
    fn split2(self) -> (ReadHalf<Self>, WriteHalf<Self>)
    where
        Self: Sized + WriteEx + Unpin,
    {
        let (a, b) = BiLock::new(self);
        (ReadHalf { handle: a, fut: None }, WriteHalf { handle: b })
    }
}
impl<R: ReadEx + ?Sized> ReadExt2 for R {}
/// The writable half of an object returned from `AsyncRead::split`.
pub struct WriteHalf<T> {
    handle: BiLock<T>,
}
impl<W: WriteEx + Unpin> AsyncWrite for WriteHalf<W> {
    fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, _buf: &[u8]) -> Poll<io::Result<usize>> {
        unimplemented!()
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        unimplemented!()
    }
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        unimplemented!()
    }
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct Test(Cursor<Vec<u8>>);
#[async_trait]
impl ReadEx for Test {
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        self.0.read(buf).await
    }
}
#[async_trait]
impl WriteEx for Test {
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        self.0.write(buf).await
    }
}
fn main() {
    futures::executor::block_on(async {
        let rw = Test(Cursor::new(vec![1, 2, 3]));
        let (mut reader, _writer) = rw.split2();
        let mut output = [0u8; 3];
        let bytes = reader.read2(&mut output[..]).await.unwrap();
        assert_eq!(bytes, 3);
        assert_eq!(output, [1, 2, 3]);
    });
}
The error message is as follows:
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src\main.rs:59:40
   |
59 |         let mut lock = futures::ready!(self.handle.poll_lock(cx));
   |                                        ^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 58:5...
  --> src\main.rs:58:5
   |
58 | /     fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
59 | |         let mut lock = futures::ready!(self.handle.poll_lock(cx));
60 | |         let t = &mut *lock;
61 | |         if self.fut.is_none() {
...  |
73 | |         }
74 | |     }
   | |_____^
note: ...so that the reference type `&std::pin::Pin<&mut ReadHalf<T>>` does not outlive the data it points at
  --> src\main.rs:59:40
   |
59 |         let mut lock = futures::ready!(self.handle.poll_lock(cx));
   |                                        ^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
  --> src\main.rs:62:24
   |
62 |             self.fut = Some(Box::pin(t.read2(buf)));
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   = note: expected `std::option::Option<std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send + std::marker::Unpin + 'static)>>>`
              found `std::option::Option<std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send + std::marker::Unpin>>>`
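My understanding is that t.read2(buf) on line 62 (as numbered in the compiler output) introduces 'static, which conflicts with the lifetime inferred for self. So I tried adding a lifetime parameter to ReadHalf.fut, changing 'static to 'a, but after a lot of fiddling it still did not work. Even so, I feel this problem should be solvable, since in practice every lifetime involved is that of self itself; I just have not found the correct way to write it.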
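To make the conflict more concrete, here is a minimal std-only sketch of how I understand the situation. It assumes that `#[async_trait]` expands `read2` to roughly the signature of the hand-written `ReadExDesugared` trait below; the real macro output differs in detail. `ReadExDesugared`, `MemReader`, `OwnedRead`, `read_owned`, `NoopWake`, and `demo` are hypothetical names used only for illustration, and none of them exist in libp2p-rs. The returned future borrows both `self` and `buf` for `'a`, so it cannot be stored in a field that demands `'static`; note also that `buf` only lives for a single `poll_read` call. One possible way to get a `'static` future is to let it own the reader and the buffer and hand them back on completion. Whether that shape can be combined with `BiLock` is not something this sketch answers.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written approximation (assumption) of what `#[async_trait]` generates
/// for `read2`: the boxed future borrows both `self` and `buf` for `'a`.
trait ReadExDesugared: Send {
    fn read2<'a>(
        &'a mut self,
        buf: &'a mut [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;
}

/// Hypothetical in-memory reader, used only for this illustration.
struct MemReader {
    data: Vec<u8>,
    pos: usize,
}

impl ReadExDesugared for MemReader {
    fn read2<'a>(
        &'a mut self,
        buf: &'a mut [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>> {
        Box::pin(async move {
            // Copy as many bytes as fit into `buf` and advance the cursor.
            let n = buf.len().min(self.data.len() - self.pos);
            buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
            self.pos += n;
            Ok::<usize, io::Error>(n)
        })
    }
}

/// A future that owns everything it touches, so it can be `'static`.
type OwnedRead<T> =
    Pin<Box<dyn Future<Output = (T, Vec<u8>, io::Result<usize>)> + Send + 'static>>;

/// Moves the reader and an owned buffer into the future and returns both
/// together with the read result once the future completes.
fn read_owned<T: ReadExDesugared + 'static>(mut reader: T, mut buf: Vec<u8>) -> OwnedRead<T> {
    Box::pin(async move {
        let res = reader.read2(&mut buf).await;
        (reader, buf, res)
    })
}

/// Waker that does nothing; enough to poll a future by hand in this demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the owned future once and returns the byte count and the buffer.
fn demo() -> (usize, Vec<u8>) {
    let reader = MemReader { data: vec![1, 2, 3], pos: 0 };
    // This value could live in a struct field of type `Option<OwnedRead<T>>`.
    let mut fut: OwnedRead<MemReader> = read_owned(reader, vec![0u8; 3]);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready((_reader, buf, res)) => (res.unwrap(), buf),
        Poll::Pending => unreachable!("MemReader never returns Pending"),
    }
}
```

Calling `demo()` yields `(3, vec![1, 2, 3])`. The cost of this shape is an owned buffer and a copy back into the caller's slice, which is why I am not sure it is the right direction for ReadHalf.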
Hence this post asking for help; any advice would be much appreciated. Also, to compile the sample code above, Cargo.toml needs to specify:
[dependencies]
async-trait = "0.1"
futures = { version = "0.3", features = ["default", "bilock", "unstable"] }
Ext Link: https://github.com/kingwel-xie/libp2prs_split_problem