< 返回版块

Aaron009 发表于 2021-05-02 15:45

教程链接:

https://doc.rust-lang.org/book/ch20-02-multithreaded.html#implementing-the-execute-method

文章中有这样一段代码和文字:

impl Worker {
    /// Spawns a thread that loops forever, taking one job at a time from the
    /// shared receiver and running it.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // `lock()` produces a temporary `MutexGuard`. Temporaries are dropped
            // at the end of this `let` statement, so the mutex is already
            // released by the time `job()` runs below.
            // The first `unwrap` panics if the mutex is poisoned, the second if
            // `recv` returns an error.
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {} got a job; executing.", id);

            job();
        });

        Worker { id, thread }
    }
}

Here, we first call lock on the receiver to acquire the mutex, and then we call unwrap to panic on any errors. Acquiring a lock might fail if the mutex is in a poisoned state, which can happen if some other thread panicked while holding the lock rather than releasing the lock. In this situation, calling unwrap to have this thread panic is the correct action to take. Feel free to change this unwrap to an expect with an error message that is meaningful to you.

which can happen if some other thread panicked while holding the lock rather than releasing the lock

可以模拟写一个mutex poisoned例子吗?我想多做一些测试。

还有,这段话的意思是要妥善处理错误,而不是随意地去写 unwrap,所以上面的代码应该改成如下形式,对吗?

// Receive loop with explicit matching instead of `unwrap`.
loop {
    match receiver.lock() {
        // NOTE: this outer `job` is the `MutexGuard`, not a job. It stays alive
        // until the end of this arm, so the mutex is still locked while the
        // inner `job()` call runs.
        Ok(job) => {
            match job.recv() {
                // The inner `job` shadows the guard and is the received closure.
                Ok(job) => {
                    job()
                },
                // A receive error is ignored; the loop simply starts over.
                Err(_) => {}
            }
        }
        // A lock error is ignored as well; the loop simply starts over.
        Err(_) => {
        }
    }
    // let job = receiver.lock().unwrap();
    // let job = job.recv().expect();
    // println!("Worker {} got a job; executing.", id);
    // job();
}

但是这段代码看着嵌套太多了,有哪些优化方式?我尝试了以下几种优化方式,但是好像只有 status = 5 时,才会产生所期望的线程行为。

/// Spawns the worker thread. `status` is hard-coded and selects which of the
/// experimental receive loops below actually runs.
///
/// The variants differ only in how long the `MutexGuard` returned by
/// `receiver.lock()` stays alive, i.e. whether the mutex is still locked
/// while the job executes.
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            let status = 5;

            if status == 1 {
                // The guard is a temporary in the `while let` scrutinee; it lives
                // until the end of each iteration's body, so the mutex stays
                // locked while the job runs. The loop ends once `recv` returns `Err`.
                while let Ok(job) = receiver.lock().unwrap().recv() {
                    Worker::execute_job(id, job);
                }
            }
            else {
                loop {
                    if status == 2 {
                        match receiver.lock() {
                            // The outer `job` is the guard; it is alive for the
                            // whole arm, so the job runs with the mutex locked.
                            Ok(job) => {
                                match job.recv() {
                                    Ok(job) => {
                                        Worker::execute_job(id, job);
                                    },
                                    // Receive error ignored; the loop retries.
                                    Err(_) => {}
                                }
                            }
                            // Lock error ignored; the loop retries.
                            Err(_) => {
                            }
                        }
                    }
                    else if status == 3 {
                        // Same as status 2 written with `if let`: the guard bound
                        // by the outer `if let` is alive while the job executes.
                        if let Ok(job) = receiver.lock() {
                            if let Ok(job) = job.recv() {
                                Worker::execute_job(id, job);
                            }
                        }
                    }
                    else if status == 4 {
                        // The guard is bound to a variable. Shadowing it on the
                        // next line does not drop it; it lives until the end of
                        // this block, so the job runs with the mutex locked.
                        let job =
                            receiver.lock().unwrap();
                        let job = job.recv().expect("received");
                        Worker::execute_job(id, job);
                    }
                    else if status == 5 {
                        // The guard is only a temporary inside this `let`
                        // statement and is dropped at its end, so the mutex is
                        // released before the job runs.
                        let job = receiver.lock().unwrap().recv().unwrap();
                        println!("Worker {} got a job; executing.", id);
                        job();
                    }
                }
            }
        });
        Worker {
            id,
            thread
        }
    }

完整代码如下

main.rs

mod lib;

use std::net::{TcpStream, TcpListener};
use std::io::{Read, Write};
use std::fs;
use std::thread;
use std::time::Duration;
use lib::ThreadPool;

/// Binds the listening socket and hands every accepted connection to a
/// four-thread pool.
///
/// # Panics
///
/// Panics if `127.0.0.1:8080` cannot be bound.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").expect("failed to bind 127.0.0.1:8080");
    let pool = ThreadPool::new(4);
    for stream in listener.incoming() {
        // A failed accept only affects that one connection: log it and keep serving
        // instead of taking the whole server down.
        let stream = match stream {
            Ok(stream) => stream,
            Err(err) => {
                eprintln!("failed to accept connection: {}", err);
                continue;
            }
        };
        pool.execute(|| handler_connection(stream));
    }
}

/// Serves one HTTP connection and logs any I/O error instead of panicking,
/// so a broken connection or missing page cannot kill the calling thread.
fn handler_connection(mut stream: TcpStream) {
    if let Err(err) = respond(&mut stream) {
        eprintln!("failed to handle connection: {}", err);
    }
}

/// Reads the start of the request from `stream`, picks the page by request
/// line, and writes the complete response back.
///
/// # Errors
///
/// Returns any error from reading the socket, reading the page file, or
/// writing the response.
fn respond(stream: &mut TcpStream) -> std::io::Result<()> {
    let mut buffer = [0u8; 1024];
    // Only the bytes actually received are inspected, not the zeroed tail.
    let received = stream.read(&mut buffer)?;
    let header = String::from_utf8_lossy(&buffer[..received]);

    let (file_name, status) = if header.starts_with("GET /sleep HTTP/1.1") {
        // This route deliberately delays its response by five seconds.
        thread::sleep(Duration::from_secs(5));
        ("./sleep.html", "200 OK")
    } else if header.starts_with("GET / HTTP/1.1") {
        ("./hello.html", "200 OK")
    } else {
        ("./404.html", "404 NOT FOUND")
    };

    let file_content = fs::read_to_string(file_name)?;
    // `len()` is the byte length of the body, which is what Content-Length expects.
    let response = format!(
        "HTTP/1.1 {}\r\nContent-Length: {}\r\n\r\n{}",
        status,
        file_content.len(),
        file_content
    );
    // `write` may stop after a partial write; `write_all` keeps going until
    // every byte has been sent.
    stream.write_all(response.as_bytes())?;
    stream.flush()
}

lib.rs

use std::thread;
use std::sync::{mpsc, Mutex, Arc, PoisonError, MutexGuard};
use std::sync::mpsc::Receiver;

/// A fixed set of worker threads that receive jobs through one shared channel.
pub struct ThreadPool {
    /// The workers created by `ThreadPool::new`.
    workers: Vec<Worker>,
    /// Sending half of the job channel; `execute` pushes boxed closures into it.
    sender: mpsc::Sender<Job>,
}

/// A boxed closure that can be called once and moved to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size:usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let locker = Mutex::new(receiver);
        let arc_locker = Arc::new(locker);
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&arc_locker)),);
        }

        ThreadPool {
            workers,
            sender
        }
    }

    pub fn execute<F>(&self, f: F)
        where F: FnOnce() + Send + 'static {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

/// One pool worker: a numeric id plus the handle of the thread it runs on.
struct Worker {
    /// Identifier printed in the worker's log line.
    id: usize,
    /// Handle of the spawned thread; the thread's closure returns `()`.
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns the worker thread. `status` is hard-coded and selects which of the
    /// experimental receive loops below actually runs.
    ///
    /// The variants differ only in how long the `MutexGuard` returned by
    /// `receiver.lock()` stays alive, i.e. whether the mutex is still locked
    /// while the job executes.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            let status = 5;

            if status == 1 {
                // The guard is a temporary in the `while let` scrutinee; it lives
                // until the end of each iteration's body, so the mutex stays
                // locked while the job runs. The loop ends once `recv` returns `Err`.
                while let Ok(job) = receiver.lock().unwrap().recv() {
                    Worker::execute_job(id, job);
                }
            }
            else {
                loop {
                    if status == 2 {
                        match receiver.lock() {
                            // The outer `job` is the guard; it is alive for the
                            // whole arm, so the job runs with the mutex locked.
                            Ok(job) => {
                                match job.recv() {
                                    Ok(job) => {
                                        Worker::execute_job(id, job);
                                    },
                                    // Receive error ignored; the loop retries.
                                    Err(_) => {}
                                }
                            }
                            // Lock error ignored; the loop retries.
                            Err(_) => {
                            }
                        }
                    }
                    else if status == 3 {
                        // Same as status 2 written with `if let`: the guard bound
                        // by the outer `if let` is alive while the job executes.
                        if let Ok(job) = receiver.lock() {
                            if let Ok(job) = job.recv() {
                                Worker::execute_job(id, job);
                            }
                        }
                    }
                    else if status == 4 {
                        // The guard is bound to a variable. Shadowing it on the
                        // next line does not drop it; it lives until the end of
                        // this block, so the job runs with the mutex locked.
                        let job =
                            receiver.lock().unwrap();
                        let job = job.recv().expect("received");
                        Worker::execute_job(id, job);
                    }
                    else if status == 5 {
                        // The guard is only a temporary inside this `let`
                        // statement and is dropped at its end, so the mutex is
                        // released before the job runs.
                        let job = receiver.lock().unwrap().recv().unwrap();
                        println!("Worker {} got a job; executing.", id);
                        job();
                    }
                }
            }
        });
        Worker {
            id,
            thread
        }
    }

    /// Logs which worker picked up the job, then runs the job.
    fn execute_job(id:usize, job: Job) {
        println!("Worker {} got a job; executing.", id);
        job();
    }
}

评论区

写评论
作者 Aaron009 2021-05-02 20:28
struct Worker {
    id: usize,
    // `JoinHandle<T>` is generic over the value the thread's closure returns.
    // `()` is the unit type (an empty tuple), so this thread returns nothing
    // meaningful.
    thread: thread::JoinHandle<()>,
}

这个thread::JoinHandle<()>里面的()是什么意思啊?

()这是什么类型?

--
👇
viruscamp: 通常来说 mutex.lock() 碰到的 Err 没法解决, 只能 unwrap() 或 expect() 。

而 receiver.recv() 的 Err 表示所有的 sender 都 drop 了,不可能再收到消息,所以必须处理。一般都用 while let 处理。

你的 status == 1 出的问题是死锁

// The `MutexGuard` temporary created by `lock()` lives until the end of the
// loop body, so the mutex stays locked while the job runs.
while let Ok(job) = receiver.lock().unwrap().recv() {
    Worker::execute_job(id, job);
}

我之前碰到过 【已解决】块表达式中临时变量的drop时机问题 这样解决

// The inner `let` statement ends inside the block, dropping the guard temporary
// there, so the mutex is released before the loop body runs.
while let Ok(job) = { let x = receiver.lock().unwrap().recv(); x } {
    Worker::execute_job(id, job);
}
viruscamp 2021-05-02 16:42

通常来说 mutex.lock() 碰到的 Err 没法解决, 只能 unwrap() 或 expect() 。

而 receiver.recv() 的 Err 表示所有的 sender 都 drop 了,不可能再收到消息,所以必须处理。一般都用 while let 处理。

你的 status == 1 出的问题是死锁

// The `MutexGuard` temporary created by `lock()` lives until the end of the
// loop body, so the mutex stays locked while the job runs.
while let Ok(job) = receiver.lock().unwrap().recv() {
    Worker::execute_job(id, job);
}

我之前碰到过 【已解决】块表达式中临时变量的drop时机问题 这样解决

// The inner `let` statement ends inside the block, dropping the guard temporary
// there, so the mutex is released before the loop body runs.
while let Ok(job) = { let x = receiver.lock().unwrap().recv(); x } {
    Worker::execute_job(id, job);
}
作者 Aaron009 2021-05-02 16:36

我重新编辑了一下帖子,又出现了新的问题:按照上面的方式优化代码之后,没有出现期望的行为,教程中说是因为 MutexGuard 的生命周期较长,导致持有锁的时间比预期更久。

我该如何解决这个问题。

--
👇
viruscamp: 照描述,poisoned 的 mutex 很容易做啊

use std::sync::{Arc, Mutex};
use std::thread::spawn;

/// Demonstrates mutex poisoning: a thread panics while holding the lock, after
/// which `is_poisoned()` is true and `lock()` returns `Err`.
fn main() {
    let d = Arc::new(Mutex::new(120i8));
    let d1 = d.clone();
    spawn(move || {
        let mut d1l = d1.lock().unwrap();
        // A plain `+=` only panics on overflow when overflow checks are enabled
        // (the default for debug builds). `checked_add` + `expect` panics in
        // every build profile, and the guard `d1l` is still alive at that point,
        // which is what poisons the mutex.
        let sum = d1l.checked_add(120).expect("i8 overflow while holding the lock");
        *d1l = sum;
        println!("d = {}", *d1l); // not reached: 120 + 120 does not fit in i8
    })
    .join()
    // The thread is expected to panic; discard the resulting `Err`.
    .unwrap_or_default();

    println!("d.is_poisoned() = {}", d.is_poisoned());
    match d.lock() {
        Ok(dl) => println!("d = {}", *dl),
        Err(err) => println!("err = {}", err),
    };
}
viruscamp 2021-05-02 16:21

照描述,poisoned 的 mutex 很容易做啊

use std::sync::{Arc, Mutex};
use std::thread::spawn;

/// Demonstrates mutex poisoning: a thread panics while holding the lock, after
/// which `is_poisoned()` is true and `lock()` returns `Err`.
fn main() {
    let d = Arc::new(Mutex::new(120i8));
    let d1 = d.clone();
    spawn(move || {
        let mut d1l = d1.lock().unwrap();
        // A plain `+=` only panics on overflow when overflow checks are enabled
        // (the default for debug builds). `checked_add` + `expect` panics in
        // every build profile, and the guard `d1l` is still alive at that point,
        // which is what poisons the mutex.
        let sum = d1l.checked_add(120).expect("i8 overflow while holding the lock");
        *d1l = sum;
        println!("d = {}", *d1l); // not reached: 120 + 120 does not fit in i8
    })
    .join()
    // The thread is expected to panic; discard the resulting `Err`.
    .unwrap_or_default();

    println!("d.is_poisoned() = {}", d.is_poisoned());
    match d.lock() {
        Ok(dl) => println!("d = {}", *dl),
        Err(err) => println!("err = {}", err),
    };
}
1 共 4 条评论, 1 页