缘起: delay-timer v0.3.0 release,想去reddit上发布一下,但是遇到了梯子不可用的情况。 咨询客服无果(人倒是很好),因为平时用openvn上梯子,遂把他们的 ovn 的一万台服务器配置下载下来,自己筛选可连用配置。
顺手用tokio写了一个工具, 筛选可用配置。
里面有一些,自定义Future状态机设计的思路,跟大家分享一下。
首先拆分需求: 高速的在成千上万的配置中找到可用的那部分, 也就是说能够跟配置上声明的多个服务器建立通信。
因为多是IO请求,借async之力发送并发请求,采用:tokio
需要一个命令行入口,供用户输入并让输出自定义的信息,为了满足这部分采用:structopt
在运行过程中,为了让用户有更好的体验,透明化当前的进度,采用:progress_bar
coding:
Udp-check部分建立通信,尝试与服务端读写, 根据实际情况返回Result。
pub(crate) async fn verify_addres_by_udp(address: impl ToSocketAddrs + Debug) -> Result<usize> {
// change to 0.0.0.0:0 is good.
// bind() the socket before you send your data. Specify port 0 to bind(),
// and the OS will pick an unused port for you.
let sock = UdpSocket::bind("0.0.0.0:0".parse::<SocketAddr>().unwrap()).await?;
sock.set_ttl(15)?;
sock.connect(address).await.unwrap_or(());
sock.send(b"hello world").await?;
let mut buf = vec![0; 10];
timeout(Duration::from_secs(15), sock.recv(&mut buf)).await?
}
Tcp部分建立通信,尝试对服务端写, 根据实际情况返回Result。
pub(crate) async fn verify_addres_by_tcp(address: String) -> Result<()> {
let sock = TcpSocket::new_v4()?;
let stream = sock.connect(address.parse().unwrap()).await?;
timeout(Duration::from_secs(15), stream.writable()).await?
}
切分ovpn配置文件,获取 IP 端口 协议等元数据
pub(crate) async fn verify_one_conf(
file_name: OsString,
_concurrency_limit: ConcurrencyLimits,
) -> Option<OsString> {
let f = File::open(file_name.clone()).await.unwrap();
let buf_reader = BufReader::new(f);
let mut lines = buf_reader.lines();
let mut state: i32 = 0;
let mut through_file_name: Option<OsString> = None;
let mut proto = Proto::Tcp;
let mut address = String::new();
while let Some(line) = lines.next_line().await.unwrap_or_else(|e| {
dbg!(e);
None
}) {
if line.contains("proto") {
state += 1;
if line.trim_start_matches("proto").trim() == "udp" {
proto = Proto::Udp;
}
}
if line.contains("remote") {
state += 1;
let remote = line.trim_start_matches("remote").trim();
//split
address = remote.splitn(2, " ").collect::<Vec<&str>>().join(":");
}
if state == 2 {
match proto {
Proto::Udp => {
if verify_addres_by_udp(address).await.is_ok() {
through_file_name = Some(file_name);
}
}
Proto::Tcp => {
if verify_addres_by_tcp(address).await.is_ok() {
through_file_name = Some(file_name);
}
}
}
return through_file_name;
}
}
through_file_name
}
回到main.rs,定义命令行输入输出的定义
/// Structual of verify-ovpn.
#[derive(StructOpt, Debug)]
#[structopt(name = "verify-ovpn")]
struct Opt {
/// Input path (Absolute path)
#[structopt(short, long, parse(from_os_str))]
input: PathBuf,
/// Output path (Absolute path)
#[structopt(short, long, parse(from_os_str))]
output: PathBuf,
}
main的实际跑的是一个 Future, 它控制全局的任务生产消费。
fn main() -> Result<()> {
let opt: Opt = Opt::from_args();
// Changing the program's working directory eliminates the need to specify an absolute path in front of each file.
set_current_dir(opt.input.clone())?;
let tr = Runtime::new()?;
// when async block can't infer returen-type change it to async-fn so you can explict return-type.
tr.block_on(verify_ovpns(opt.input, opt.output))
}
预备基本的数据,准备执行实际的任务分配。
async fn verify_ovpns(input: PathBuf, output: PathBuf) -> Result<()> {
// prepare data-source.
let mut entry_buf = Vec::<DirEntry>::with_capacity(2048);
let output_file = File::create(output).await?;
let mut entries = read_dir(input).await?;
while let Some(entry) = entries.next_entry().await? {
entry_buf.push(entry);
}
// set Process bar.
let mut progress_bar = ProgressBar::new(entry_buf.len() * 2);
progress_bar.set_action("Loading", Color::Blue, Style::Bold);
progress_bar.print_info("Hint", "The default is to check 1000 ovpn at a time concurrently.", Color::Green, Style::Bold);
do_verify(entry_buf, output_file, progress_bar).await
}
实际的任务分配,与progress_bar的实时更新。
因为操作系统限制了打开句柄的打开数量(默认是1000), 我这里自己封装了 一个异步的Future-like数据结构,控制任务的启动数量。
名为:ConcurrencyLimits
(下面会写如何设计)。
async fn do_verify(
entry_buf: Vec<DirEntry>,
mut output_file: File,
mut progress_bar: ProgressBar,
) -> Result<()> {
let process_num;
let mut success_num = 0;
let concurrency_limit = ConcurrencyLimits::default();
let mut join_handle_vec = Vec::new();
for file_entry in entry_buf.into_iter() {
progress_bar.inc();
concurrency_limit.clone().await;
let file_name = file_entry.file_name();
join_handle_vec.push(spawn(verify_one_conf(file_name, concurrency_limit.clone())));
}
process_num = join_handle_vec.len();
for j in join_handle_vec.into_iter() {
progress_bar.inc();
if let Ok(o_s) = j.await {
if let Some(s) = o_s {
if let Some(mut file_name) = s.into_string().ok() {
file_name += LINE_ENDING;
success_num += 1;
output_file.write(dbg!(file_name).as_bytes()).await?;
}
}
}
}
output_file.sync_all().await?;
let done_text = format!(
"Total {}, successfully connected {}",
process_num, success_num
);
progress_bar.print_final_info("Done", &done_text, Color::LightGreen, Style::Bold);
Ok(())
}
ConcurrencyLimits
设计思路:
对于并发任务数量的并发,我们可以从入口出发做控制。
需要我们封装状态机:
通过Future 异步的抽象,构造一个异步工作的数据结构。
关联的状态有:
当前的并发数 暂存的上下文,唤醒上游继续生产新任务
ps:cx(Context)
第一步想到的是用,(arc: (计数状态:Atomic) + (cx原语: buf)) 结构处理 因为,需要同时保证多个线程之间的计数同步,与状态共享。
后反思到, arc自身就带有引用计数, 多来一个atomic似乎显得多余,于是去除掉。 Arc<Buf>.
共享后cx后, 需要任务完成时自动唤醒上游,继续spawn任务,为了代码的简洁精炼把这部分放到了 Drop
中 。
这样我们只需要在生产任务时,追加一个 concurrency_limit
结构就会自动控制任务数量。
main-future 上 执行concurrency_limit.await
就可以根据当前的任务量自动控制 暂停/继续了。
同时可以确定Buf的类型,采用Mutex, 因为可能发生多个线程同时操作cx的情况。
#[derive(Debug, Clone, Default)]
pub(crate) struct ConcurrencyLimits {
inner: Arc<Mutex<Option<Waker>>>,
}
impl Future for ConcurrencyLimits {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let arc_mutext = &self.as_ref().get_ref().inner;
let mut p = Poll::Ready(());
if Arc::strong_count(arc_mutext) > 999 {
p = Poll::Pending;
let mut guard = arc_mutext.lock().unwrap();
*guard = Some(cx.waker().clone());
}
p
}
}
impl Drop for ConcurrencyLimits {
fn drop(&mut self) {
let arc_mutext = &self.inner;
let mut guard = arc_mutext.lock().unwrap();
if let Some(wk) = guard.take() {
wk.wake();
}
}
}
repo: verify-ov*n
repo-raw-url: https://github.com/BinChengZhao/verify-ovpn
其实还有很多优化空间,各位可以随时反馈 :) . 完。
评论区
写评论嗯,谢谢提醒哈, 这里只是设计一个检验配置的工具,咱不随意用v*n,响应国家号召!
--
👇
E834159672: 据说,在中国国内,擅自使用未经许可的VPN联网,是违法行为,而哪些VPN是被许可的,有没有很即时的验证方法,所以使用未知人员搭建的VPN≈违法。
相关规定:
《中华人民共和国计算机信息网络国际联网暂行规定》
第六条和第十四条的规定:
计算机信息网络直接进行国际联网,必须使用邮电部国家公用电信网络提供的国际出入口信道,任何单位和个人都不得使用其他信道进行国际联网。
违反者由公安机关责令停止联网,给予警告,可以并处15000元以下的罚款。
据说,在中国国内,擅自使用未经许可的VPN联网,是违法行为,而哪些VPN是被许可的,有没有很即时的验证方法,所以使用未知人员搭建的VPN≈违法。
相关规定:
《中华人民共和国计算机信息网络国际联网暂行规定》
第六条和第十四条的规定:
计算机信息网络直接进行国际联网,必须使用邮电部国家公用电信网络提供的国际出入口信道,任何单位和个人都不得使用其他信道进行国际联网。
违反者由公安机关责令停止联网,给予警告,可以并处15000元以下的罚款。