< 返回版块

xbitlabs 发表于 2023-08-07 17:38

use axum::{
    extract::TypedHeader,
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use futures::stream::{self, Stream};
use std::{convert::Infallible, net::SocketAddr, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::StreamExt as _;

#[tokio::main]
async fn main() {
    //创建一个通道
    let (tx,rx) = mpsc::unbounded_channel();
    // build our application with a route
    let app = Router::new()
        .route("/sse", get(sse_handler))
        .route("/", get(|| async { "Hello, World!" }));

    // run it
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn sse_handler(
    TypedHeader(user_agent): TypedHeader<headers::UserAgent>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    println!("`{}` connected", user_agent.as_str());

    let mut i = 0;
    // A `Stream` that repeats an event every second
    let stream = stream::repeat_with(move || {
        //在这里我如何使用上面创建的rx不断的接收消息,然后往前端推送??
        i += 1;
        Event::default().data(format!("hi,{}", &i))
    })
        .map(Ok)
        .throttle(Duration::from_secs(3)); //每3秒,向浏览器发1次消息

    //每隔1秒发1次保活
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(1))
            .text("keep-alive-text"),
    )
}

依赖

[dependencies]
tokio ={version = "1.29.1",features = ["full", "tracing"]}
tokio-stream = "0.1.14"
tokio-util = {version = "0.7.8",features = ["full"]}
futures = { version = "0.3.28", features = ["thread-pool"]}
log4rs = "1.2.0"
log = "0.4.19"
axum =  {version = "0.6.19", features = ["headers"] }
tower-http = { version = "0.4.1", features = ["fs", "trace"] }
headers = "0.3"
lazy_static = "1.4.0"

评论区

写评论
ManonLoki 2023-08-14 09:09

好的 谢谢

--
👇
fakeshadow: TryStreamStream的扩展trait,从文档我们可以看到所有<S As Stream>::Item = Result<T, E>的类型都实现了TryStream. tokio_stream库的wrappers模组提供了impl Stream的新类型,例如之前提到的UnboundedReceiverStream,所以只要在此基础上让通道传送Result<T, E>也就自动实现了TryStream.

--
👇
ManonLoki: 多谢 这样可以,Tx可以克隆并存储起来,用其他接口去发送数据。我想问一下 如何知道这些Stream之间如何进行转换呢?这个资料很好,我之前尝试过Sse直接使用rx 但是提示我没实现对应的Trait

--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }

event内容和错误类型请自己根据需要修改


作者 xbitlabs 2023-08-13 22:56

谢谢,完美

--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }

event内容和错误类型请自己根据需要修改
fakeshadow 2023-08-11 20:07

TryStreamStream的扩展trait,从文档我们可以看到所有<S As Stream>::Item = Result<T, E>的类型都实现了TryStream. tokio_stream库的wrappers模组提供了impl Stream的新类型,例如之前提到的UnboundedReceiverStream,所以只要在此基础上让通道传送Result<T, E>也就自动实现了TryStream.

--
👇
ManonLoki: 多谢 这样可以,Tx可以克隆并存储起来,用其他接口去发送数据。我想问一下 如何知道这些Stream之间如何进行转换呢?这个资料很好,我之前尝试过Sse直接使用rx 但是提示我没实现对应的Trait

--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }

event内容和错误类型请自己根据需要修改

ManonLoki 2023-08-11 09:36

多谢 这样可以,Tx可以克隆并存储起来,用其他接口去发送数据。我想问一下 如何知道这些Stream之间如何进行转换呢?这个资料很好,我之前尝试过Sse直接使用rx 但是提示我没实现对应的Trait

--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }

event内容和错误类型请自己根据需要修改
fakeshadow 2023-08-10 21:28
fn stream() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default()));
    let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
}

event内容和错误类型请自己根据需要修改

作者 xbitlabs 2023-08-10 18:25

在main里创建符合我的实际需求

👇
ThalliMega: mpsc::unbounded_channel是不是应该在sse_handler里面创建而不是main里面? 否则如果有多个访问/sse的连接那receiver不就不single了么

ThalliMega 2023-08-10 09:38

能用stream::unfold(rx, |rx| async {})

ManonLoki 2023-08-09 14:16

沉不了,每天来看看答案,关于楼上的回答,其实现在搞不懂的是SSE的Stream问题,至于channel,可以用别的方式解决

ThalliMega 2023-08-09 12:12

mpsc::unbounded_channel是不是应该在sse_handler里面创建而不是main里面? 否则如果有多个访问/sse的连接那receiver不就不single了么

pama 2023-08-09 10:20

rx 没有实现clone 麻烦

作者 xbitlabs 2023-08-08 22:19

要沉下去了么?

ManonLoki 2023-08-08 09:13

我跟你有同样的疑问,一起等一个回复,这个Repeat最后产出的是一个无限的流,就算你用throttle变成1秒1次,实际上它也只是把闭包重新执行一次。我试过其他的流,如果这个流不是无限的,那么SSE会报错,然后重连……

作者 xbitlabs 2023-08-08 01:03

其实我的问题主要是stream::repeat_with里面我不知道怎么使用rx异步接收消息,然后推往web前端,或者有没有其他的变通方式

--
👇
tx991020: tokio 的channel 是mpsc的,跨协程不好写,这一段可以解决 use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::error::Error; use async_channel::{Receiver, Sender}; use state::Container; use tokio::signal::ctrl_c;

pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();

#[tokio::main] async fn main() { let (tx, rx) = async_channel::bounded(10); APPLICATION_CONTEXT.set::<Sender>(tx); APPLICATION_CONTEXT.set::<Receiver>(rx); tokio::spawn(consumer()); tokio::spawn(consumer1()); producer().await; ctrl_c().await; }

async fn producer() { let tx = APPLICATION_CONTEXT.get::<Sender>(); for i in 0..200 { tx.send(i.to_string()).await; } }

async fn consumer() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com recv {:?},cnt {}", x, cnt); } }

async fn consumer1() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com1 recv {:?},cnt {}", x, cnt); } }

tx991020 2023-08-07 23:03

tokio 的channel 是mpsc的,跨协程不好写,这一段可以解决 use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::error::Error; use async_channel::{Receiver, Sender}; use state::Container; use tokio::signal::ctrl_c;

pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();

#[tokio::main] async fn main() { let (tx, rx) = async_channel::bounded(10); APPLICATION_CONTEXT.set::<Sender>(tx); APPLICATION_CONTEXT.set::<Receiver>(rx); tokio::spawn(consumer()); tokio::spawn(consumer1()); producer().await; ctrl_c().await; }

async fn producer() { let tx = APPLICATION_CONTEXT.get::<Sender>(); for i in 0..200 { tx.send(i.to_string()).await; } }

async fn consumer() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com recv {:?},cnt {}", x, cnt); } }

async fn consumer1() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com1 recv {:?},cnt {}", x, cnt); } }

1 共 14 条评论, 1 页