< 返回版块

sunnyregion 发表于 2021-09-01 08:56

各位大佬:

昨天腾讯视频聊的比较开心,留了一个尾巴。我现在把我的困惑放出来,大家尽情拍砖。

这个是我写的文章,文章分了两章 :

6.2 多线程——channel

6.3 多线程——future

评论区

写评论
johnmave126 2021-09-02 23:04

当然可以,发上来就是为了分享

作者 sunnyregion 2021-09-02 14:27

感谢 johnmave126

我把您的程序用在我的这个教程里面了,也写了出处。

地址 channel

如过您反对的话,我可以取消。

希望您能够同意我的使用,谢谢!

作者 sunnyregion 2021-09-02 13:55

感谢,感谢,这个吊

--
👇
johnmave126: ```rust use futures::stream::{self, StreamExt}; use std::iter; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

async fn sieve() { let (tx, rx): (UnboundedSender, UnboundedReceiver) = unbounded_channel(); tokio::spawn(async move { for i in 2..20000 { tx.send(i).ok(); } }); stream::iter(iter::repeat(())) .scan(rx, |rx, _| { let (tx2, mut rx2) = unbounded_channel(); std::mem::swap(rx, &mut rx2); async { let prime = rx2.recv().await?; println!("{}", prime); tokio::spawn(async move { while let Some(i) = rx2.recv().await { if i % prime != 0 { tx2.send(i).ok(); } } }); Some(()) } }) .all(|_| std::future::ready(true)) .await; }

#[tokio::main] async fn main() { sieve().await; }

结果是

time target/release/rust_prime real 0m0.110s user 0m0.314s sys 0m0.135s


稍微改了一下go的方便跑起来
```go
package main

import (
    "fmt"
)

func generate(ch chan<- int) {
    for i := 2; i < 20000 ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
    close(ch)
}

func filter(src <-chan int, dst chan<- int, prime int) {
    for i := range src { // Loop over values received from 'src'.
        if i % prime != 0 {
            dst <- i // Send 'i' to channel 'dst'.
        }
    }
    close(dst)
}

func sieve() {
    ch := make(chan int) // Create a new channel.
    go generate(ch)      // Start generate() as a subprocess.
    for {
        prime, more := <-ch
        if !more {
            break
        }
        fmt.Print(prime, "\n")
        ch1 := make(chan int)
        go filter(ch, ch1, prime)
        ch = ch1
    }
}

func main() {
    sieve()
}

结果是

> time ./go
real    0m0.146s
user    0m0.891s
sys     0m0.037s

看起来tokio做scheduler来切换效率也不差的

johnmave126 2021-09-02 13:09

不用go这套思路,我换了个思路

筛法本质上其实就是把已知素数表从左到右试一遍

而且只需要试平方根以内的素数就可以了

所以可以先生成10以内的素数表,然后用10以内的素数表生成10-100的素数表,拼起来生成100-10000以内的素数表,以此类推

注意到每次生成的时候,100-10000以内的数都用着同一张已知素数表,所以可以平行化,这里套一个rayon的par_iter来处理

use rayon::prelude::*;

const N: i32 = 2000000;

fn main() {
    let mut primes = Vec::new();
    let mut cur = 10;
    // bootstrap
    for i in 2..cur {
        if primes.iter().all(|p| i % p != 0) {
            primes.push(i);
        }
    }
    while cur < N {
        let next = std::cmp::min(cur * cur, N);
        let mut addition: Vec<_> = (cur..next)
            .into_par_iter()
            .filter(|x| {
                for p in primes.iter() {
                    if x % p == 0 {
                        return false;
                    }
                    if x / p < *p {
                        break;
                    }
                }
                true
            })
            .collect();
        primes.append(&mut addition);
        cur = next;
    }
    for p in primes {
        println!("{}", p);
    }
}

试着跑了一下200000

❯ time target/release/rust_prime_rayon 

real    0m0.018s
user    0m0.044s
sys     0m0.004s

这个的效果真是太好了...

再加一个0

❯ time target/release/rust_prime_rayon 

real    0m0.060s
user    0m0.323s
sys     0m0.017s
johnmave126 2021-09-02 11:47

加了个0跑了一下差距更大

❯ time ./go

real    0m6.529s
user    0m46.325s
sys     0m0.056s
❯ time target/release/rust_prime 

real    0m3.193s
user    0m19.992s
sys     0m4.072s

tokio,行

johnmave126 2021-09-02 11:34
use futures::stream::{self, StreamExt};
use std::iter;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

async fn sieve() {
    let (tx, rx): (UnboundedSender<i32>, UnboundedReceiver<i32>) = unbounded_channel();
    tokio::spawn(async move {
        for i in 2..20000 {
            tx.send(i).ok();
        }
    });
    stream::iter(iter::repeat(()))
        .scan(rx, |rx, _| {
            let (tx2, mut rx2) = unbounded_channel();
            std::mem::swap(rx, &mut rx2);
            async {
                let prime = rx2.recv().await?;
                println!("{}", prime);
                tokio::spawn(async move {
                    while let Some(i) = rx2.recv().await {
                        if i % prime != 0 {
                            tx2.send(i).ok();
                        }
                    }
                });
                Some(())
            }
        })
        .all(|_| std::future::ready(true))
        .await;
}

#[tokio::main]
async fn main() {
    sieve().await;
}

结果是

> time target/release/rust_prime
real    0m0.110s
user    0m0.314s
sys     0m0.135s

稍微改了一下go的方便跑起来

package main

import (
    "fmt"
)

func generate(ch chan<- int) {
    for i := 2; i < 20000 ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
    close(ch)
}

func filter(src <-chan int, dst chan<- int, prime int) {
    for i := range src { // Loop over values received from 'src'.
        if i % prime != 0 {
            dst <- i // Send 'i' to channel 'dst'.
        }
    }
    close(dst)
}

func sieve() {
    ch := make(chan int) // Create a new channel.
    go generate(ch)      // Start generate() as a subprocess.
    for {
        prime, more := <-ch
        if !more {
            break
        }
        fmt.Print(prime, "\n")
        ch1 := make(chan int)
        go filter(ch, ch1, prime)
        ch = ch1
    }
}

func main() {
    sieve()
}

结果是

> time ./go
real    0m0.146s
user    0m0.891s
sys     0m0.037s

看起来tokio做scheduler来切换效率也不差的

作者 sunnyregion 2021-09-02 10:39

有道理,本身这个go程序就是Rob Pike为了显示go的高效做的例子,可能本身就符合go的哲学。

我就是想找到一条接近的方式,我就是用这个例子来学习rust的多线程。

--
👇
Grobycn: 就这个算法本身,并行计算不能提高效率,反而会因为有额外的切换开销,比顺序执行更慢。

然后对比一下 rust future 和 go channel。

rust 的并行效率取决于采用的任务调度机制,比如换成 tokio,效率可能会提升一点。

go channel 的调度比较明确,channel内部会维护一组任务id,传入数据时,如果有接收数据的任务,直接转移到接受数据的任务。

所以 go channel 接近顺序执行,效率高一点是正常的。

Grobycn 2021-09-02 09:56

就这个算法本身,并行计算不能提高效率,反而会因为有额外的切换开销,比顺序执行更慢。

然后对比一下 rust future 和 go channel。

rust 的并行效率取决于采用的任务调度机制,比如换成 tokio,效率可能会提升一点。

go channel 的调度比较明确,channel内部会维护一组任务id,传入数据时,如果有接收数据的任务,直接转移到接受数据的任务。

所以 go channel 接近顺序执行,效率高一点是正常的。

作者 sunnyregion 2021-09-02 09:07

我计算的是2万,你的是2千。时间还是慢

--
👇
vimcdoe: ```rust use rayon::prelude::*; fn main() { let m = 2000i32; let mut n = 2i32; let mut res = vec![]; let mut tmp: Vec = (n..m).into_iter().collect(); while n < m { res.push(n); tmp = tmp .into_par_iter() .filter(|x| x % n != 0) .collect::<Vec>(); if tmp.get(1).is_some() { n = tmp[0]; } else { res.push(tmp[0]); break; } } // dbg!(&res); // dbg!(res.len()); }

real    0m0.101s                              
user    0m0.097s                              
sys     0m0.016s  
时间可以,就算法不知bug没
vimcdoe 2021-09-02 03:59
use rayon::prelude::*;
fn main() {
    let m = 2000i32;
    let mut n = 2i32;
    let mut res = vec![];
    let mut tmp: Vec<i32> = (n..m).into_iter().collect();
    while n < m {
        res.push(n);
        tmp = tmp
            .into_par_iter()
            .filter(|x| x % n != 0)
            .collect::<Vec<i32>>();
        if tmp.get(1).is_some() {
            n = tmp[0];
        } else {
            res.push(tmp[0]);
            break;
        }
    }
   // dbg!(&res);
   // dbg!(res.len());
}

real 0m0.101s
user 0m0.097s
sys 0m0.016s
时间可以,就算法不知bug没

作者 sunnyregion 2021-09-01 16:12

你可以改一下,然后贴出来

--
👇
sunli829: channel慢,比go慢很多,rust程序不要这么写,尽量减少channel通讯

作者 sunnyregion 2021-09-01 16:11

使用mpmc了

--
👇
c5soft: 亲,把程序再优化一下,用mpmc或者别的crate重写一下用例。Rust居然比Go跑得慢,锈粉们不认可。

sunli829 2021-09-01 14:28

频繁await造成的性能损失不可忽视

sunli829 2021-09-01 14:27

channel慢,比go慢很多,rust程序不要这么写,尽量减少channel通讯

c5soft 2021-09-01 13:04

亲,把程序再优化一下,用mpmc或者别的crate重写一下用例。Rust居然比Go跑得慢,锈粉们不认可。

rustdesk 2021-09-01 11:02

都是我认识的

1 共 16 条评论, 1 页