< 返回版块

eweca-d 发表于 2021-03-30 17:36

有如下需求(伪代码),计算并写入sqlite,利用的是库rusqlite:

fn main() {
    // 开启事务
    let transction = ...;
    let mut pre_data1 = init1;
    let mut pre_data2 = init2;
    for i in 0..2000 {
        //计算数据
        let cur_data1 = calculator1(&pre_data1, &pre_data2);
        let cur_data2 = calculator2(&pre_data1, &pre_data2);
        
        //写入事务
        to_sqlite(transction, &pre_data1, &pre_data2);

        pre_data1 = cur_data1;
        pre_data2 = cur_data2;
    }

    // 提交事务
    transction.commit();
}

fn calculator1(...) {
    // 计算任务
}

fn calculator2(...) {
    // 计算任务
}

fn to_sqlite(...) {
    execute(...)
}

rusqlite本身函数全部都是“同步”的且里面包含有操作:复制不可变借用指向的结构体里面包含的数据进一个vec,序列化vec为json_string,写入事务。to_sqlite所需要的时间大概是计算任务的10%到50%不等。请问这样该用哪种办法实现好呢?

我是觉得这里显然计算任务和写入事务是可以同时进行的,但是抓不住实现的要点,求教各位有经验的大佬这类型的任务用什么思路来完成?

PS:我曾经尝试过rayon::join,但是跳出了一大串的错误说是很多都没impl send,然后to_sqlite提示&mut stmt不是thread safety的。


交个作业:

这里之前忘记说了pre_data实际上是一个&mut CsVec,也就是说这个函数里实际上持有的是某个变量的可变指针,所以不能直接传送到副线程里,否则会有生命周期错误,这里也不能直接给'static的生命周期,更不能深拷贝因为性能很差,所以就有点麻烦了,这里我尝试成功了两种办法;

  1. 之前说了,to_sqlite由三个部分组成:&CsVec转化为Vec,然后Vec转化为json_string,然后写入事务。这里,只把写入事务放入副线程。好处是几乎不用改多少源代码,坏处是前两个可能也是颇为耗时的步骤也需要在主线处理。流程如下:
fn main() {
    // 开启事务 (补充一下,实际上这里是需要得到handle,才能在最后使用join防止主线程早于副线程退出)
    let (tx, rx) = std::sync::mpsc::channel();
    let handle = std::thread::spawn(move || {
        let transaction = ...;
        while let Ok((pre_data1, pre_data2)) = rx.recv() {
            transaction.execute(...);
        }
        transction.commit();
    });

    let mut pre_data1 = init1;
    let mut pre_data2 = init2;

    sub_fn(&transaction, &mut pre_data1, &mut pre_data2)

    handle.join().unwrap();
}

fn sub_fn(tx, &mut pre_data1, &mut pre_data2) {
    for i in 0..2000 {
        //计算数据
        let cur_data1 = calculator1(&pre_data1, &pre_data2);
        let cur_data2 = calculator2(&pre_data1, &pre_data2);
        
        //先转为json再发送入副线程进行execute
        let params = to_json(tpre_data1_string, pre_data2_string);
        tx.send(params).unwrap();

        pre_data1 = cur_data1;
        pre_data2 = cur_data2;
    }
}

fn calculator1(...) {
    // 计算任务
}

fn calculator2(...) {
    // 计算任务
}

fn to_json(...) {
    pre_data -> Vec<T>
    Vec<T> -> json
}
  1. 比较麻烦的方法,需要从底层改变变量的形式,来传Arc。
fn main() {
    // 开启事务 (补充一下,实际上这里是需要得到handle,才能在最后使用join防止主线程早于副线程退出)
    let (tx, rx) = std::sync::mpsc::channel();
    let handle = std::thread::spawn(move || {
        let transaction = ...;
        while let Ok((pre_data1, pre_data2)) = rx.recv() {
            // 函数完成后,自动丢弃read lock
            to_sqlite(transaction, pre_data1, pre_data2);
        }
        transction.commit();
    });

    // 使用RwLock来保证并发读取,并用Arc包裹以跨线程传输
    let pre_data1: Arc<RwLock<CsVec>> = init1;
    let pre_data2:  Arc<RwLock<CsVec>> = init2;

    sub_fn(&transaction, pre_data1.clone(), pre_data2.clone())

    handle.join().unwrap();
}

fn sub_fn(tx, pre_data1, pre_data2) {
    for i in 0..2000 {
        // 计算数据
        // 不具体写了,挺麻烦,需要先取得read lock
        let cur_data1 = calculator1(...);
        let cur_data2 = calculator2(...);
        
        // 传入整个Arc
        tx.send(pre_data1.clone(), pre_data2.clone()).unwrap();

        // 丢弃 read lock
        drop(rlock1);
        drop(rlock2);

        // 当所有read lock都drop后,wlock可以启用,开始往RwLock内部写入数据。
        pre_data1 = wlock -> cur_data1;
        pre_data2 = wlock -> cur_data2;
    }
}

fn calculator1(...) {
    // 计算任务
}

fn calculator2(...) {
    // 计算任务
}

fn to_sqlite(...) {
    pre_data -> Vec<T>
    Vec<T> -> json
    execute
}

实际上我更偏爱第一种,但是第二种的可拓展性貌似更好些。问题是加锁解锁貌似也挺耗时的。因为数据集的原因,好像第二种也没有明显强于第一种,因为序列化和展开稀疏矩阵明显耗时很短的样子。

评论区

写评论
作者 eweca-d 2021-04-01 13:06

你说的对,是后者。我使用的是sprs库,这个库的所有计算都是返回一个新的CsVec向量。实际上貌似ndarraynalgbera都是生成新的值,因为矩阵 * 向量 = 新向量,在原地替换向量除了省内存但意义不大的样子,还有可能拖累计算速度。

感谢你的新点子,极有道理而且容易实现。我花了十分钟就改好了。稍微测试了下,效果拔群,单线程大概是3.5s,arc+rwlock大概是2.7s,直接传值大概提升至2.5s。学习到了,感谢!

👇
johnmave126: 这个做法听起来有点怪,我问一下啊,你的calculator1calculator2返回的类型是&CsVec还是CsVec. 我估计是后者吧,不然感觉borrow rule有问题,计算cur_data2好像会有多个mutable borrow.

假如我的估计是正确的,那我估计main里的pre_data1pre_data2都是值而不是引用,那你可以考虑sub_fn不要传&mut而是直接传值move进去。然后你就注意到,在你send以后,pre_data1pre_data2都不会再次使用,那你就可以直接move走整个值传过channel,只要CsVec支持Send就行。

大概会变成这样

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let handle = std::thread::spawn(move || {
        let transaction = ...;
        while let Ok((pre_data1, pre_data2)) = rx.recv() {
            // to json etc...
            transaction.execute(...);
        }
        transction.commit();
    });

    let pre_data1 = init1;
    let pre_data2 = init2;

    // 假设你之后还用得到结果
    let (pre_data1, pre_data2) = sub_fn(&transaction, pre_data1, pre_data2)

    handle.join().unwrap();
}

fn sub_fn(tx, mut pre_data1, mut pre_data2) {
    for i in 0..2000 {
        let cur_data1 = calculator1(&pre_data1, &pre_data2);
        let cur_data2 = calculator2(&pre_data1, &pre_data2);
        
        tx.send((pre_data1, pre_data2)).unwrap();

        pre_data1 = cur_data1;
        pre_data2 = cur_data2;
    }
    (pre_data1, pre_data2)
}
johnmave126 2021-04-01 02:57

这个做法听起来有点怪,我问一下啊,你的calculator1calculator2返回的类型是&CsVec还是CsVec. 我估计是后者吧,不然感觉borrow rule有问题,计算cur_data2好像会有多个mutable borrow.

假如我的估计是正确的,那我估计main里的pre_data1pre_data2都是值而不是引用,那你可以考虑sub_fn不要传&mut而是直接传值move进去。然后你就注意到,在你send以后,pre_data1pre_data2都不会再次使用,那你就可以直接move走整个值传过channel,只要CsVec支持Send就行。

大概会变成这样

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let handle = std::thread::spawn(move || {
        let transaction = ...;
        while let Ok((pre_data1, pre_data2)) = rx.recv() {
            // to json etc...
            transaction.execute(...);
        }
        transction.commit();
    });

    let pre_data1 = init1;
    let pre_data2 = init2;

    // 假设你之后还用得到结果
    let (pre_data1, pre_data2) = sub_fn(&transaction, pre_data1, pre_data2)

    handle.join().unwrap();
}

fn sub_fn(tx, mut pre_data1, mut pre_data2) {
    for i in 0..2000 {
        let cur_data1 = calculator1(&pre_data1, &pre_data2);
        let cur_data2 = calculator2(&pre_data1, &pre_data2);
        
        tx.send((pre_data1, pre_data2)).unwrap();

        pre_data1 = cur_data1;
        pre_data2 = cur_data2;
    }
    (pre_data1, pre_data2)
}
作者 eweca-d 2021-03-31 12:08

听着很有道理啊,感谢!听君一席话,胜读百页书啊,哈哈。今晚就试试看。

--
👇
johnmave126: channel倒是不一定要用std的,有些spsc实现可能更加高效。

--
👇
johnmave126: 感觉都不用,你直接开条别的线程专门用来提交sqlite,比如这样

johnmave126 2021-03-31 02:25

channel倒是不一定要用std的,有些spsc实现可能更加高效。

--
👇
johnmave126: 感觉都不用,你直接开条别的线程专门用来提交sqlite,比如这样

johnmave126 2021-03-31 02:23

感觉都不用,你直接开条别的线程专门用来提交sqlite,比如这样

let (tx, rx) = std::sync::mpsc::channel();

std::thread::spawn(move || {
    let transaction = ...;
    while let Ok((pre_data1, pre_data2)) = rx.recv() {
        to_sqlite(transaction, pre_data1, pre_data2);
    }
    transction.commit();
});

let mut pre_data1 = init1;
let mut pre_data2 = init2;
for i in 0..2000 {
    let cur_data1 = calculator1(&pre_data1, &pre_data2);
    let cur_data2 = calculator2(&pre_data1, &pre_data2);

    tx.send((pre_data1, pre_data2)).expect("failed to send");

    pre_data1 = cur_data1;
    pre_data2 = cur_data2;
}
drop(tx);
1 共 5 条评论, 1 页