运行 test 总是收不到消息。
use flutter_rust_bridge::for_generated::anyhow;
use flutter_rust_bridge::for_generated::futures::{SinkExt, StreamExt, TryStreamExt};
use flutter_rust_bridge::DartFnFuture;
use rquest::header::{HeaderMap, HeaderName, HeaderValue};
use rquest::{Message, WebSocket};
use std::collections::HashMap;
use std::fmt::Pointer;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::oneshot::Receiver;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time;
#[derive(Debug)]
struct WebsocketHandler {
uri: String,
web_socket: Arc<Mutex<WebSocket>>,
tx_break: Option<oneshot::Sender<String>>,
}
impl WebsocketHandler {
async fn ping(&mut self) -> anyhow::Result<String> {
match (self.web_socket.lock().await)
.send(Message::Ping(vec![1, 2, 3]))
.await
{
Ok(e) => Ok("".to_string()),
Err(e) => anyhow::Result::Err(anyhow::Error::new(e)),
}
}
async fn run_ws_loop(
&mut self,
dart_response_callback: impl Fn(String) -> DartFnFuture<String>,
) {
}
}
impl WebsocketHandler {
async fn send(&mut self, message: &str) {
// The WebSocket implements `Sink<Message>`.
let mut ws = self.web_socket.lock().await;
match ws.send(Message::Text(message.into())).await {
Ok(_e) => {
dbg!("send success", &self.uri);
}
Err(e) => {
dbg!("send error", e);
}
};
}
async fn close(&mut self, reason: &str) {
let mut ws = self.web_socket.lock().await;
let ws_ref = &mut *ws;
match ws_ref.close().await {
Ok(e) => {
dbg!(reason);
}
Err(_) => {}
};
match Option::take(&mut self.tx_break)
.unwrap()
.send("close".to_string())
{
Ok(e) => {}
Err(_) => {}
}
get_ws_manager().lock().await.unregister(&self.uri);
}
}
pub async fn send(uri: &str, message: &str) -> anyhow::Result<String, anyhow::Error> {
let mut registry = get_ws_manager().lock().await;
let mut handler: Option<&mut WebsocketHandler> = Option::None;
let mut is_some = false;
match (registry.get_handler(uri)) {
None => {}
Some(e) => {
handler = Some(e);
is_some = true;
}
}
if (is_some) {
handler.unwrap().send(message).await;
Ok("Success".to_string())
} else {
Ok("Error".to_string())
}
}
pub async fn close(uri: &str) -> anyhow::Result<String> {
let mut registry = get_ws_manager().lock().await;
let mut handler: Option<&mut WebsocketHandler> = Option::None;
let mut is_some = false;
match (registry.get_handler(uri)) {
None => {}
Some(e) => {
handler = Some(e);
is_some = true;
}
}
if (is_some) {
handler.unwrap().close("").await;
Ok("Success".to_string())
} else {
Ok("Error".to_string())
}
}
pub async fn ping(uri: &str) -> anyhow::Result<String> {
let mut registry = get_ws_manager().lock().await;
let mut handler: Option<&mut WebsocketHandler> = Option::None;
let mut is_some = false;
match (registry.get_handler(uri)) {
None => {}
Some(e) => {
handler = Some(e);
is_some = true;
}
}
if (is_some) {
match (handler.unwrap().ping().await) {
Ok(_) => Ok("Success".to_string()),
Err(e) => Err(e),
}
} else {
Ok("Error".to_string())
}
}
pub async fn connect(
uri: &str,
proxy: &str,
headers: HashMap<String, String>,
on_response: impl Fn(String) -> DartFnFuture<String> + std::marker::Send + 'static,
on_close: impl Fn(String) -> DartFnFuture<String>,
on_connected: impl Fn(String) -> DartFnFuture<String>,
) -> anyhow::Result<String> {
let mut header_values: HashMap<HeaderName, HeaderValue> = HashMap::new();
for (k, v) in headers {
header_values.insert(
HeaderName::from_str(k.as_str())?,
HeaderValue::from_str(v.as_str())?,
);
}
let channel: (Sender<String>, Receiver<String>) = oneshot::channel();
let sender = Some(channel.0);
match (get_ws_manager()
.lock()
.await
.client
.websocket(uri)
.with_builder(|mut builder| {
builder = builder.headers(HeaderMap::from_iter(header_values));
if (!proxy.is_empty()) {
builder = builder.proxy(proxy);
}
builder
})
.send()
.await)
{
Ok(response) => {
// Turns the response into a WebSocket stream.
match (response.into_websocket().await) {
Ok(websocket) => {
let handler = WebsocketHandler {
uri: uri.to_string(),
web_socket: Arc::new(Mutex::new(websocket)),
tx_break: sender,
};
let mut registry = get_ws_manager().lock().await;
registry.register(uri, Box::new(handler));
on_connected("0".to_string());
let uri_spawn = uri.to_string();
let _ = tokio::spawn(async move {
let mut interval = time::interval(Duration::from_millis(100));
let ws_arc = get_ws_manager()
.lock()
.await
.get_handler(&uri_spawn)
.unwrap()
.web_socket
.clone();
loop {
let mut ws = ws_arc.lock().await;
interval.tick().await;
match ws.try_next().await {
Ok(e) => match e {
None => {}
Some(message) => match message {
Message::Text(e) => {
on_response(e);
dbg!("receive Text");
}
Message::Binary(e) => {
dbg!("Binary", e);
}
Message::Ping(e) => {
dbg!("receive Ping");
on_response("Ping".to_string());
match ws.send(Message::Pong(e)).await {
Ok(_) => {}
Err(_) => {}
}
}
Message::Pong(e) => {
dbg!("receive Pong", e.clone());
on_response("Pong".to_string());
}
Message::Close { .. } => {
dbg!("Close");
match close(&uri_spawn).await {
Ok(_) => {}
Err(_) => {}
}
}
},
},
Err(e) => {
eprintln!("Error sending ping: {}", e);
}
}
}
})
.await;
tokio::select! {
val = channel.1 => {
dbg!("Task end");
}
}
get_ws_manager().lock().await.unregister(&uri);
on_close("0".to_string());
Ok("End".to_string())
}
Err(e) => anyhow::Result::Err(anyhow::Error::new(e)),
}
}
Err(e) => anyhow::Result::Err(anyhow::Error::new(e)),
}
}
#[derive(Debug)]
struct WsManager {
handlers: HashMap<String, Box<WebsocketHandler>>,
client: Box<rquest::Client>,
}
impl WsManager {
fn new() -> Self {
Self {
handlers: HashMap::new(),
client: Box::new(rquest::Client::new()),
}
}
fn register(&mut self, key: &str, handler: Box<WebsocketHandler>) {
let cache = self.get_handler(key);
if (cache.is_some()) {
dbg!("cache hit", cache);
}
self.handlers.insert(key.to_string(), handler);
}
fn unregister(&mut self, key: &str) {
let cache = self.get_handler(key);
if (cache.is_some()) {
dbg!("cache hit");
self.handlers.remove(key);
}
}
fn get_handler(&mut self, key: &str) -> Option<&mut Box<WebsocketHandler>> {
self.handlers.get_mut(key)
}
async fn send(&mut self, key: &str, message: &str) {
match self.get_handler(key) {
None => {}
Some(e) => {
e.send(message).await;
}
}
}
}
static SS_MANAGER_LOCK: OnceLock<Mutex<WsManager>> = OnceLock::new();
fn get_ws_manager() -> &'static Mutex<WsManager> {
SS_MANAGER_LOCK.get_or_init(|| {
let registry = Mutex::new(WsManager::new());
registry
})
}
#[tokio::test]
async fn test_ws() {
let mut map: HashMap<String, String> = HashMap::new();
//x-channel: IM
// x-echoing-env:
// sec-websocket-version: 13
map.insert("x-channel".to_string(), "IM".to_string());
map.insert(
"x-device".to_string(),
"2F818C08-54E4-4153-A432-C6DDF553585A2231".to_string(),
);
map.insert("x-request-package-id".to_string(), "1042".to_string());
map.insert("accept-encoding".to_string(), "gzip".to_string());
map.insert(
"user-agent".to_string(),
"xxxx".to_string(),
);
map.insert("authorization".to_string(), "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjQ0ODEzMzQwODc0MTEzODYxNyIsInR5cGUiOiJVU0VSIiwiZGV2aWNlSWQiOiIxMDQyIiwiZXhwIjoxNzM3NjI4NTA1LCJpYXQiOjE3Mzc1NDIxMDV9.9yvRLC-ZdvOC8_MouV4pFt32H2B0SskLPqb9WmpkeUo".to_string());
let uri = "wss://dev-ws.xx.cn/pgw/ws";
let connected = Arc::new(Mutex::new(false));
let s = connect(
uri,
"",
map,
|s| {
println!("on_response {:?}", s);
return create_dart_future_string("".to_string());
},
|s| {
println!("on_close {:?}", s);
return create_dart_future_string("".to_string());
},
|s| {
println!("on_connected");
let json_str = r#"{"srv": {"srv": "messager","pkg": "Messager","md": "ListChatV2"},"meta": {"ack": "UNACK","id": "1efd885a-5d0c-6fb0-892c-158479c0d38a1"},"datas": "{\"page\":{\"limit\":16,\"after\":null,\"before\":null}}"}"#;
send(uri, json_str);
return create_dart_future_string("".to_string());
},
).await;
}
1
共 5 条评论, 1 页
评论区
写评论要在 test 里面看到 log 输出,常见的解决办法是用 env_logger crate
--
👇
lwj1994: 好的 我试试。还有个问题就是:为啥在 tokio:test 运行 test 下打印不出日志。
--
👇
jerryshell: 看样子你好像直接用 Mutex 锁住了 WebSocket
Arc Mutex 更适合跨线程分享数据的场景
像 WebSocket 这样的 IO 资源应该考虑使用 Actor 模型
具体可以参考这篇 Tokio 核心开发者写的经典博客:Actors with Tokio
这样试试 cargo test -- --show-output
正常测试就是不会打印println的输出
好的 我试试。还有个问题就是:为啥在 tokio:test 运行 test 下打印不出日志。
--
👇
jerryshell: 看样子你好像直接用 Mutex 锁住了 WebSocket
Arc Mutex 更适合跨线程分享数据的场景
像 WebSocket 这样的 IO 资源应该考虑使用 Actor 模型
具体可以参考这篇 Tokio 核心开发者写的经典博客:Actors with Tokio
看样子你好像直接用 Mutex 锁住了 WebSocket
Arc Mutex 更适合跨线程分享数据的场景
像 WebSocket 这样的 IO 资源应该考虑使用 Actor 模型
具体可以参考这篇 Tokio 核心开发者写的经典博客:Actors with Tokio