< 返回版块

lwj1994 发表于 2025-01-22 22:57

运行 test 总是收不到消息。

use flutter_rust_bridge::for_generated::anyhow;
use flutter_rust_bridge::for_generated::futures::{SinkExt, StreamExt, TryStreamExt};
use flutter_rust_bridge::DartFnFuture;
use rquest::header::{HeaderMap, HeaderName, HeaderValue};
use rquest::{Message, WebSocket};
use std::collections::HashMap;
use std::fmt::Pointer;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::oneshot::Receiver;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time;

/// State kept for one open WebSocket connection.
#[derive(Debug)]
struct WebsocketHandler {
    /// URI the connection was opened with; also the key under which the handler
    /// is stored in the manager.
    uri: String,
    /// The socket itself, behind an async mutex and shared through an `Arc`.
    web_socket: Arc<Mutex<WebSocket>>,
    /// One-shot sender fired with `"close"` by `close`; the matching receiver is
    /// created in `connect`.
    tx_break: Option<oneshot::Sender<String>>,
}

impl WebsocketHandler {
    async fn ping(&mut self) -> anyhow::Result<String> {
        match (self.web_socket.lock().await)
            .send(Message::Ping(vec![1, 2, 3]))
            .await
        {
            Ok(e) => Ok("".to_string()),
            Err(e) => anyhow::Result::Err(anyhow::Error::new(e)),
        }
    }
    async fn run_ws_loop(
        &mut self,
        dart_response_callback: impl Fn(String) -> DartFnFuture<String>,
    ) {
    }
}

impl WebsocketHandler {
    /// Sends `message` as a text frame.
    ///
    /// The outcome is only logged through `dbg!`; nothing is returned to the
    /// caller.
    async fn send(&mut self, message: &str) {
        // The WebSocket implements `Sink<Message>`.
        let mut ws = self.web_socket.lock().await;

        match ws.send(Message::Text(message.into())).await {
            Ok(_e) => {
                dbg!("send success", &self.uri);
            }
            Err(e) => {
                dbg!("send error", e);
            }
        };
    }

    /// Closes the socket, fires the break signal and unregisters this URI.
    ///
    /// This method locks the global manager in order to unregister, so the
    /// caller must NOT be holding the manager lock when calling it (the tokio
    /// mutex is not reentrant).
    ///
    /// Calling it more than once is harmless: the break signal is sent only
    /// the first time.
    async fn close(&mut self, reason: &str) {
        {
            // Keep the socket guard in its own scope so it is released before the
            // manager lock is taken below. `send`/`ping` are reached with the
            // manager lock held and then lock the socket; acquiring the two locks
            // in the opposite order here could deadlock against them.
            let mut ws = self.web_socket.lock().await;
            // Go through `&mut WebSocket` so that the `Sink` close is the method
            // that gets selected.
            let ws_ref = &mut *ws;
            match ws_ref.close().await {
                Ok(_) => {
                    dbg!(reason);
                }
                Err(e) => {
                    // Best effort: the connection may already be gone.
                    dbg!("close error", e);
                }
            };
        }

        // `take()` leaves `None` behind; a second close therefore skips this
        // block instead of panicking on an `unwrap()`.
        if let Some(tx) = self.tx_break.take() {
            // An `Err` only means the receiver was already dropped, in which case
            // there is nobody left to notify.
            let _ = tx.send("close".to_string());
        }

        get_ws_manager().lock().await.unregister(&self.uri);
    }
}

pub async fn send(uri: &str, message: &str) -> anyhow::Result<String, anyhow::Error> {
    let mut registry = get_ws_manager().lock().await;
    let mut handler: Option<&mut WebsocketHandler> = Option::None;
    let mut is_some = false;
    match (registry.get_handler(uri)) {
        None => {}
        Some(e) => {
            handler = Some(e);
            is_some = true;
        }
    }

    if (is_some) {
        handler.unwrap().send(message).await;
        Ok("Success".to_string())
    } else {
        Ok("Error".to_string())
    }
}

pub async fn close(uri: &str) -> anyhow::Result<String> {
    let mut registry = get_ws_manager().lock().await;
    let mut handler: Option<&mut WebsocketHandler> = Option::None;
    let mut is_some = false;
    match (registry.get_handler(uri)) {
        None => {}
        Some(e) => {
            handler = Some(e);
            is_some = true;
        }
    }

    if (is_some) {
        handler.unwrap().close("").await;
        Ok("Success".to_string())
    } else {
        Ok("Error".to_string())
    }
}

pub async fn ping(uri: &str) -> anyhow::Result<String> {
    let mut registry = get_ws_manager().lock().await;
    let mut handler: Option<&mut WebsocketHandler> = Option::None;
    let mut is_some = false;
    match (registry.get_handler(uri)) {
        None => {}
        Some(e) => {
            handler = Some(e);
            is_some = true;
        }
    }

    if (is_some) {
        match (handler.unwrap().ping().await) {
            Ok(_) => Ok("Success".to_string()),
            Err(e) => Err(e),
        }
    } else {
        Ok("Error".to_string())
    }
}

pub async fn connect(
    uri: &str,
    proxy: &str,
    headers: HashMap<String, String>,
    on_response: impl Fn(String) -> DartFnFuture<String> + std::marker::Send + 'static,
    on_close: impl Fn(String) -> DartFnFuture<String>,
    on_connected: impl Fn(String) -> DartFnFuture<String>,
) -> anyhow::Result<String> {
    let mut header_values: HashMap<HeaderName, HeaderValue> = HashMap::new();
    for (k, v) in headers {
        header_values.insert(
            HeaderName::from_str(k.as_str())?,
            HeaderValue::from_str(v.as_str())?,
        );
    }

    let channel: (Sender<String>, Receiver<String>) = oneshot::channel();
    let sender = Some(channel.0);
    match (get_ws_manager()
        .lock()
        .await
        .client
        .websocket(uri)
        .with_builder(|mut builder| {
            builder = builder.headers(HeaderMap::from_iter(header_values));
            if (!proxy.is_empty()) {
                builder = builder.proxy(proxy);
            }
            builder
        })
        .send()
        .await)
    {
        Ok(response) => {
            // Turns the response into a WebSocket stream.
            match (response.into_websocket().await) {
                Ok(websocket) => {
                    let handler = WebsocketHandler {
                        uri: uri.to_string(),
                        web_socket: Arc::new(Mutex::new(websocket)),
                        tx_break: sender,
                    };
                    let mut registry = get_ws_manager().lock().await;
                    registry.register(uri, Box::new(handler));
                    on_connected("0".to_string());
                    let uri_spawn = uri.to_string();
                    let _ = tokio::spawn(async move {
                        let mut interval = time::interval(Duration::from_millis(100));
                        let ws_arc = get_ws_manager()
                            .lock()
                            .await
                            .get_handler(&uri_spawn)
                            .unwrap()
                            .web_socket
                            .clone();
                        loop {
                            let mut ws = ws_arc.lock().await;
                            interval.tick().await;
                            match ws.try_next().await {
                                Ok(e) => match e {
                                    None => {}
                                    Some(message) => match message {
                                        Message::Text(e) => {
                                            on_response(e);
                                            dbg!("receive Text");
                                        }
                                        Message::Binary(e) => {
                                            dbg!("Binary", e);
                                        }
                                        Message::Ping(e) => {
                                            dbg!("receive Ping");
                                            on_response("Ping".to_string());
                                            match ws.send(Message::Pong(e)).await {
                                                Ok(_) => {}
                                                Err(_) => {}
                                            }
                                        }
                                        Message::Pong(e) => {
                                            dbg!("receive Pong", e.clone());
                                            on_response("Pong".to_string());
                                        }
                                        Message::Close { .. } => {
                                            dbg!("Close");
                                            match close(&uri_spawn).await {
                                                Ok(_) => {}
                                                Err(_) => {}
                                            }
                                        }
                                    },
                                },
                                Err(e) => {
                                    eprintln!("Error sending ping: {}", e);
                                }
                            }
                        }
                    })
                    .await;

                    tokio::select! {
                        val = channel.1 => {
                            dbg!("Task end");
                        }
                    }

                    get_ws_manager().lock().await.unregister(&uri);
                    on_close("0".to_string());
                    Ok("End".to_string())
                }
                Err(e) => anyhow::Result::Err(anyhow::Error::new(e)),
            }
        }
        Err(e) => anyhow::Result::Err(anyhow::Error::new(e)),
    }
}

/// Registry of open WebSocket connections together with the HTTP client used
/// to create them.
#[derive(Debug)]
struct WsManager {
    /// Registered handlers, keyed by the string passed to `register`.
    handlers: HashMap<String, Box<WebsocketHandler>>,
    /// Client used by `connect` to perform the WebSocket handshake.
    client: Box<rquest::Client>,
}

impl WsManager {
    fn new() -> Self {
        Self {
            handlers: HashMap::new(),
            client: Box::new(rquest::Client::new()),
        }
    }

    fn register(&mut self, key: &str, handler: Box<WebsocketHandler>) {
        let cache = self.get_handler(key);
        if (cache.is_some()) {
            dbg!("cache hit", cache);
        }
        self.handlers.insert(key.to_string(), handler);
    }

    fn unregister(&mut self, key: &str) {
        let cache = self.get_handler(key);
        if (cache.is_some()) {
            dbg!("cache hit");
            self.handlers.remove(key);
        }
    }

    fn get_handler(&mut self, key: &str) -> Option<&mut Box<WebsocketHandler>> {
        self.handlers.get_mut(key)
    }

    async fn send(&mut self, key: &str, message: &str) {
        match self.get_handler(key) {
            None => {}
            Some(e) => {
                e.send(message).await;
            }
        }
    }
}

/// Process-wide manager instance, created on first access.
static SS_MANAGER_LOCK: OnceLock<Mutex<WsManager>> = OnceLock::new();

/// Returns the global manager mutex, initializing it on the first call.
fn get_ws_manager() -> &'static Mutex<WsManager> {
    SS_MANAGER_LOCK.get_or_init(|| Mutex::new(WsManager::new()))
}




    #[tokio::test]
    async fn test_ws() {
        let mut map: HashMap<String, String> = HashMap::new();
        //x-channel: IM
        // x-echoing-env:
        // sec-websocket-version: 13
        map.insert("x-channel".to_string(), "IM".to_string());
        map.insert(
            "x-device".to_string(),
            "2F818C08-54E4-4153-A432-C6DDF553585A2231".to_string(),
        );
        map.insert("x-request-package-id".to_string(), "1042".to_string());
        map.insert("accept-encoding".to_string(), "gzip".to_string());
        map.insert(
            "user-agent".to_string(),
            "xxxx".to_string(),
        );
        map.insert("authorization".to_string(), "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjQ0ODEzMzQwODc0MTEzODYxNyIsInR5cGUiOiJVU0VSIiwiZGV2aWNlSWQiOiIxMDQyIiwiZXhwIjoxNzM3NjI4NTA1LCJpYXQiOjE3Mzc1NDIxMDV9.9yvRLC-ZdvOC8_MouV4pFt32H2B0SskLPqb9WmpkeUo".to_string());

        let uri = "wss://dev-ws.xx.cn/pgw/ws";
        let connected = Arc::new(Mutex::new(false));
        let s =  connect(
            uri,
            "",
            map,
            |s| {
                println!("on_response {:?}", s);
                return create_dart_future_string("".to_string());
            },
            |s| {
                println!("on_close {:?}", s);
                return create_dart_future_string("".to_string());
            },
            |s| {
                println!("on_connected");
                let json_str = r#"{"srv": {"srv": "messager","pkg": "Messager","md": "ListChatV2"},"meta": {"ack": "UNACK","id": "1efd885a-5d0c-6fb0-892c-158479c0d38a1"},"datas": "{\"page\":{\"limit\":16,\"after\":null,\"before\":null}}"}"#;
                send(uri, json_str);
                return create_dart_future_string("".to_string());
            },
        ).await;
    }

评论区

写评论
ljsnogard 2025-01-31 00:16

要在 test 里面看到 log 输出,常见的解决办法是用 env_logger crate

--
👇
lwj1994: 好的,我试试。还有个问题:为什么在 `#[tokio::test]` 下运行测试时打印不出日志?

--
👇
jerryshell: 看样子你好像直接用 Mutex 锁住了 WebSocket

Arc Mutex 更适合跨线程分享数据的场景

像 WebSocket 这样的 IO 资源应该考虑使用 Actor 模型

具体可以参考这篇 Tokio 核心开发者写的经典博客:Actors with Tokio

我心飞翔 2025-01-23 10:19

可以这样试试:`cargo test -- --show-output`

bestgopher 2025-01-23 09:50

默认情况下,通过的测试不会显示 `println!` 的输出(输出会被测试框架捕获)。

作者 lwj1994 2025-01-23 09:21

好的,我试试。还有个问题:为什么在 `#[tokio::test]` 下运行测试时打印不出日志?

--
👇
jerryshell: 看样子你好像直接用 Mutex 锁住了 WebSocket

Arc Mutex 更适合跨线程分享数据的场景

像 WebSocket 这样的 IO 资源应该考虑使用 Actor 模型

具体可以参考这篇 Tokio 核心开发者写的经典博客:Actors with Tokio

jerryshell 2025-01-23 08:08

看样子你好像直接用 Mutex 锁住了 WebSocket

Arc Mutex 更适合跨线程分享数据的场景

像 WebSocket 这样的 IO 资源应该考虑使用 Actor 模型

具体可以参考这篇 Tokio 核心开发者写的经典博客:Actors with Tokio

1 共 5 条评论, 1 页