krpc-rust
一个最像RPC框架的Rust-RPC框架
krpc-rust是一个高性能,轻量级的rpc框架,通过使用Rust宏来解决目前主流rpc框架使用复杂,性能低等问题,不需要通过脚本和脚手架生成rpc调用代码,通过宏来进行编译期"反射"来实现高性能的调用,来满足rpc调用的简易性,同时支持服务的注册发现和断线重连等。
上篇文章我们对比了目前主流rust-rpc框架所存在的问题,比如说必须通过脚本或者脚手架先生成代码才能使用,并且也提出了使用Rust宏来实现一个更高效优雅的方案,并且实现了一个rpc框架,并进行了压力测试,事实证明了Rust宏的强大之处,本次我们继续讨论一个rpc框架还需要有那些优化。
RPC(Remote Procedure Call)框架的定义是:一种实现RPC协议的软件框架,它提供了一套工具和库,用于简化远程调用的过程。RPC框架的主要目标是隐藏底层网络通信的细节,使开发人员能够像调用本地函数一样调用远程函数,提供了一种透明的方式来实现跨网络的函数调用,使得分布式系统的开发更加简单和高效。
如果只解读这个定义的话,那实际上我们之前已经实现了一个完善的rpc框架,但其实现代定义下的rpc框架不只是简化调用这么简单,而是应该是一个具有扩展性,多组件并且具有服务注册和发现等一系列服务治理能力的框架,实际上就是我们所说的微服务框架,那么今天我们就来实现其中重要的一环,服务注册与发现。
我们直接看如果让我们的rpc框架具有服务注册与发现的能力。
快速开始
https://github.com/kwsc98/krpc-rust
Server
#[derive(Serialize, Deserialize, Default, Debug)]
struct ReqDto {
str: String,
}
#[derive(Serialize, Deserialize, Default)]
struct ResDto {
str: String,
}
#[derive(Clone)]
struct TestServer {
_db: String,
}
//通过宏声明Server
krpc_server! {
TestServer,
//定义版本号
"1.0.0",
//实现rpc接口(错误响应)
async fn do_run1(&self,res : ReqDto) -> Result<ResDto> {
println!("{:?}" ,res);
return Err("错误".to_string());
}
//实现rpc接口(正常响应)
async fn do_run2(&self,res : ReqDto) -> Result<ResDto> {
println!("{:?}" ,res);
return Ok(ResDto { str : "TestServer say hello 1".to_string()});
}
}
#[tokio::main(worker_threads = 512)]
async fn main() {
//实例化Server
let server: TestServer = TestServer {
_db: "我是一个DB数据库".to_string(),
};
//启动rpc服务
KrpcServer::build(
//设置注册中心配置(地址,工作空间,注册中心类型)
RegisterBuilder::new(
&format!("127.0.0.1:{}", "2181"),
"default",
RegisterType::ZooKeeper,
),
//设置服务端口
"8081",
)
//注册服务
.add_rpc_server(Box::new(server))
.run()
.await;
}
Client
//初始化RPC-Client
lazy_static! {
static ref CLI: KrpcClient = KrpcClient::build(
//设置注册中心配置(地址,工作空间,注册中心类型)
RegisterBuilder::new(
&format!("127.0.0.1:{}", "2181"),
"default",
RegisterType::ZooKeeper,
)
);
}
#[derive(Serialize, Deserialize, Default, Debug)]
struct ReqDto {
str: String,
}
#[derive(Serialize, Deserialize, Default,Debug)]
struct ResDto {
str: String,
}
struct TestServer;
//通过宏声明Client
krpc_client! {
CLI,
TestServer,
"1.0.0",
async fn do_run1(&self,res : ReqDto) -> Result<ResDto>
async fn do_run2(&self,res : ReqDto) -> Result<ResDto>
}
#[tokio::main(worker_threads = 512)]
async fn main() {
//实例化rpc接口
let client = TestServer;
//直接进行调用
let res = client.do_run1(ReqDto{str : "client say hello 1".to_string()}).await;
println!("{:?}",res);
let res = client.do_run2(ReqDto{str : "client say hello 2".to_string()}).await;
println!("{:?}",res);
}
我们可以看到,之前我们Server端的初始化只是指定的监听端口,而Client端只是配置了127.0.0.1:8081,由此可知我们之前是Client直连的Server进行了rpc调用。
lazy_static! {
static ref CLI: KrpcClient = KrpcClient::build("http://127.0.0.1:8081".to_string());
}
KrpcServer::build()
.set_port("8081")
.add_rpc_server(Box::new(server))
.run()
.await;
而新版代码里Server端和Client端都配置了127.0.0.1:2181,熟悉dubbo的同学都知道这个是zookeeper的默认端口,我们这版的服务注册与发现也是通过zookeeper来实现的
//初始化RPC-Client
lazy_static! {
static ref CLI: KrpcClient = KrpcClient::build(
//设置注册中心配置(地址,工作空间,注册中心类型)
RegisterBuilder::new(
&format!("127.0.0.1:{}", "2181"),
"default",
RegisterType::ZooKeeper,
)
);
}
//启动rpc服务
KrpcServer::build(
//设置注册中心配置(地址,工作空间,注册中心类型)
RegisterBuilder::new(
&format!("127.0.0.1:{}", "2181"),
"default",
RegisterType::ZooKeeper,
),
//设置服务端口
"8081",
)
//注册服务
.add_rpc_server(Box::new(server))
.run()
.await;
并且本项目还为多注册中心做了支持,之后我计划会接入nacos或者reids作为注册中心。
//注册中心抽象接口
pub trait Register: Send + Sync {
fn add_resource(&self, resource: Resource);
}
//注册中心类型
#[derive(Clone)]
pub enum RegisterType {
ZooKeeper,
Nacos,
...
}
同时为了实现高可用,本项目还支持了异常处理,比如说zookeeper节点下线重连,节点删除自动恢复等功能。
tokio::spawn(async move {
loop {
let client = connect(&cluster, &path).await;
match client
.create(&node_name, node_data.as_bytes(), EPHEMERAL_OPEN)
.await
{
Ok(_) => {}
Err(_err) => {
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
}
match client.check_and_watch_stat(&node_name).await {
Ok(watch) => {
let event = watch.1.changed().await;
info!("resource node event {:?}", event);
}
Err(err) => {
info!("resource node err {:?}", err);
}
};
drop(client);
}
});
tokio::spawn(async move {
let mut client = connect(&cluster.clone(), &path).await;
let map = map;
let info = info;
loop {
let watcher: (Vec<String>, zk::Stat, OneshotWatcher) =
match client.get_and_watch_children("/").await {
Ok(watcher) => watcher,
Err(_) => {
client = connect(&cluster.clone(), &path).await;
continue;
}
};
let mut server_list = vec![];
for node in watcher.0 {
let server_info: Vec<&str> = node.split(":").collect();
let info = Info {
server_name: info.server_name.clone(),
version: info.version.clone(),
ip: server_info[0].to_string(),
port: Some(server_info[1].to_string()),
};
server_list.push(SocketInfo { info, sender: Arc::new(RwLock::new(None))});
}
let key = info.server_name.clone() + ":" + &info.version.clone();
info!("update server cache {:?} : {:?}", key, server_list);
let mut temp_map = map.write().await;
temp_map.insert(key, server_list);
drop(temp_map);
let event: zk::WatchedEvent = watcher.2.changed().await;
if let zk::EventType::NodeChildrenChanged = event.event_type {
info!("Monitor node changes");
} else {
client = connect(&cluster.clone(), &path).await;
}
}
});
有兴趣了解的同学可以阅读相关源码。
本项目接下来的计划还包括自定义组件支持等内容,希望大家多多点Star支持~
https://github.com/kwsc98/krpc-rust
评论区
写评论棒棒的