use axum::{
extract::TypedHeader,
response::sse::{Event, Sse},
routing::get,
Router,
};
use futures::stream::{self, Stream};
use std::{convert::Infallible, net::SocketAddr, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::StreamExt as _;
#[tokio::main]
async fn main() {
//创建一个通道
let (tx,rx) = mpsc::unbounded_channel();
// build our application with a route
let app = Router::new()
.route("/sse", get(sse_handler))
.route("/", get(|| async { "Hello, World!" }));
// run it
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn sse_handler(
TypedHeader(user_agent): TypedHeader<headers::UserAgent>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
println!("`{}` connected", user_agent.as_str());
let mut i = 0;
// A `Stream` that repeats an event every second
let stream = stream::repeat_with(move || {
//在这里我如何使用上面创建的rx不断的接收消息,然后往前端推送??
i += 1;
Event::default().data(format!("hi,{}", &i))
})
.map(Ok)
.throttle(Duration::from_secs(3)); //每3秒,向浏览器发1次消息
//每隔1秒发1次保活
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(1))
.text("keep-alive-text"),
)
}
依赖
[dependencies]
tokio ={version = "1.29.1",features = ["full", "tracing"]}
tokio-stream = "0.1.14"
tokio-util = {version = "0.7.8",features = ["full"]}
futures = { version = "0.3.28", features = ["thread-pool"]}
log4rs = "1.2.0"
log = "0.4.19"
axum = {version = "0.6.19", features = ["headers"] }
tower-http = { version = "0.4.1", features = ["fs", "trace"] }
headers = "0.3"
lazy_static = "1.4.0"
1
共 14 条评论, 1 页
评论区
写评论好的 谢谢
--
👇
fakeshadow: TryStream是Stream的扩展trait,从文档我们可以看到所有
<S As Stream>::Item = Result<T, E>
的类型都实现了TryStream
.tokio_stream
库的wrappers
模组提供了implStream
的新类型,例如之前提到的UnboundedReceiverStream,所以只要在此基础上让通道传送Result<T, E>
也就自动实现了TryStream
.--
👇
ManonLoki: 多谢 这样可以,Tx可以克隆并存储起来,用其他接口去发送数据。我想问一下 如何知道这些Stream之间如何进行转换呢?这个资料很好,我之前尝试过Sse直接使用rx 但是提示我没实现对应的Trait
--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }
谢谢,完美
--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }
TryStream是Stream的扩展trait,从文档我们可以看到所有
<S As Stream>::Item = Result<T, E>
的类型都实现了TryStream
.tokio_stream
库的wrappers
模组提供了implStream
的新类型,例如之前提到的UnboundedReceiverStream,所以只要在此基础上让通道传送Result<T, E>
也就自动实现了TryStream
.--
👇
ManonLoki: 多谢 这样可以,Tx可以克隆并存储起来,用其他接口去发送数据。我想问一下 如何知道这些Stream之间如何进行转换呢?这个资料很好,我之前尝试过Sse直接使用rx 但是提示我没实现对应的Trait
--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }
多谢 这样可以,Tx可以克隆并存储起来,用其他接口去发送数据。我想问一下 如何知道这些Stream之间如何进行转换呢?这个资料很好,我之前尝试过Sse直接使用rx 但是提示我没实现对应的Trait
--
👇
fakeshadow: ``` fn stream() { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let _ = tx.send(Ok::<_, std::convert::Infallible>(axum::response::sse::Event::default())); let _ = axum::response::Sse::new(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); }
event内容和错误类型请自己根据需要修改
在main里创建符合我的实际需求
👇
ThalliMega:
mpsc::unbounded_channel
是不是应该在sse_handler里面创建而不是main里面? 否则如果有多个访问/sse的连接那receiver不就不single了么能用
stream::unfold(rx, |rx| async {})
么沉不了,每天来看看答案,关于楼上的回答,其实现在搞不懂的是SSE的Stream问题,至于channel,可以用别的方式解决
mpsc::unbounded_channel
是不是应该在sse_handler里面创建而不是main里面? 否则如果有多个访问/sse的连接那receiver不就不single了么rx 没有实现clone 麻烦
要沉下去了么?
我跟你有同样的疑问,一起等一个回复,这个Repeat最后产出的是一个无限的流,就算你用throttle变成1秒1次,实际上它也只是把闭包重新执行一次。我试过其他的流,如果这个流不是无限的,那么SSE会报错,然后重连……
其实我的问题主要是stream::repeat_with里面我不知道怎么使用rx异步接收消息,然后推往web前端,或者有没有其他的变通方式
--
👇
tx991020: tokio 的channel 是mpsc的,跨协程不好写,这一段可以解决 use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::error::Error; use async_channel::{Receiver, Sender}; use state::Container; use tokio::signal::ctrl_c;
pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();
#[tokio::main] async fn main() { let (tx, rx) = async_channel::bounded(10); APPLICATION_CONTEXT.set::<Sender>(tx); APPLICATION_CONTEXT.set::<Receiver>(rx); tokio::spawn(consumer()); tokio::spawn(consumer1()); producer().await; ctrl_c().await; }
async fn producer() { let tx = APPLICATION_CONTEXT.get::<Sender>(); for i in 0..200 { tx.send(i.to_string()).await; } }
async fn consumer() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com recv {:?},cnt {}", x, cnt); } }
async fn consumer1() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com1 recv {:?},cnt {}", x, cnt); } }
tokio 的channel 是mpsc的,跨协程不好写,这一段可以解决 use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::error::Error; use async_channel::{Receiver, Sender}; use state::Container; use tokio::signal::ctrl_c;
pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();
#[tokio::main] async fn main() { let (tx, rx) = async_channel::bounded(10); APPLICATION_CONTEXT.set::<Sender>(tx); APPLICATION_CONTEXT.set::<Receiver>(rx); tokio::spawn(consumer()); tokio::spawn(consumer1()); producer().await; ctrl_c().await; }
async fn producer() { let tx = APPLICATION_CONTEXT.get::<Sender>(); for i in 0..200 { tx.send(i.to_string()).await; } }
async fn consumer() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com recv {:?},cnt {}", x, cnt); } }
async fn consumer1() { let rx = APPLICATION_CONTEXT.get::<Receiver>(); let mut cnt = 0; while let Ok(x) = rx.recv().await { cnt += 1; println!("com1 recv {:?},cnt {}", x, cnt); } }