< 返回版块

rascalrr 发表于 2023-11-30 14:01

Tags:tokio

遇到一个很奇怪的问题。 简化下需求:需要从大文件中读取数据,一次读取两行。

我写了两种解码方式:

  1. decode_two_onetime 每次检索两个换行符
  2. decode_two_stepbystep 每次检索一个换行符,第一次检索结果记录在decoder中。

第一种方式运行正常,可以顺利读取文件也符合需求。 第二种方式只能读取6行数据,然后就进入 decode_eof表示文件读到头了。

大家帮我看看哪里有问题?为什么第二种方式无法读完整个文件?

use std::time::Duration;

use anyhow::Result;
use tokio::fs::OpenOptions;
use tokio::io::BufReader;
use tokio_stream::StreamExt;
use tokio_util::bytes::BytesMut;
use tokio_util::codec::{Decoder, FramedRead};

#[tokio::main]
async fn main() -> Result<()> {
    let file = OpenOptions::new()
        .read(true)
        .open("e:/test_data.txt")
        .await?;
    let br = BufReader::new(file);
    let mut twoline_reader = FramedRead::new(br, TwoLineDecodec::default());

    loop {
        if let Some(two_line) = twoline_reader.next().await {
            match two_line {
                Ok(two_line) => {
                    println!("Line1:{}", two_line.line1);
                    println!("Line2:{}", two_line.line2);
                }
                Err(e) => println!("Error:{}", e),
            }
        } else {
            println!("Get None");            
            break;
        }
    }

    println!("读取完毕");
    Ok(())
}

pub struct TwoLine {
    pub line1: String,
    pub line2: String,
}

#[derive(Default)]
pub struct TwoLineDecodec {
    pub first_pos: Option<usize>,
}

impl TwoLineDecodec {
    pub fn reset(&mut self) {
        self.first_pos = None;
    }
}

impl Decoder for TwoLineDecodec {
    type Item = TwoLine;
    type Error = anyhow::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        self.decode_two_onetime(src)
        // self.decode_two_stepbystep(src)
    }
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        // 收到EOF,剩余的部分继续读取
        println!("File EOF");
        self.decode(buf)
    }
}

impl TwoLineDecodec {
    fn decode_two_onetime(&mut self, src: &mut BytesMut) -> Result<Option<TwoLine>, anyhow::Error> {
        if let Some(first) = src[..].iter().position(|b| *b == b'\n') {
            self.first_pos = Some(first);
            if let Some(second) = src[first + 1..].iter().position(|b| *b == b'\n') {
                println!("first:{} second:{}", first, second);
                let raw = src.split_to(first + second + 2);
                let twoline = std::str::from_utf8(&raw)?.trim();
                let (line1, line2) = twoline.split_at(first);

                let msg = TwoLine {
                    line1: line1.trim().to_owned(),
                    line2: line2.trim().to_owned(),
                };
                self.reset();
                return Ok(Some(msg));
            }
        }

        Ok(None)
    }
    fn decode_two_stepbystep(
        &mut self,
        src: &mut BytesMut,
    ) -> Result<Option<TwoLine>, anyhow::Error> {
        // println!("Data:{}", std::str::from_utf8(src)?);
        match self.first_pos {
            None => {
                self.first_pos = src[..].iter().position(|b| *b == b'\n');
            }
            Some(first) => {
                if let Some(second) = src[first + 1..].iter().position(|b| *b == b'\n') {
                    // println!("first:{} second:{}", first, second);
                    let raw = src.split_to(first + second + 2);
                    let twoline = std::str::from_utf8(&raw)?.trim();
                    let (line1, line2) = twoline.split_at(first);

                    let msg = TwoLine {
                        line1: line1.trim().to_owned(),
                        line2: line2.trim().to_owned(),
                    };
                    self.reset();
                    return Ok(Some(msg));
                }
            }
        }

        Ok(None)
    }
}

评论区

写评论
作者 rascalrr 2023-12-01 09:59

最后改好的版本,确认是有效的。

    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        loop {            
            if buf.is_empty() {
                break;
            }

            match self.decode(buf) {
                Ok(Some(result)) => {
                    return Ok(Some(result));
                }
                Ok(None) => {

                }
                Err(_) => {
                    break;
                }
            }
        }
        Ok(None)
    }

作者 rascalrr 2023-11-30 18:17

In these cases decode_eof will be called until it signals fulfillment of all closing frames by returning Ok(None). After that, repeated attempts to read from the Framed or FramedRead will not invoke decode or decode_eof again, until data can be read during a retry.

It is up to the Decoder to keep track of a restart after an EOF, and to decide how to handle such an event by, for example, allowing frames to cross EOF boundaries, re-emitting opening frames, or resetting the entire internal state.

大概理解了一下, 读socket,只要连接不断就不会进入 decode_eof,所以在decode中返回Ok(None)不会导致数据没读完就退出。 读文件只要读取完毕,则会主动进入decode_eof,我在decode_eof中调用了decode返回了None,导致未完全读取结束就退出了。

作者 rascalrr 2023-11-30 17:50

好像有点明白了,这和FramedRead读取socket数据不一样呀。读取Socket数据时,即使返回Ok(None),下一次依然可以继续读取数据。 那么如果进入decode_eof 的话,有没有办法让状态机恢复 Framing-Reading状态呢?

--
👇
苦瓜小仔: 对于 decode_two_stepbystep,第一次调用 decode 时,返回 Ok(None),轮询 Future 时,Framed 状态机 从 Framing-Reading 进入 Reading-Pausing,然后调用 decode_eof。

而对于 decode_two_onetime,每次调用 decode 总是返回 Ok(Some(...)),Framed 状态机处于 Framing-Reading,从而一直调用 decode;当 decode 返回 Ok(None) 后,轮询 Future 时,进入下一个状态去调用 decode_eof。

苦瓜小仔 2023-11-30 17:02

对于 decode_two_stepbystep,第一次调用 decode 时,返回 Ok(None),轮询 Future 时,Framed 状态机 从 Framing-Reading 进入 Reading-Pausing,然后调用 decode_eof。

而对于 decode_two_onetime,每次调用 decode 总是返回 Ok(Some(...)),Framed 状态机处于 Framing-Reading,从而一直调用 decode;当 decode 返回 Ok(None) 后,轮询 Future 时,进入下一个状态去调用 decode_eof。

bestgopher 2023-11-30 16:52

第二种获取first的时候返回Ok(None)了

作者 rascalrr 2023-11-30 16:01

因为这只是一个说明问题例子, lines当然可以,即便用例子里的decode_two_onetime也不是不行。 但我需要知道为什么decode_two_stepbystep这样写不行。

--
👇
PrivateRookie: 为什么要这么麻烦, 用 lines方法获取行的迭代器, 然后手动遍历下就好了吧, 用std的例子演示差不多是

use std::io::{BufRead, BufReader};
let fd = std::fs::File::open("demo").unwrap();
let r = BufReader::new(fd);
let mut lines = r.lines();
match (lines.next(), lines.next()) {
    (None, None) => {}
    (None, Some(_)) => {
             unreachable!()
    }
    (Some(l1), None) => {
        println!("最后奇数行 {}", l1.unwrap());
     }
     (Some(l1), Some(l2)) => {
          println!("pair l1 {} l2 {}", l1.unwrap(), l2.unwrap());
     }
}
PrivateRookie 2023-11-30 15:46

为什么要这么麻烦, 用 lines方法获取行的迭代器, 然后手动遍历下就好了吧, 用std的例子演示差不多是

use std::io::{BufRead, BufReader};
let fd = std::fs::File::open("demo").unwrap();
let r = BufReader::new(fd);
let mut lines = r.lines();
match (lines.next(), lines.next()) {
    (None, None) => {}
    (None, Some(_)) => {
             unreachable!()
    }
    (Some(l1), None) => {
        println!("最后奇数行 {}", l1.unwrap());
     }
     (Some(l1), Some(l2)) => {
          println!("pair l1 {} l2 {}", l1.unwrap(), l2.unwrap());
     }
}
1 共 7 条评论, 1 页