有如下需求(伪代码),计算并写入sqlite,利用的是库rusqlite:
fn main() {
// 开启事务
let transction = ...;
let mut pre_data1 = init1;
let mut pre_data2 = init2;
for i in 0..2000 {
//计算数据
let cur_data1 = calculator1(&pre_data1, &pre_data2);
let cur_data2 = calculator2(&pre_data1, &pre_data2);
//写入事务
to_sqlite(transction, &pre_data1, &pre_data2);
pre_data1 = cur_data1;
pre_data2 = cur_data2;
}
// 提交事务
transction.commit();
}
fn calculator1(...) {
// 计算任务
}
fn calculator2(...) {
// 计算任务
}
fn to_sqlite(...) {
execute(...)
}
rusqlite本身函数全部都是“同步”的且里面包含有操作:复制不可变借用指向的结构体里面包含的数据进一个vec,序列化vec为json_string,写入事务。to_sqlite
所需要的时间大概是计算任务的10%到50%不等。请问这样该用哪种办法实现好呢?
我是觉得这里显然计算任务和写入事务是可以同时进行的,但是抓不住实现的要点,求教各位有经验的大佬这类型的任务用什么思路来完成?
PS:我曾经尝试过rayon::join,但是跳出了一大串的错误说是很多都没impl send,然后to_sqlite
提示&mut stmt
不是thread safety的。
交个作业:
这里之前忘记说了pre_data
实际上是一个&mut CsVec
,也就是说这个函数里实际上持有的是某个变量的可变指针,所以不能直接传送到副线程里,否则会有生命周期错误,这里也不能直接给'static
的生命周期,更不能深拷贝因为性能很差,所以就有点麻烦了,这里我尝试成功了两种办法;
- 之前说了,
to_sqlite
由三个部分组成:&CsVec
转化为Vec
,然后Vec
转化为json_string
,然后写入事务。这里,只把写入事务放入副线程。好处是几乎不用改多少源代码,坏处是前两个可能也是颇为耗时的步骤也需要在主线处理。流程如下:
fn main() {
// 开启事务 (补充一下,实际上这里是需要得到handle,才能在最后使用join防止主线程早于副线程退出)
let (tx, rx) = std::sync::mpsc::channel();
let handle = std::thread::spawn(move || {
let transaction = ...;
while let Ok((pre_data1, pre_data2)) = rx.recv() {
transaction.execute(...);
}
transction.commit();
});
let mut pre_data1 = init1;
let mut pre_data2 = init2;
sub_fn(&transaction, &mut pre_data1, &mut pre_data2)
handle.join().unwrap();
}
fn sub_fn(tx, &mut pre_data1, &mut pre_data2) {
for i in 0..2000 {
//计算数据
let cur_data1 = calculator1(&pre_data1, &pre_data2);
let cur_data2 = calculator2(&pre_data1, &pre_data2);
//先转为json再发送入副线程进行execute
let params = to_json(tpre_data1_string, pre_data2_string);
tx.send(params).unwrap();
pre_data1 = cur_data1;
pre_data2 = cur_data2;
}
}
fn calculator1(...) {
// 计算任务
}
fn calculator2(...) {
// 计算任务
}
fn to_json(...) {
pre_data -> Vec<T>
Vec<T> -> json
}
- 比较麻烦的方法,需要从底层改变变量的形式,来传Arc。
fn main() {
// 开启事务 (补充一下,实际上这里是需要得到handle,才能在最后使用join防止主线程早于副线程退出)
let (tx, rx) = std::sync::mpsc::channel();
let handle = std::thread::spawn(move || {
let transaction = ...;
while let Ok((pre_data1, pre_data2)) = rx.recv() {
// 函数完成后,自动丢弃read lock
to_sqlite(transaction, pre_data1, pre_data2);
}
transction.commit();
});
// 使用RwLock来保证并发读取,并用Arc包裹以跨线程传输
let pre_data1: Arc<RwLock<CsVec>> = init1;
let pre_data2: Arc<RwLock<CsVec>> = init2;
sub_fn(&transaction, pre_data1.clone(), pre_data2.clone())
handle.join().unwrap();
}
fn sub_fn(tx, pre_data1, pre_data2) {
for i in 0..2000 {
// 计算数据
// 不具体写了,挺麻烦,需要先取得read lock
let cur_data1 = calculator1(...);
let cur_data2 = calculator2(...);
// 传入整个Arc
tx.send(pre_data1.clone(), pre_data2.clone()).unwrap();
// 丢弃 read lock
drop(rlock1);
drop(rlock2);
// 当所有read lock都drop后,wlock可以启用,开始往RwLock内部写入数据。
pre_data1 = wlock -> cur_data1;
pre_data2 = wlock -> cur_data2;
}
}
fn calculator1(...) {
// 计算任务
}
fn calculator2(...) {
// 计算任务
}
fn to_sqlite(...) {
pre_data -> Vec<T>
Vec<T> -> json
execute
}
实际上我更偏爱第一种,但是第二种的可拓展性貌似更好些。问题是加锁解锁貌似也挺耗时的。因为数据集的原因,好像第二种也没有明显强于第一种,因为序列化和展开稀疏矩阵明显耗时很短的样子。
评论区
写评论你说的对,是后者。我使用的是sprs库,这个库的所有计算都是返回一个新的
CsVec
向量。实际上貌似ndarray
和nalgbera
都是生成新的值,因为矩阵 * 向量 = 新向量
,在原地替换向量除了省内存但意义不大的样子,还有可能拖累计算速度。感谢你的新点子,极有道理而且容易实现。我花了十分钟就改好了。稍微测试了下,效果拔群,单线程大概是3.5s,arc+rwlock大概是2.7s,直接传值大概提升至2.5s。学习到了,感谢!
👇
johnmave126: 这个做法听起来有点怪,我问一下啊,你的
calculator1
和calculator2
返回的类型是&CsVec
还是CsVec
. 我估计是后者吧,不然感觉borrow rule有问题,计算cur_data2
好像会有多个mutable borrow.假如我的估计是正确的,那我估计
main
里的pre_data1
和pre_data2
都是值而不是引用,那你可以考虑sub_fn
不要传&mut
而是直接传值move进去。然后你就注意到,在你send
以后,pre_data1
和pre_data2
都不会再次使用,那你就可以直接move走整个值传过channel,只要CsVec
支持Send
就行。大概会变成这样
这个做法听起来有点怪,我问一下啊,你的
calculator1
和calculator2
返回的类型是&CsVec
还是CsVec
. 我估计是后者吧,不然感觉borrow rule有问题,计算cur_data2
好像会有多个mutable borrow.假如我的估计是正确的,那我估计
main
里的pre_data1
和pre_data2
都是值而不是引用,那你可以考虑sub_fn
不要传&mut
而是直接传值move进去。然后你就注意到,在你send
以后,pre_data1
和pre_data2
都不会再次使用,那你就可以直接move走整个值传过channel,只要CsVec
支持Send
就行。大概会变成这样
听着很有道理啊,感谢!听君一席话,胜读百页书啊,哈哈。今晚就试试看。
--
👇
johnmave126: channel倒是不一定要用std的,有些spsc实现可能更加高效。
--
👇
johnmave126: 感觉都不用,你直接开条别的线程专门用来提交sqlite,比如这样
channel倒是不一定要用std的,有些spsc实现可能更加高效。
--
👇
johnmave126: 感觉都不用,你直接开条别的线程专门用来提交sqlite,比如这样
感觉都不用,你直接开条别的线程专门用来提交sqlite,比如这样