我在结构体里定义了 crossbeam_channel 通道的发送端(Sender)和接收端(Receiver):一个线程写,3 个线程读。我想在写完后关闭发送端,让接收端在发现发送端已关闭后退出。现在程序可以运行,只是不知道如何关闭发送端——close_sender 要怎么写?
use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::error::Error;
use std::thread;
use std::time;
/// Something that can be executed by a worker.
pub trait Exec {
    /// Runs the work item using only shared access to it.
    fn exec(&self);
}
/// A unit of work passed over the channel to a worker thread.
#[derive(Debug)]
pub struct Job {
    /// Numeric identifier of the job.
    id: u64,
    /// Payload carried by the job, stored as text.
    data: String,
}
impl Exec for Job {
fn exec(&self) {
println!("job.id: {}, job.data: {}", self.id, self.data);
}
}
/// Worker pool that owns both halves of a zero-capacity channel of [`Job`]s.
///
/// The sending half is stored as `Mutex<Option<Sender<Job>>>` so that
/// [`Server::close_sender`] can move it out and drop it through a shared
/// `&self` (for example when the server is shared via `Arc<Server>`).
/// A plain `Sender<Job>` field cannot be dropped through `&self`, and as long
/// as it is alive the channel never becomes disconnected.
pub struct Server {
    /// Number of worker threads spawned by `run`.
    workers: u32,
    /// Sending half of the channel; `None` once `close_sender` has been called.
    sender: std::sync::Mutex<Option<Sender<Job>>>,
    /// Receiving half of the channel; cloned once per worker thread.
    receiver: Receiver<Job>,
}

impl Server {
    /// Creates a server that will run `workers` worker threads, backed by a
    /// `bounded(0)` channel.
    fn new(workers: u32) -> Self {
        let (s, r) = bounded::<Job>(0);
        Server {
            workers,
            sender: std::sync::Mutex::new(Some(s)),
            receiver: r,
        }
    }

    /// Locks the sender slot.
    ///
    /// A poisoned lock is recovered instead of panicking: the slot only holds
    /// an `Option`, which cannot be left in a torn state.
    fn sender_slot(&self) -> std::sync::MutexGuard<'_, Option<Sender<Job>>> {
        self.sender
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Closes the sending side of the channel.
    ///
    /// The stored sender is taken out of the slot and dropped. Once every
    /// temporary clone made by `add_job` is gone as well, the channel is
    /// disconnected and each worker's `recv` yields `Err`, which makes the
    /// worker return. Calling this more than once is harmless.
    fn close_sender(&self) {
        drop(self.sender_slot().take());
    }

    /// Sends `job` to the workers.
    ///
    /// # Errors
    ///
    /// Returns an error if `close_sender` has already been called, or if the
    /// underlying `send` fails.
    fn add_job(&self, job: Job) -> Result<(), Box<dyn Error>> {
        // Clone the sender while holding the lock, then release the lock before
        // sending: the send may block, and `close_sender` must not wait on it.
        let sender = self.sender_slot().clone();
        match sender {
            Some(s) => s.send(job).map_err(|err| Box::new(err) as Box<dyn Error>),
            None => Err(Box::<dyn Error>::from(
                "cannot add job: sender has been closed",
            )),
        }
    }

    /// Spawns the worker threads and blocks until all of them have exited.
    ///
    /// Each worker loops on the shared receiver: a received job is logged,
    /// executed, and followed by a one-second pause; five seconds without a
    /// message only logs a timeout; a receive error (channel disconnected)
    /// ends the worker.
    ///
    /// # Errors
    ///
    /// Returns the panic payload of the first joined worker that panicked.
    fn run(&self) -> thread::Result<()> {
        // Size the handle list from the configured worker count
        // (u32 -> usize is lossless on 32- and 64-bit targets).
        let mut threads: Vec<thread::JoinHandle<()>> =
            Vec::with_capacity(self.workers as usize);
        for ch in 0..self.workers {
            let rc = self.receiver.clone();
            let th = thread::spawn(move || {
                println!("channel[{}] start", ch);
                loop {
                    select! {
                        recv(rc) -> ret => {
                            match ret {
                                Ok(job) => {
                                    println!("recveie job successed, channel: {}, job: {:?}", ch, job);
                                    job.exec();
                                },
                                Err(err) => {
                                    // All senders are gone: nothing more will arrive.
                                    println!("recveie job failed: {:?}, channel: {}", err, ch);
                                    return;
                                }
                            }
                            thread::sleep(time::Duration::from_secs(1));
                        },
                        default(time::Duration::from_secs(5)) => println!("channel: {}, timed out", ch),
                    }
                }
            });
            threads.push(th);
        }
        for th in threads {
            println!("join start");
            th.join()?;
        }
        println!("server run finished");
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use super::{Job, Server};
    use std::time;
    use std::{sync::Arc, thread};

    /// One producer thread feeds ten jobs to a three-worker server, then
    /// closes the sender.
    ///
    /// The test only terminates if `close_sender` really disconnects the
    /// channel; otherwise the workers keep polling and `run` never returns.
    #[test]
    fn test_worker2() {
        let server = Arc::new(Server::new(3));
        let s1 = server.clone();
        let producer = thread::spawn(move || {
            for i in 0..10u64 {
                let job = Job {
                    id: i,
                    data: format!(r#"{{"id":{},"name":"name-{}"}}"#, i, i),
                };
                s1.add_job(job).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
            s1.close_sender();
            println!("send finished");
        });
        match server.run() {
            Ok(_) => print!("server finished"),
            // Re-raise a worker panic so the test fails instead of merely
            // printing the payload and passing.
            Err(err) => std::panic::resume_unwind(err),
        }
        // Surface a producer failure as a test failure too.
        producer.join().expect("producer thread panicked");
    }
}
如果发送端不放在结构体里面,而是在外面创建 channel,则是可以关闭的:
use crossbeam_channel::{select, Receiver};
use std::thread;
use std::time;
/// Behaviour of a work item that a worker thread can run.
pub trait Exec {
    /// Executes the item; the receiver is borrowed immutably.
    fn exec(&self);
}
/// Work item sent from the producer to the worker threads.
#[derive(Debug)]
pub struct Job {
    /// Identifier of this job.
    id: u64,
    /// Text payload attached to the job.
    data: String,
}
impl Exec for Job {
fn exec(&self) {
println!("job.id: {}, job.data: {}", self.id, self.data);
}
}
/// Worker pool that only owns the receiving half of a [`Job`] channel.
pub struct Server {
    /// Number of worker threads spawned by `run`.
    workers: u32,
    /// Receiving half of the channel; cloned once per worker thread.
    receiver: Receiver<Job>,
}
impl Server {
    /// Builds a server that runs `workers` threads reading from `r`.
    fn new(workers: u32, r: Receiver<Job>) -> Self {
        Server {
            receiver: r,
            workers,
        }
    }

    /// Body of one worker thread.
    ///
    /// Loops on `jobs`: a received job is logged, executed, and followed by a
    /// one-second pause; five seconds without a message logs a timeout; a
    /// receive error ends the loop.
    fn worker_loop(index: u32, jobs: Receiver<Job>) {
        println!("channel[{}] start", index);
        loop {
            select! {
                recv(jobs) -> message => {
                    let job = match message {
                        Ok(job) => job,
                        Err(err) => {
                            println!("recveie job failed: {:?}, channel: {}", err, index);
                            return;
                        }
                    };
                    println!("recveie job successed, channel: {}, job: {:?}", index, job);
                    job.exec();
                    thread::sleep(time::Duration::from_secs(1));
                },
                default(time::Duration::from_secs(5)) => println!("channel: {}, timed out", index),
            }
        }
    }

    /// Spawns every worker, then joins them in spawn order.
    ///
    /// Returns the panic payload of the first joined worker that panicked;
    /// otherwise `Ok(())` after all workers have exited.
    fn run(&self) -> thread::Result<()> {
        let handles: Vec<thread::JoinHandle<()>> = (0..self.workers)
            .map(|index| {
                let jobs = self.receiver.clone();
                thread::spawn(move || Self::worker_loop(index, jobs))
            })
            .collect();
        for handle in handles {
            println!("join start");
            handle.join()?;
        }
        println!("server run finished");
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use super::{Job, Server};
    use crossbeam_channel::bounded;
    use std::time;
    use std::{panic, thread};

    /// A detached producer sends ten jobs and then drops its sender; the
    /// three-worker server runs on the test thread until its workers exit.
    #[test]
    fn test_worker() {
        let (tx, rx) = bounded::<Job>(0);
        thread::spawn(move || {
            for n in 0..10u64 {
                let payload = format!(r#"{{"id":{},"name":"name-{}"}}"#, n, n);
                tx.send(Job { id: n, data: payload }).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
            // Dropping the only sender disconnects the channel.
            drop(tx);
            println!("send finished");
        });
        let server = Server::new(3, rx);
        if let Err(err) = server.run() {
            // Re-raise a worker panic on the test thread.
            panic::resume_unwind(err);
        }
        print!("server finished");
    }
}
1
共 1 条评论, 1 页
评论区
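写评论
直接 drop 掉发送端就行了,接收端会收到 Err,判断一下然后退出就可以了。注意:只有当所有 Sender(包括 clone 出来的)都被 drop 之后,通道才会断开;如果发送端是结构体的字段,可以把它存成 Mutex<Option<Sender<Job>>>,在 close_sender 里用 take() 取出来再 drop。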