< 返回我的博客

blossom-001 发表于 2025-11-18 20:33

在构建高并发的后端服务时,确保数据的最终一致性是至关重要的。特别是当业务逻辑需要执行 "更新或插入 (Upsert)" 这种复合操作时,传统的 “先查询,后更新” 模式极易陷入并发陷阱。
本文将深入探讨为什么简单的操作会引发竞态条件,并介绍如何在 Rust 的 SeaORM 框架中,使用 版本号(i32 实现一个健壮的 原子化乐观锁 Upsert 流程。


一、乐观锁:不是不锁,而是“巧”锁

数据库的并发控制主要分为悲观锁和乐观锁。

  • 悲观锁(Pessimistic Locking): 假设冲突一定会发生。在读取数据时就对数据行进行锁定,直到事务完成。
  • 乐观锁(Optimistic Locking): 假设冲突很少发生。在整个事务过程中不锁定资源,而是通过检查数据是否被修改来确认。

乐观锁的核心思想是:通过一次原子性的操作来检查并修改数据,而不是依赖两次独立的数据库操作。


二、没有锁的陷阱:丢失更新的竞态条件

让我们以一个 version: i32 字段为例,来看看缺乏原子性操作会导致什么问题。

场景:多人同时更新同一条记录

  1. 查询(事务 A/B): 事务 A 和事务 B 都读取了 ID=1 的记录,其 version 都为 1
  2. 更新(事务 B 提交): 事务 B 完成修改,执行 无版本检查UPDATE 语句,数据库中的 version 变为 2
  3. 更新(事务 A 提交): 事务 A 完成修改,也执行 无版本检查UPDATE 语句。

结果: 事务 B 的业务变更被事务 A 的修改覆盖,导致 丢失更新(Lost Update) 的竞态条件。

乐观锁的解决之道:单次原子操作

要解决这个问题,必须让 “检查旧版本”“设置新值” 成为一个原子操作,即在 UPDATE 语句中加入版本过滤条件:

UPDATE records
SET title = '新标题', version = version + 1
WHERE id = 1 AND version = 1; -- 关键:只有旧版本为 1 时才允许更新

在 SeaORM 中,我们使用 update_many() 配合 filter() 来构造这个原子操作,并通过检查 rows_affected 来判断操作是否成功。


三、Upsert 流程的抉择:先 Update 再 Insert 的优势

实现 Upsert 功能主要有两种策略:“先 Update 再 Insert”“先 Insert 再 Update”。在涉及乐观锁的业务中,“先 Update 再 Insert” 模式是更优的选择。

1. 模式一:先 Update 再 Insert(推荐)

这种模式总是优先处理最常见的情况:更新现有记录

优势分析:

  • 天然支持乐观锁: 乐观锁检查(WHERE version = ?)直接集成在 UPDATE 语句中,利用了数据库的原子性,保证了在单次操作中完成检查和修改。
  • 高效处理更新: 在高并发的更新场景中,大部分操作都是更新。这种模式只需执行一次成功的 UPDATE 就能完成任务,避免了不必要的 INSERT 尝试。

2. 模式二:先 Insert 再 Update

流程: 尝试 INSERT $\to$ 如果失败(主键冲突),执行 UPDATE

劣势分析:

  • 乐观锁实现复杂: 如果 INSERT 失败,转到 UPDATE 时,必须确保 UPDATE 操作是带有乐观锁检查的,这增加了流程的复杂性。
  • 高更新场景效率低: 如果大部分操作是更新,这种模式会强制执行一次注定会失败的 INSERT 操作(抛出主键冲突错误),然后再执行一次 UPDATE,浪费了数据库资源。

总结:选择 “先 Update 再 Insert” 的理由

在处理带有乐观锁的聚合根持久化时,“先 Update 再 Insert” 模式是首选方案。它能够利用 UPDATE 的原子性高效地处理最常见的更新操作,并天然地将乐观锁检查与数据库写操作绑定。


四、SeaORM 中的 Upsert 流程:UPDATE $\to$ FIND $\to$ INSERT

基于 “先 Update 再 Insert” 的策略,我们构建一个清晰的 "原子 UPDATE + FIND + INSERT" 三步流程,以可靠地处理成功更新、并发冲突和成功插入三种情况。

核心实现代码

// 假设 entity.version 是更新后的新版本,expected_old_version = entity.version - 1
async fn save<T: TransactionContext>(
    &self,
    callback: &mut EventSourcedEntity<Callback>,
    txn: &mut T,
) -> Result<(), RepositoryError> {
    let conn = txn.get_connection();
    let entity: &Callback = callback;
    let id = entity.channel.0.clone(); 
    let expected_old_version = entity.version - 1; 

    // 准备 ActiveModel,设置新的 version
    let mut active_model_for_update: ActiveModel = entity.clone().into_active_model();
    active_model_for_update.version = Set(entity.version); 

    // ----------------------------------------------------
    // 第一步:尝试原子 UPDATE(带乐观锁)
    // ----------------------------------------------------
    let res = callback_model::Entity::update_many()
        .set(active_model_for_update)
        .filter(callback_model::Column::Channel.eq(id.clone())) 
        .filter(callback_model::Column::Version.eq(expected_old_version)) // 乐观锁检查
        .exec(conn)
        .await?;

    if res.rows_affected > 0 {
        // 更新成功:影响行数 > 0,说明乐观锁条件满足。
        callback.move_event_to_context(txn);
        return Ok(());
    }

    // ----------------------------------------------------
    // 第二步:UPDATE 失败。使用 FIND 检查记录是否存在(判断是否为并发冲突)
    // ----------------------------------------------------
    if callback_model::Entity::find_by_id(id.clone())
        .one(conn)
        .await?
        .is_some()
    {
        // 记录存在。UPDATE 失败且记录存在,必然是版本不匹配,即并发冲突。
        return Err(RepositoryError::optimistic_lock_error(
            "Optimistic lock conflict: Record exists, but old version did not match."
        ));
    }

    // ----------------------------------------------------
    // 第三步:记录不存在,尝试 INSERT
    // ----------------------------------------------------
    let active_model_for_insert: ActiveModel = entity.clone().into_active_model();
    
    active_model_for_insert.insert(conn).await
        .map_err(|e| {
             // 如果 INSERT 失败,则视为并发冲突(在 FIND 之后被其他事务插入)。
             match e {
                 DbErr::RecordNotInserted | DbErr::Custom(_) => RepositoryError::optimistic_lock_error(
                    "Concurrency conflict: Record inserted after non-existence check."
                 ),
                 _ => e.into(),
            }
        })?;

    callback.move_event_to_context(txn);
    Ok(())
}

结论:告别竞态,拥抱原子性

通过本文的分析和实践,我们可以得出以下关键结论:

  1. 乐观锁是高并发的基石: 放弃“先查后改”的传统模式,将版本检查数据修改集成到一次原子性的 UPDATE 操作中,是避免丢失更新等竞态条件的根本方法。
  2. 选择正确的 Upsert 策略: “先 Update 再 Insert” 模式凭借其对乐观锁的天然支持和对更新操作的高效处理,成为处理聚合根持久化的首选。
  3. 利用数据库的原子性: 无论是通过检查 rows_affected,还是依赖主键约束错误来区分更新失败的原因,都是在充分利用数据库底层机制来确保数据一致性。

在您的 Rust DDD/CQRS 架构中,将这种原子化逻辑封装进仓储(Repository)层的 save() 方法中,是确保数据完整性和系统高可用性的关键。

评论区

写评论

还没有评论

1 共 0 条评论, 1 页