< 返回版块

lygz5016 发表于 2020-01-01 00:56

Tags:actix,tokio,SplitSink,UdpFramed

use actix::prelude::*;
use actix::StreamHandler;
use bytes::{Buf, Bytes, BytesMut};
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use std::net::SocketAddr;
use std::net::UdpSocket as StdUdpSocket;
use tokio_util::{codec::BytesCodec, udp::UdpFramed};

struct UdpDataMsg(pub Result<(BytesMut, SocketAddr), String>);

#[derive(Default)]
struct UdpServer {
    sink: Option<SplitSink<UdpFramed<BytesCodec>, (Bytes, SocketAddr)>>,
}

impl Actor for UdpServer {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        let std_socket = StdUdpSocket::bind("0.0.0.0:60000").unwrap();
        let sock = tokio::net::UdpSocket::from_std(std_socket).unwrap();
        let (sink, stream) = UdpFramed::new(sock, BytesCodec::new()).split();
        Self::add_stream(
            stream.map(|a| UdpDataMsg(a.map_err(|e| e.to_string()))),
            ctx,
        );
        self.sink = Some(sink);
    }
}

impl StreamHandler<UdpDataMsg> for UdpServer {
    fn handle(&mut self, data: UdpDataMsg, _ctx: &mut Self::Context) {
        if let Ok((mut bytes, peer)) = data.0 {
            if let Some(ref mut sink) = self.sink {
                let fut = sink.send((bytes.to_bytes(), peer));
                futures::executor::block_on(async {
                    let _ = fut.await;
                });
            }
        }
    }
}

上面代码可以执行. 如果没有StreamHandler 里面的block_on是不会发包的. 大佬们瞧瞧, SplitSink在actix要如何使用?

评论区

写评论

还没有评论

1 共 0 条评论, 1 页