在构建高并发的后端服务时,确保数据的最终一致性是至关重要的。特别是当业务逻辑需要执行 "更新或插入 (Upsert)" 这种复合操作时,传统的 “先查询,后更新” 模式极易陷入并发陷阱。
本文将深入探讨为什么简单的操作会引发竞态条件,并介绍如何在 Rust 的 SeaORM 框架中,使用 版本号(i32) 实现一个健壮的 原子化乐观锁 Upsert 流程。
一、乐观锁:不是不锁,而是“巧”锁
数据库的并发控制主要分为悲观锁和乐观锁。
- 悲观锁(Pessimistic Locking): 假设冲突一定会发生。在读取数据时就对数据行进行锁定,直到事务完成。
- 乐观锁(Optimistic Locking): 假设冲突很少发生。在整个事务过程中不锁定资源,而是通过检查数据是否被修改来确认。
乐观锁的核心思想是:通过一次原子性的操作来检查并修改数据,而不是依赖两次独立的数据库操作。
二、没有锁的陷阱:丢失更新的竞态条件
让我们以一个 version: i32 字段为例,来看看缺乏原子性操作会导致什么问题。
场景:多人同时更新同一条记录
- 查询(事务 A/B): 事务 A 和事务 B 都读取了 ID=1 的记录,其
version都为1。 - 更新(事务 B 提交): 事务 B 完成修改,执行 无版本检查 的
UPDATE语句,数据库中的version变为2。 - 更新(事务 A 提交): 事务 A 完成修改,也执行 无版本检查 的
UPDATE语句。
结果: 事务 B 的业务变更被事务 A 的修改覆盖,导致 丢失更新(Lost Update) 的竞态条件。
乐观锁的解决之道:单次原子操作
要解决这个问题,必须让 “检查旧版本” 和 “设置新值” 成为一个原子操作,即在 UPDATE 语句中加入版本过滤条件:
UPDATE records
SET title = '新标题', version = version + 1
WHERE id = 1 AND version = 1; -- 关键:只有旧版本为 1 时才允许更新
在 SeaORM 中,我们使用 update_many() 配合 filter() 来构造这个原子操作,并通过检查 rows_affected 来判断操作是否成功。
三、Upsert 流程的抉择:先 Update 再 Insert 的优势
实现 Upsert 功能主要有两种策略:“先 Update 再 Insert” 和 “先 Insert 再 Update”。在涉及乐观锁的业务中,“先 Update 再 Insert” 模式是更优的选择。
1. 模式一:先 Update 再 Insert(推荐)
这种模式总是优先处理最常见的情况:更新现有记录。
优势分析:
- 天然支持乐观锁: 乐观锁检查(
WHERE version = ?)直接集成在UPDATE语句中,利用了数据库的原子性,保证了在单次操作中完成检查和修改。 - 高效处理更新: 在高并发的更新场景中,大部分操作都是更新。这种模式只需执行一次成功的
UPDATE就能完成任务,避免了不必要的INSERT尝试。
2. 模式二:先 Insert 再 Update
流程: 尝试 INSERT $\to$ 如果失败(主键冲突),执行 UPDATE。
劣势分析:
- 乐观锁实现复杂: 如果
INSERT失败,转到UPDATE时,必须确保UPDATE操作是带有乐观锁检查的,这增加了流程的复杂性。 - 高更新场景效率低: 如果大部分操作是更新,这种模式会强制执行一次注定会失败的
INSERT操作(抛出主键冲突错误),然后再执行一次UPDATE,浪费了数据库资源。
总结:选择 “先 Update 再 Insert” 的理由
在处理带有乐观锁的聚合根持久化时,“先 Update 再 Insert” 模式是首选方案。它能够利用 UPDATE 的原子性高效地处理最常见的更新操作,并天然地将乐观锁检查与数据库写操作绑定。
四、SeaORM 中的 Upsert 流程:UPDATE $\to$ FIND $\to$ INSERT
基于 “先 Update 再 Insert” 的策略,我们构建一个清晰的 "原子 UPDATE + FIND + INSERT" 三步流程,以可靠地处理成功更新、并发冲突和成功插入三种情况。
核心实现代码
// 假设 entity.version 是更新后的新版本,expected_old_version = entity.version - 1
async fn save<T: TransactionContext>(
&self,
callback: &mut EventSourcedEntity<Callback>,
txn: &mut T,
) -> Result<(), RepositoryError> {
let conn = txn.get_connection();
let entity: &Callback = callback;
let id = entity.channel.0.clone();
let expected_old_version = entity.version - 1;
// 准备 ActiveModel,设置新的 version
let mut active_model_for_update: ActiveModel = entity.clone().into_active_model();
active_model_for_update.version = Set(entity.version);
// ----------------------------------------------------
// 第一步:尝试原子 UPDATE(带乐观锁)
// ----------------------------------------------------
let res = callback_model::Entity::update_many()
.set(active_model_for_update)
.filter(callback_model::Column::Channel.eq(id.clone()))
.filter(callback_model::Column::Version.eq(expected_old_version)) // 乐观锁检查
.exec(conn)
.await?;
if res.rows_affected > 0 {
// 更新成功:影响行数 > 0,说明乐观锁条件满足。
callback.move_event_to_context(txn);
return Ok(());
}
// ----------------------------------------------------
// 第二步:UPDATE 失败。使用 FIND 检查记录是否存在(判断是否为并发冲突)
// ----------------------------------------------------
if callback_model::Entity::find_by_id(id.clone())
.one(conn)
.await?
.is_some()
{
// 记录存在。UPDATE 失败且记录存在,必然是版本不匹配,即并发冲突。
return Err(RepositoryError::optimistic_lock_error(
"Optimistic lock conflict: Record exists, but old version did not match."
));
}
// ----------------------------------------------------
// 第三步:记录不存在,尝试 INSERT
// ----------------------------------------------------
let active_model_for_insert: ActiveModel = entity.clone().into_active_model();
active_model_for_insert.insert(conn).await
.map_err(|e| {
// 如果 INSERT 失败,则视为并发冲突(在 FIND 之后被其他事务插入)。
match e {
DbErr::RecordNotInserted | DbErr::Custom(_) => RepositoryError::optimistic_lock_error(
"Concurrency conflict: Record inserted after non-existence check."
),
_ => e.into(),
}
})?;
callback.move_event_to_context(txn);
Ok(())
}
结论:告别竞态,拥抱原子性
通过本文的分析和实践,我们可以得出以下关键结论:
- 乐观锁是高并发的基石: 放弃“先查后改”的传统模式,将版本检查与数据修改集成到一次原子性的
UPDATE操作中,是避免丢失更新等竞态条件的根本方法。 - 选择正确的 Upsert 策略: “先 Update 再 Insert” 模式凭借其对乐观锁的天然支持和对更新操作的高效处理,成为处理聚合根持久化的首选。
- 利用数据库的原子性: 无论是通过检查
rows_affected,还是依赖主键约束错误来区分更新失败的原因,都是在充分利用数据库底层机制来确保数据一致性。
在您的 Rust DDD/CQRS 架构中,将这种原子化逻辑封装进仓储(Repository)层的 save() 方法中,是确保数据完整性和系统高可用性的关键。
评论区
写评论还没有评论