< 返回版块

kingwel-xie 发表于 2020-10-29 11:31

Tags:async, lifetime, i/o

请教一个问题:

在我们的libp2p-rs开源项目中,定义了ReadEx/WriteEx 取代 AsyncRead/AsyncWrite来简化 I/O 操作。也即使用async/await 来避免书写 poll_xxx。然而在实现一个类似于 AsyncReadExt 中的 split 方法时,遇到了困难。参考AsyncReadExt::split(),我们也用BiLock来将T 分离为 ReadHalf/WriteHalf, 然后为 ReadHalf 实现AsyncRead::poll_read。因为 T::read2() 是一个 async GenFuture,所以需要当返回 Pending 时将其装箱记下来,待再次被唤醒后继续轮询。这样的思路写出代码示例如下,但无法通过编译:

use std::io;

use futures::io::Cursor;
use futures::prelude::*;
use futures::lock::BiLock;
use futures::{AsyncReadExt, AsyncWriteExt};

use async_trait::async_trait;
use std::pin::Pin;
use std::task::{Context, Poll};


/// Read trait for use with async/await.
///
/// Lets a reader be written as an `async fn` instead of a hand-written `poll_*` method.
#[async_trait]
pub trait ReadEx: Send {
    /// Reads into `buf`, resolving to the number of bytes read or an I/O error.
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error>;
}

/// Write trait for use with async/await.
///
/// Lets a writer be written as an `async fn` instead of a hand-written `poll_*` method.
#[async_trait]
pub trait WriteEx: Send {
    /// Writes from `buf`, resolving to the number of bytes written or an I/O error.
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error>;
}

/// Blanket adapter: any `AsyncRead + Unpin + Send` type is a [`ReadEx`].
#[async_trait]
impl<T: AsyncRead + Unpin + Send> ReadEx for T {
    /// Forwards to `AsyncReadExt::read` and returns its result unchanged.
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        AsyncReadExt::read(self, buf).await
    }
}

/// Blanket adapter: any `AsyncWrite + Unpin + Send` type is a [`WriteEx`].
#[async_trait]
impl<T: AsyncWrite + Unpin + Send> WriteEx for T {
    /// Forwards to `AsyncWriteExt::write` and returns its result unchanged.
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        let written = AsyncWriteExt::write(self, buf).await?;
        Ok(written)
    }
}

// Problematic: how to mark ReadHalf with proper lifetime??
/// The readable half produced by `ReadExt2::split2`.
///
/// NOTE(review): `fut` is declared `'static`, yet `poll_read` assigns it a future built
/// from `t.read2(buf)`, where `t` comes from a lock guard local to that call and `buf` is
/// the caller's buffer. That mismatch is what the E0495 lifetime error reports.
pub struct ReadHalf<T> {
    // One side of the `BiLock` created in `split2`; the other side is held by `WriteHalf`.
    handle: BiLock<T>,
    // In-flight `read2` future, kept between polls while it is pending; `None` otherwise.
    fut: Option<Pin<Box<dyn Future<Output=io::Result<usize>> + Send + Unpin + 'static>>>,
}

impl<T> ReadHalf<T> {
    pub fn new(lock: BiLock<T>) -> Self {
        ReadHalf {
            handle: lock,
            fut: None,
        }
    }
}

/// Attempt to expose an async `ReadEx::read2` through the poll-based `AsyncRead` interface.
///
/// NOTE(review): this impl is the one that does not compile (E0495). The future created
/// from `t.read2(buf)` appears to borrow both the local lock guard and `buf`, so it cannot
/// satisfy the `'static` bound on `ReadHalf::fut`.
impl<T: ReadEx + Unpin> AsyncRead for ReadHalf<T> {
    /// Locks the shared value, then drives a boxed `read2` future, parking it in `self.fut`
    /// whenever it returns `Pending` so the next call can resume it.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        // Returns `Pending` early if the other half currently holds the lock.
        let mut lock = futures::ready!(self.handle.poll_lock(cx));
        let t = &mut *lock;
        // Only start a new read when no earlier one is still pending.
        if self.fut.is_none() {
            // NOTE(review): the future borrows `lock` (dropped when this call returns) and
            // `buf`; storing it past this call is what the compiler rejects.
            self.fut = Some(Box::pin(t.read2(buf)));
        }
        // Take the future out so it can be polled; it is put back only if still pending.
        let mut fut = self.fut.take().expect("must not be none");
        match fut.as_mut().poll(cx) {
            Poll::Pending => {
                self.fut = Some(fut);
                Poll::Pending
            },
            // `self.fut` stays `None` here, so the next call starts a fresh read.
            Poll::Ready(ret) => {
                Poll::Ready(ret)
            }
        }
    }
}


pub trait ReadExt2: ReadEx {
    fn split2(self) -> (ReadHalf<Self>, WriteHalf<Self>)
        where
            Self: Sized + WriteEx + Unpin,
    {
        let (a, b) = BiLock::new(self);
        (ReadHalf { handle: a, fut: None }, WriteHalf { handle: b })
    }
}

// Blanket impl: every `ReadEx` type, sized or not, gets `ReadExt2`.
impl<R> ReadExt2 for R where R: ReadEx + ?Sized {}

/// The writable half of an object returned from `ReadExt2::split2`.
pub struct WriteHalf<T> {
    // One side of the `BiLock` created in `split2`; the other side is held by `ReadHalf`.
    handle: BiLock<T>,
}

/// Placeholder `AsyncWrite` impl for the write half; every method is still a stub.
impl<W: WriteEx + Unpin> AsyncWrite for WriteHalf<W> {
    /// Not implemented yet: panics via `unimplemented!()`.
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        unimplemented!()
    }

    /// Not implemented yet: panics via `unimplemented!()`.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        unimplemented!()
    }

    /// Not implemented yet: panics via `unimplemented!()`.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        unimplemented!()
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Test fixture: an in-memory `Cursor` wrapped in a newtype that implements `ReadEx` and
/// `WriteEx` by hand.
struct Test(Cursor<Vec<u8>>);

#[async_trait]
impl ReadEx for Test {
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        self.0.read(buf).await
    }
}

#[async_trait]
impl WriteEx for Test {
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        self.0.write(buf).await
    }
}


/// Splits a `Test` stream and checks that its three bytes can be read back through the
/// read half.
fn main() {
    let scenario = async {
        let stream = Test(Cursor::new(vec![1, 2, 3]));
        let (mut read_half, _write_half) = stream.split2();
        let mut received = [0u8; 3];
        let count = read_half.read2(&mut received[..]).await.unwrap();

        assert_eq!(count, 3);
        assert_eq!(received, [1, 2, 3]);
    };
    futures::executor::block_on(scenario);
}

错误提示如下:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src\main.rs:59:40
   |
59 |         let mut lock = futures::ready!(self.handle.poll_lock(cx));
   |                                        ^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 58:5...
  --> src\main.rs:58:5
   |
58 | /     fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
59 | |         let mut lock = futures::ready!(self.handle.poll_lock(cx));
60 | |         let t = &mut *lock;
61 | |         if self.fut.is_none() {
...  |
73 | |         }
74 | |     }
   | |_____^
note: ...so that the reference type `&std::pin::Pin<&mut ReadHalf<T>>` does not outlive the data it points at
  --> src\main.rs:59:40
   |
59 |         let mut lock = futures::ready!(self.handle.poll_lock(cx));
   |                                        ^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
  --> src\main.rs:62:24
   |
62 |             self.fut = Some(Box::pin(t.read2(buf)));
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   = note: expected  `std::option::Option<std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send + std::marker::Unpin + 'static)>>>`
              found  `std::option::Option<std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send + std::marker::Unpin>>>`

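个人理解,编译器报错中的第 62 行(即上文的 `self.fut = Some(Box::pin(t.read2(buf)));`)要求 t.read2(buf) 返回的 future 满足 'static,而该 future 实际借用了 self(经由 lock)和 buf,两者的 lifetime 推导发生冲突。于是尝试在 ReadHalf.fut 上引入 lifetime 标识,将 'static 修改为 'a,一番折腾还是有问题。不过,总是觉得此问题应该有解,因为这里事实上 lifetime 都是 self 自身,但一直未能摸索出正确写法。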

故发帖求助,望各位大咖不吝赐教。另外,如需编译上述示例代码,需要在 Cargo.toml 中指定如下依赖:

[dependencies]
async-trait = "0.1"
futures = { version = "0.3", features = ["default", "bilock", "unstable"] }


Ext Link: https://github.com/kingwel-xie/libp2prs_split_problem

评论区

写评论

还没有评论

1 共 0 条评论, 1 页