< 返回版块

槟橙炮炮 发表于 2021-02-13 02:41

Tags:async,task,科学上网

缘起: delay-timer v0.3.0 release,想去reddit上发布一下,但是遇到了梯子不可用的情况。 咨询客服无果(人倒是很好),因为平时用openvn上梯子,遂把他们的 ovn 的一万台服务器配置下载下来,自己筛选可连用配置。

顺手用tokio写了一个工具, 筛选可用配置。

里面有一些,自定义Future状态机设计的思路,跟大家分享一下。

首先拆分需求: 高速的在成千上万的配置中找到可用的那部分, 也就是说能够跟配置上声明的多个服务器建立通信。

因为多是IO请求,借async之力发送并发请求,采用:tokio

需要一个命令行入口,供用户输入并让输出自定义的信息,为了满足这部分采用:structopt

在运行过程中,为了让用户有更好的体验,透明化当前的进度,采用:progress_bar

效果图

coding:

Udp-check部分建立通信,尝试与服务端读写, 根据实际情况返回Result。

pub(crate) async fn verify_addres_by_udp(address: impl ToSocketAddrs + Debug) -> Result<usize> {
    // change to 0.0.0.0:0 is good.
    // bind() the socket before you send your data. Specify port 0 to bind(),
    // and the OS will pick an unused port for you.

    let sock = UdpSocket::bind("0.0.0.0:0".parse::<SocketAddr>().unwrap()).await?;
    sock.set_ttl(15)?;

    sock.connect(address).await.unwrap_or(());

    sock.send(b"hello world").await?;
    let mut buf = vec![0; 10];
    timeout(Duration::from_secs(15), sock.recv(&mut buf)).await?
}

Tcp部分建立通信,尝试对服务端写, 根据实际情况返回Result。

pub(crate) async fn verify_addres_by_tcp(address: String) -> Result<()> {
    let sock = TcpSocket::new_v4()?;

    let stream = sock.connect(address.parse().unwrap()).await?;

    timeout(Duration::from_secs(15), stream.writable()).await?
}

切分ovpn配置文件,获取 IP 端口 协议等元数据

pub(crate) async fn verify_one_conf(
    file_name: OsString,
    _concurrency_limit: ConcurrencyLimits,
) -> Option<OsString> {
    let f = File::open(file_name.clone()).await.unwrap();

    let buf_reader = BufReader::new(f);
    let mut lines = buf_reader.lines();
    let mut state: i32 = 0;
    let mut through_file_name: Option<OsString> = None;
    let mut proto = Proto::Tcp;
    let mut address = String::new();

    while let Some(line) = lines.next_line().await.unwrap_or_else(|e| {
        dbg!(e);
        None
    }) {
        if line.contains("proto") {
            state += 1;
            if line.trim_start_matches("proto").trim() == "udp" {
                proto = Proto::Udp;
            }
        }

        if line.contains("remote") {
            state += 1;
            let remote = line.trim_start_matches("remote").trim();
            //split
            address = remote.splitn(2, " ").collect::<Vec<&str>>().join(":");
        }

        if state == 2 {
            match proto {
                Proto::Udp => {
                    if verify_addres_by_udp(address).await.is_ok() {
                        through_file_name = Some(file_name);
                    }
                }

                Proto::Tcp => {
                    if verify_addres_by_tcp(address).await.is_ok() {
                        through_file_name = Some(file_name);
                    }
                }
            }

            return through_file_name;
        }
    }

    through_file_name
}

回到main.rs,定义命令行输入输出的定义


/// Structual of verify-ovpn.
#[derive(StructOpt, Debug)]
#[structopt(name = "verify-ovpn")]
struct Opt {
    /// Input path (Absolute path)
    #[structopt(short, long, parse(from_os_str))]
    input: PathBuf,

    /// Output path (Absolute path)
    #[structopt(short, long, parse(from_os_str))]
    output: PathBuf,
}

main的实际跑的是一个 Future, 它控制全局的任务生产消费。


fn main() -> Result<()> {
    let opt: Opt = Opt::from_args();
    // Changing the program's working directory eliminates the need to specify an absolute path in front of each file.
    set_current_dir(opt.input.clone())?;

    let tr = Runtime::new()?;

    // when async block can't infer returen-type change it to async-fn so you can explict return-type.
    tr.block_on(verify_ovpns(opt.input, opt.output))
}

预备基本的数据,准备执行实际的任务分配。

async fn verify_ovpns(input: PathBuf, output: PathBuf) -> Result<()> {
    // prepare data-source.
    let mut entry_buf = Vec::<DirEntry>::with_capacity(2048);
    let output_file = File::create(output).await?;

    let mut entries = read_dir(input).await?;

    while let Some(entry) = entries.next_entry().await? {
        entry_buf.push(entry);
    }

    // set Process bar.
    let mut progress_bar = ProgressBar::new(entry_buf.len() * 2);
    progress_bar.set_action("Loading", Color::Blue, Style::Bold);
    progress_bar.print_info("Hint", "The default is to check 1000 ovpn at a time concurrently.", Color::Green, Style::Bold);
   
    do_verify(entry_buf, output_file, progress_bar).await
}


实际的任务分配,与progress_bar的实时更新。 因为操作系统限制了打开句柄的打开数量(默认是1000), 我这里自己封装了 一个异步的Future-like数据结构,控制任务的启动数量。 名为:ConcurrencyLimits (下面会写如何设计)。


async fn do_verify(
    entry_buf: Vec<DirEntry>,
    mut output_file: File,
    mut progress_bar: ProgressBar,
) -> Result<()> {
    let process_num;
    let mut success_num = 0;
    let concurrency_limit = ConcurrencyLimits::default();
    let mut join_handle_vec = Vec::new();

    for file_entry in entry_buf.into_iter() {
        progress_bar.inc();
        concurrency_limit.clone().await;

        let file_name = file_entry.file_name();
        join_handle_vec.push(spawn(verify_one_conf(file_name, concurrency_limit.clone())));
    }

    process_num = join_handle_vec.len();

    for j in join_handle_vec.into_iter() {
        progress_bar.inc();
        if let Ok(o_s) = j.await {
            if let Some(s) = o_s {
                if let Some(mut file_name) = s.into_string().ok() {
                    file_name += LINE_ENDING;

                    success_num += 1;
                    output_file.write(dbg!(file_name).as_bytes()).await?;
                }
            }
        }
    }

    output_file.sync_all().await?;

    let done_text = format!(
        "Total {}, successfully connected {}",
        process_num, success_num
    );

    progress_bar.print_final_info("Done", &done_text, Color::LightGreen, Style::Bold);

    Ok(())
}

ConcurrencyLimits 设计思路:

对于并发任务数量的并发,我们可以从入口出发做控制。

需要我们封装状态机:

通过Future 异步的抽象,构造一个异步工作的数据结构。

关联的状态有:

当前的并发数 暂存的上下文,唤醒上游继续生产新任务

ps:cx(Context)

第一步想到的是用,(arc: (计数状态:Atomic) + (cx原语: buf)) 结构处理 因为,需要同时保证多个线程之间的计数同步,与状态共享。

后反思到, arc自身就带有引用计数, 多来一个atomic似乎显得多余,于是去除掉。 Arc<Buf>.

共享后cx后, 需要任务完成时自动唤醒上游,继续spawn任务,为了代码的简洁精炼把这部分放到了 Drop中 。 这样我们只需要在生产任务时,追加一个 concurrency_limit 结构就会自动控制任务数量。

main-future 上 执行concurrency_limit.await 就可以根据当前的任务量自动控制 暂停/继续了。

同时可以确定Buf的类型,采用Mutex, 因为可能发生多个线程同时操作cx的情况。


#[derive(Debug, Clone, Default)]
pub(crate) struct ConcurrencyLimits {
    inner: Arc<Mutex<Option<Waker>>>,
}

impl Future for ConcurrencyLimits {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let arc_mutext = &self.as_ref().get_ref().inner;

        let mut p = Poll::Ready(());
        if Arc::strong_count(arc_mutext) > 999 {
            p = Poll::Pending;

            let mut guard = arc_mutext.lock().unwrap();
            *guard = Some(cx.waker().clone());
        }
        p
    }
}

impl Drop for ConcurrencyLimits {
    fn drop(&mut self) {
        let arc_mutext = &self.inner;

        let mut guard = arc_mutext.lock().unwrap();
        if let Some(wk) = guard.take() {
            wk.wake();
        }
    }
}

repo: verify-ov*n

repo-raw-url: https://github.com/BinChengZhao/verify-ovpn

其实还有很多优化空间,各位可以随时反馈 :) . 完。

评论区

写评论
作者 槟橙炮炮 2021-02-13 08:34

嗯,谢谢提醒哈, 这里只是设计一个检验配置的工具,咱不随意用v*n,响应国家号召!

--
👇
E834159672: 据说,在中国国内,擅自使用未经许可的VPN联网,是违法行为,而哪些VPN是被许可的,有没有很即时的验证方法,所以使用未知人员搭建的VPN≈违法。

相关规定:

《中华人民共和国计算机信息网络国际联网暂行规定》

第六条和第十四条的规定:

计算机信息网络直接进行国际联网,必须使用邮电部国家公用电信网络提供的国际出入口信道,任何单位和个人都不得使用其他信道进行国际联网。

违反者由公安机关责令停止联网,给予警告,可以并处15000元以下的罚款。

E834159672 2021-02-13 08:27

据说,在中国国内,擅自使用未经许可的VPN联网,是违法行为,而哪些VPN是被许可的,有没有很即时的验证方法,所以使用未知人员搭建的VPN≈违法。

相关规定:

《中华人民共和国计算机信息网络国际联网暂行规定》

第六条和第十四条的规定:

计算机信息网络直接进行国际联网,必须使用邮电部国家公用电信网络提供的国际出入口信道,任何单位和个人都不得使用其他信道进行国际联网。

违反者由公安机关责令停止联网,给予警告,可以并处15000元以下的罚款。

1 共 2 条评论, 1 页