缘起: delay-timer v0.3.0 release,想去reddit上发布一下,但是遇到了梯子不可用的情况。 咨询客服无果(人倒是很好),因为平时用openvn上梯子,遂把他们的 ovn 的一万台服务器配置下载下来,自己筛选可连用配置。
顺手用tokio写了一个工具, 筛选可用配置。
里面有一些,自定义Future状态机设计的思路,跟大家分享一下。
首先拆分需求: 高速的在成千上万的配置中找到可用的那部分, 也就是说能够跟配置上声明的多个服务器建立通信。
因为多是IO请求,借async之力发送并发请求,采用:tokio
需要一个命令行入口,供用户输入并让输出自定义的信息,为了满足这部分采用:structopt
在运行过程中,为了让用户有更好的体验,透明化当前的进度,采用:progress_bar

coding:
Udp-check部分建立通信,尝试与服务端读写, 根据实际情况返回Result。
pub(crate) async fn verify_addres_by_udp(address: impl ToSocketAddrs + Debug) -> Result<usize> {
    // change to 0.0.0.0:0 is good.
    // bind() the socket before you send your data. Specify port 0 to bind(),
    // and the OS will pick an unused port for you.
    let sock = UdpSocket::bind("0.0.0.0:0".parse::<SocketAddr>().unwrap()).await?;
    sock.set_ttl(15)?;
    sock.connect(address).await.unwrap_or(());
    sock.send(b"hello world").await?;
    let mut buf = vec![0; 10];
    timeout(Duration::from_secs(15), sock.recv(&mut buf)).await?
}
Tcp部分建立通信,尝试对服务端写, 根据实际情况返回Result。
pub(crate) async fn verify_addres_by_tcp(address: String) -> Result<()> {
    let sock = TcpSocket::new_v4()?;
    let stream = sock.connect(address.parse().unwrap()).await?;
    timeout(Duration::from_secs(15), stream.writable()).await?
}
切分ovpn配置文件,获取 IP 端口 协议等元数据
pub(crate) async fn verify_one_conf(
    file_name: OsString,
    _concurrency_limit: ConcurrencyLimits,
) -> Option<OsString> {
    let f = File::open(file_name.clone()).await.unwrap();
    let buf_reader = BufReader::new(f);
    let mut lines = buf_reader.lines();
    let mut state: i32 = 0;
    let mut through_file_name: Option<OsString> = None;
    let mut proto = Proto::Tcp;
    let mut address = String::new();
    while let Some(line) = lines.next_line().await.unwrap_or_else(|e| {
        dbg!(e);
        None
    }) {
        if line.contains("proto") {
            state += 1;
            if line.trim_start_matches("proto").trim() == "udp" {
                proto = Proto::Udp;
            }
        }
        if line.contains("remote") {
            state += 1;
            let remote = line.trim_start_matches("remote").trim();
            //split
            address = remote.splitn(2, " ").collect::<Vec<&str>>().join(":");
        }
        if state == 2 {
            match proto {
                Proto::Udp => {
                    if verify_addres_by_udp(address).await.is_ok() {
                        through_file_name = Some(file_name);
                    }
                }
                Proto::Tcp => {
                    if verify_addres_by_tcp(address).await.is_ok() {
                        through_file_name = Some(file_name);
                    }
                }
            }
            return through_file_name;
        }
    }
    through_file_name
}
回到main.rs,定义命令行输入输出的定义
/// Structual of verify-ovpn.
#[derive(StructOpt, Debug)]
#[structopt(name = "verify-ovpn")]
struct Opt {
    /// Input path (Absolute path)
    #[structopt(short, long, parse(from_os_str))]
    input: PathBuf,
    /// Output path (Absolute path)
    #[structopt(short, long, parse(from_os_str))]
    output: PathBuf,
}
main的实际跑的是一个 Future, 它控制全局的任务生产消费。
fn main() -> Result<()> {
    let opt: Opt = Opt::from_args();
    // Changing the program's working directory eliminates the need to specify an absolute path in front of each file.
    set_current_dir(opt.input.clone())?;
    let tr = Runtime::new()?;
    // when async block can't infer returen-type change it to async-fn so you can explict return-type.
    tr.block_on(verify_ovpns(opt.input, opt.output))
}
预备基本的数据,准备执行实际的任务分配。
async fn verify_ovpns(input: PathBuf, output: PathBuf) -> Result<()> {
    // prepare data-source.
    let mut entry_buf = Vec::<DirEntry>::with_capacity(2048);
    let output_file = File::create(output).await?;
    let mut entries = read_dir(input).await?;
    while let Some(entry) = entries.next_entry().await? {
        entry_buf.push(entry);
    }
    // set Process bar.
    let mut progress_bar = ProgressBar::new(entry_buf.len() * 2);
    progress_bar.set_action("Loading", Color::Blue, Style::Bold);
    progress_bar.print_info("Hint", "The default is to check 1000 ovpn at a time concurrently.", Color::Green, Style::Bold);
   
    do_verify(entry_buf, output_file, progress_bar).await
}
实际的任务分配,与progress_bar的实时更新。
因为操作系统限制了打开句柄的打开数量(默认是1000), 我这里自己封装了 一个异步的Future-like数据结构,控制任务的启动数量。
名为:ConcurrencyLimits (下面会写如何设计)。
async fn do_verify(
    entry_buf: Vec<DirEntry>,
    mut output_file: File,
    mut progress_bar: ProgressBar,
) -> Result<()> {
    let process_num;
    let mut success_num = 0;
    let concurrency_limit = ConcurrencyLimits::default();
    let mut join_handle_vec = Vec::new();
    for file_entry in entry_buf.into_iter() {
        progress_bar.inc();
        concurrency_limit.clone().await;
        let file_name = file_entry.file_name();
        join_handle_vec.push(spawn(verify_one_conf(file_name, concurrency_limit.clone())));
    }
    process_num = join_handle_vec.len();
    for j in join_handle_vec.into_iter() {
        progress_bar.inc();
        if let Ok(o_s) = j.await {
            if let Some(s) = o_s {
                if let Some(mut file_name) = s.into_string().ok() {
                    file_name += LINE_ENDING;
                    success_num += 1;
                    output_file.write(dbg!(file_name).as_bytes()).await?;
                }
            }
        }
    }
    output_file.sync_all().await?;
    let done_text = format!(
        "Total {}, successfully connected {}",
        process_num, success_num
    );
    progress_bar.print_final_info("Done", &done_text, Color::LightGreen, Style::Bold);
    Ok(())
}
ConcurrencyLimits 设计思路:
对于并发任务数量的并发,我们可以从入口出发做控制。
需要我们封装状态机:
通过Future 异步的抽象,构造一个异步工作的数据结构。
关联的状态有:
当前的并发数 暂存的上下文,唤醒上游继续生产新任务
ps:cx(Context)
第一步想到的是用,(arc: (计数状态:Atomic) + (cx原语: buf)) 结构处理 因为,需要同时保证多个线程之间的计数同步,与状态共享。
后反思到, arc自身就带有引用计数, 多来一个atomic似乎显得多余,于是去除掉。 Arc<Buf>.
共享后cx后, 需要任务完成时自动唤醒上游,继续spawn任务,为了代码的简洁精炼把这部分放到了 Drop中 。
这样我们只需要在生产任务时,追加一个 concurrency_limit 结构就会自动控制任务数量。
main-future 上 执行concurrency_limit.await 就可以根据当前的任务量自动控制 暂停/继续了。
同时可以确定Buf的类型,采用Mutex, 因为可能发生多个线程同时操作cx的情况。
#[derive(Debug, Clone, Default)]
pub(crate) struct ConcurrencyLimits {
    inner: Arc<Mutex<Option<Waker>>>,
}
impl Future for ConcurrencyLimits {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let arc_mutext = &self.as_ref().get_ref().inner;
        let mut p = Poll::Ready(());
        if Arc::strong_count(arc_mutext) > 999 {
            p = Poll::Pending;
            let mut guard = arc_mutext.lock().unwrap();
            *guard = Some(cx.waker().clone());
        }
        p
    }
}
impl Drop for ConcurrencyLimits {
    fn drop(&mut self) {
        let arc_mutext = &self.inner;
        let mut guard = arc_mutext.lock().unwrap();
        if let Some(wk) = guard.take() {
            wk.wake();
        }
    }
}
repo: verify-ov*n
repo-raw-url: https://github.com/BinChengZhao/verify-ovpn
其实还有很多优化空间,各位可以随时反馈 :) . 完。
评论区
写评论嗯,谢谢提醒哈, 这里只是设计一个检验配置的工具,咱不随意用v*n,响应国家号召!
--
👇
E834159672: 据说,在中国国内,擅自使用未经许可的VPN联网,是违法行为,而哪些VPN是被许可的,有没有很即时的验证方法,所以使用未知人员搭建的VPN≈违法。
相关规定:
《中华人民共和国计算机信息网络国际联网暂行规定》
第六条和第十四条的规定:
计算机信息网络直接进行国际联网,必须使用邮电部国家公用电信网络提供的国际出入口信道,任何单位和个人都不得使用其他信道进行国际联网。
违反者由公安机关责令停止联网,给予警告,可以并处15000元以下的罚款。
据说,在中国国内,擅自使用未经许可的VPN联网,是违法行为,而哪些VPN是被许可的,有没有很即时的验证方法,所以使用未知人员搭建的VPN≈违法。
相关规定:
《中华人民共和国计算机信息网络国际联网暂行规定》
第六条和第十四条的规定:
计算机信息网络直接进行国际联网,必须使用邮电部国家公用电信网络提供的国际出入口信道,任何单位和个人都不得使用其他信道进行国际联网。
违反者由公安机关责令停止联网,给予警告,可以并处15000元以下的罚款。