< 返回我的博客

爱国的张浩予 发表于 2025-06-02 15:09

Tags:lock,mutex,atomic,thread,task,spinlock

【无锁编程】之【原子内存排序】

引题

无锁编程的“无锁”并不是放弃多任务处理中的“线程同步”,而是允许 @Rustacean 绕开操作系统线程锁,直接依赖硬件的有序原子操作,等效地完成线程同步任务。

线程同步

线程同步是确保多任务系统有序运行的重要机制,但它的实施技术手段却有多种选择。其包括但不仅限于

  • [传统]操作系统(内核态)线程锁
    • 它是RustCpp标准库中【互斥锁】和【读写锁】的核心支持。
    • 其也是【线程同步】宏大故事叙事的起点。
  • [硬核]硬件有序原子操作(Ordered Atomic Operations)对
    • 该技术路线既依赖单个原子操作的“原子性”,“排他性”和“顺序性”,又以原子操作指令配对使用的方式同步线程。
    • 它是线程锁面向硬件的性能优先替代方案,学习门槛有点儿高。
  • [超线程]由各款虚拟机 App 独立设计实施的操作系统(用户态)线程锁
    • 比如,来自大名鼎鼎JVM的“绿色线程”就几乎(在应用层)复刻了操作系统线程调度机制,和模仿操作系统API暴露程序接口给JAVA程序员。
    • 它也是线程锁面向软件的易用性优先替代方案,因为程序员几乎无需付出额外的学习成本。
  • [协程]异步运行时是以操作系统“线程池+I/O多路复用机制”为反应器Reactor,以异步任务调度执行模块为执行器Executor,以最外层异步任务Future协程Task,以协程Task代替线程Thread,以【协程挂起】代替【线程阻塞】,以【无阻塞编程】代替【无锁编程】的创新。
    • 这样的异步运行时有来自Rust社区的smol cratetokio crate
    • 该概念创新几乎隐藏了线程与线程锁的概念,和呈现了一套崭新的多任务调度抽象架构。

输入图片说明

线程同步点

上述技术路径中的【线程锁】与【有序原子操作】都是通过在线程间设立“同步点”来控制并行多任务的执行步调(比如,在同步点 A,阻塞线程 B,以等待线程 C 的求值结果 D)。但“同步点”的“材质”可以不同:

  • 操作系统内核态【线程锁】(以互斥锁为例)是通过保证对临界区内共享变量的有序独占访问,建立线程间的同步关系。所以,线程间的同步点是被加锁的共享变量,和同步手段是“锁的获取与释放操作”
  • 硬件指令的【有序原子操作】(例如,store+release ~ load+acquire)是通过保证对共享原子变量的有序写/读操作,建立线程间的同步关系。所以,线程间的同步点是共享原子变量,和同步手段是“对原子变量的有序写~读操作”

操作的原子性、排他性与顺序性

无论线程同步点采用哪种“材质”,对同步点的操作都必须同时具备“原子性”、“排他性“与“顺序性”。

  • 【原子性】在多线程环境下,从其他线程视角看,某个操作要么倘未开始,要么已经完成,但绝不会看到该操作执行一半的状态。
  • 【排他性】在对共享内存进行写~读操作时,同一时刻只允许一个线程访问。比如在CUDA中,原子操作通过硬件锁定内存地址,来避免数据竞争。
  • 【顺序性】阻止内存读/写指令在“编译时”与“运行时”被重排序,保持内存实际访问顺序与它们在代码文件内定义的词法次序一致。

面向应用开发的 @Rustacean 们大概率会轻视一个反直觉的常识。即,一组程序指令的执行次序一般不会 100% 对应于它们在程序文件内出现的词法顺序。

  • 首先,在编译时,经循环展开、指令调度等处理环节,编译器会静态调整程序语句的排列次序。这类静态调优处理也十分常见于JS代码混淆器。例如,在“大师级”模式下,为了最大化JIT性能提升效果,JS代码混淆器会变身【优化器】一边静态分析与揣测代码块意图,一边重构代码块。力图语义不变地调整指令执行顺序。
  • 其次,在运行时,经流水线、超标量执行等技术手段,硬件设备也会“动态调优”程序指令的执行次序。

维持“原子性”、“排他性”与“顺序性”的开销差异

操作系统(内核态)【线程锁】

虽然操作系统线程锁自带“原子性“、”排他性“与”顺序性”,但获取与释放锁都是经典的“系统调用”指令。所以,涉及锁操作的执行线程将不得不在操作系统“用户态”与“内核态”间频繁地切换线程上下文。例如,仅在对加锁共享变量的单次操作过程中,应用程序就不得不经历四次线程上下文切换处理:

  1. 【用户态 ➜ 内核态】获取线程锁
  2. 【内核态 ➜ 用户态】原子+排他+顺序地执行对共享变量的业务处理
  3. 【用户态 ➜ 内核态】释放线程锁
  4. 【内核态 ➜ 用户态】执行后续的处理任务

这已是1:4的倍数关系了!可想象,随应用程序业务功能的愈趋复杂臃肿,执行线程在“用户态 ⇆ 内核态”间的切换次数更会指数级增长,和造成自“内存占用”至“响应延时”的双重负担。如此性能惩罚对实时性要求较高的应用程序基本是不可接受的。

硬件【有序原子操作】

原子操作的“原子性“与”排他性“是硬件向系统程序提供的基本保障之一,所以无需编写任何激活指令。而原子操作的“顺序性”则是由比操作系统更底层的内存屏障汇编指令“有选择地”开启或关闭。比如说,

  • 原子操作的内存屏障获取指令,有x86架构的lfence读屏障指令
  • 原子操作的内存屏障释放指令,也有x86架构的sfence写屏障指令

额外补充:不同于x86ARM架构的读/写屏障则采用同一套汇编指令DMBDSB

因为仅调用了“硬件指令”,而未涉及“系统指令”,所以【有序原子操作】也就避开了操作系统线程上下文切换的性能开销。但,坏消息是编程工作的心智成本也被拉高了一个数量级。即便经由系统编程语言(C / C++ / Rust)的深度抽象(比如,下文要介绍的《C++ 20 内存模型》),@Rustacean 依旧不得不小心斟酌

  1. 每个原子操作的内存序,以及
  2. 多个不同内存序原子操作的组合搭配

否则,它们就起不到线程同步的作用,和甚至导致 U.B.。

纯应用层的【超线程】与【协程】

它们追求的目标和【有序原子操作】一样,都是尽可能少地调用(甚至最好不调用)“系统指令”。但【超线程】与【协程】技术路线却翻转了求解方向。它们不再向下求索和找硬件(指令)寻出路,而是向上试探和构建OSApp间的抽象“缓冲带”。这条技术路线的成功再次印证了计算机科学家 David Wheeler 的世纪名言“All problems in computer science can be solved by another level of indirection”在工程实践中的指引价值。

但是,关键的“但是”来了。额外的中间层也会导致计划外的开销。“中间抽象层”不会自己从终端电脑里长出来,而必须借助某种外部方式,被预置到用户设备上

  • 要么,预装虚拟机(比如,Java VM, JS VM, Python VM) — 这对配设备构成了装机门槛。
  • 要么,向应用程序二进制安装包注入额外的 【运行时】代码 — 更大的程序包会导致占用更多的硬盘和消耗更大的内存。其对IoT设备同样不够友好。

所以,【超线程】与【协程】技术路径虽降低了程序开发的人力与心力成本,却抬高了程序运行的硬件门槛。这也就是程序设计的《不可能三角》

输入图片说明

总结

综上所述,无锁编程中的“无锁”并不是放任无序。而是,绕过高开销的操作系统(内核态)线程锁,和面向不同的应用场景,寻找其替代方案:

  • 首先,就后端应用程序而言,[超线程]与[协程]是最优解,因为
    • 机房服务器的硬件升级更易打报告申请达成。对云服务,这仅意味着“下工单”和“扫码支付”。另外,
    • 避免对硬件程序接口的直接依赖也有利于“同一套 Server 程序无缝地兼容于集群内不同型号、不同品牌的服务器设备”和降低运维难度。
  • 其次,就前端应用程序而言,“以[有序原子操作]为线程同步主要手段,再辅以少量的[操作系统(内核态)线程锁]做微调”则是更实惠的策略。
    • 一方面,“对程序部署环境提要求”和“更巧的程序分发包”是衡量前端程序的硬指标。若把后端程序比作排山倒海的“马其顿方阵”群欧,那前端程序则更像是形单影只的“刺客”单挑。前者看重的是“对大头兵的动员力”;而后者的生存信条则是“单兵素质”。
    • 另一方面,倚重硬件技术方案的性能上限更高,被逆向工程的难度也更大,对低配硬件的适用性还更好。

作为前端应用程序开发的从业者,我将在文章后续内容详细分享对[有序原子操作]的理解。

原子内存排序

原子操作执行结果的可见性受两方面因素的影响

  1. 原子操作自身的顺序性
  2. 将原子操作执行结果从CPU缓存(L1/L2/L3)刷入(flush)设备主内存的时间窗口。

原子内存排序就是手工调控原子操作(及毗邻非原子操作)执行结果可见范围的、面向 @系统程序员 的顶层程序抽象。具体地讲,

  1. 在编程中,它就是原子操作程序接口的末位形参:内存序修饰符枚举值。例如,在atomic_var.store(10, std::sync::atomic::Ordering::Release);中的std::sync::atomic::Ordering::Release
  2. 在编译中,它左右编译器
    1. 采用或“激进”或“保守”的策略对MIR(Mid-level IR)中间码做“指令重排”优化,
    2. 将“内存序修饰符”翻译为“内存屏障汇编指令”并注入编译后的LLVM IR中间码内。
  3. 在运行中,它再以“内存屏障汇编指令”的形式,继续渗透影响力至
    1. CPU指令流水线 — 克制硬件自身对操作指令执行次序的动态调整
    2. CPU缓存(L1/L2/L3)— 强制先同步“CPU缓存”内数据至“设备主内存”,再执行内存屏障(fense)后的程序指令

接下来,让我们先了解哪些因素会影响原子操作自身的顺序性?

编译器对原子操作顺序性的介入

编译时的程序语句顺序重排是为减少“数据依赖”与剔除“死代码”(dead code)。这涉及到了

  1. 对常量表达式const expression的合并求值
  2. 对赋值语句的跨行合并
  3. 删除程序中“执行不到”或“用不到”的语句

举个例子,下图左侧三条Rust赋值语句,经前端编译器rustc编译后,就会被变形为右侧两条“字面量赋值语句”

输入图片说明

这类“自做聪明”的预处理在Javascript代码混淆器中也比比皆是,因为在单线程执行上下文中,通过重构代码,将部分计算开销分摊至预编译环节绝对是对运行时程序的“减负”。但在 Rust 协同多任务场景下,这潜在地会破坏并发程序的正确性。比如,可能恰好就存在某个执行线程正依赖上例中的第一条赋值语句x = 2i32完成计算,但编译后的代码却把2_i32给丢了。那么,此处的性能优化举措便立即走向反面和变身为缺陷制造者,还极难在后期排查。

因此,原子内存排序的第一个功能就是用“内存序修饰符”标记程序中的“原子操作指令”,告诉编译器:“哪些优化是可做的,而哪些是不可做的”。

CPU芯片对原子操作顺序性的介入

运行时的操作指令(执行)次序重排源自CPU对“中间计算结果”的分层存储结构memory hierarchies。简单地讲,虽然多核CPU原则上共用一套全局共享内存,但每个处理核心也(为运行于其上的执行线程)自备了本地快速缓存。所以,某个执行线程对共享变量的修改结果必须从本地快速缓存同步至全部其它处理核心自己的快速缓存后,才能确保共享变量的新值对所有线程可见。这是《缓存一致性协议MESI》的重要约定之一。于是,为了提升“分层存储”的传值效率,CPU指令流水线就会自由裁量地动态调整程序指令的最终执行顺序。举个例子,

输入图片说明

若仅分析此段伪码的书面含义,那变量y的求值结果

  • 要么,等于3 — “线程2 先执行,线程1 后执行”或“线程1 与 线程 2 交错着执行”
  • 要么,等于6 — 线程1 先执行,线程2 后执行。

但硬件重排后的程序指令还会执行出第三个结果值(如下图):

  1. 线程 1 中x, y变量的赋值语句被颠倒,
  2. 线程 1 与线程 2 交错执行,造成对y变量赋值的并发冲突

输入图片说明

因此,【原子内存排序】的第二个功能就是将“有序原子操作”中的【内存序修饰符】枚举值翻译为面向特定CPU架构的“内存屏障汇编指令”,以达到如下三个目的:

  1. 抑制“CPU指令流水线”对指令执行顺序调优的“自由裁量权” — 对应上图中的红框部分。
  2. 限制并发读写操作执行结果的可见性(即,CPU缓存的数据同步时间窗口) — 对应上图中的蓝框部分。
  3. 向 @Rustacean 隐藏强/弱序硬件的“内存屏障汇编指令”差异。

不同架构的CPU产品会在“面向程序运行效率”与“面向程序开发效率”有不同的偏重。给“运行效率”加权的CPU会内置更激进的程序指令调优规则,以获得更快的运行速度。而给“开发效率“加权的CPU则会预置更保守的优化策略,使程序的执行结果更易预测降低程序设计的心智成本。所以,处理器芯片至少可以分为

  • 【保守】强序strongly-ordered硬件(比如,x86/64SPARC TSOIBM mainframe
  • 【激进】弱序weakly-ordered硬件(比如,ARMItaniumPowerPC

两类流派。

强序strongly-ordered硬件

采用保守的优化策略:若程序指令字面地“允许无序”(即,书面指定relaxed内存序),那就都按“有序”(即,release/acquire内存序)来处理。

弱序weakly-ordered硬件

采用激进的优化策略:若程序指令字面地指定“有序内存序”(比如,release/acquire/consume/seq_cst),那就皆按“允许无序”的relaxed内存序来处理。

抹平【强/弱序硬件】差异的抽象

C++ 20 内存模型 规范【原子内存排序】对【强/弱序硬件】的程序抽象为如下两个要点:

  1. 约定以【弱序硬件】作为编程的开发测试平台,要求 @系统程序员 编写最严谨的代码,且明示每个原子操作的“内存序”。然后,
  2. 由后端编译器(例如,LLVM),根据程序部署硬件的内存序强/弱,在交叉编译过程中,决定哪些内存序修饰符需要被翻译成对应的内存屏障汇编指令,和哪些可以被“忽略”。

于是,@Rustacean 一般不用刻意关注程序部署设备的硬件特性。只要标记好每个原子操作的内存序,和正确配置交叉编译参数,让LLVM去完成剩余工作。

C++ 20内存模型

Rust系统编程语言也采用《C++ 20 Memory Model》作为其内存访问控制的“行为纲领”。鉴于国内普及的C++标准库版次都比较老,读者们可能更熟知该内存模型的早期版本《C/C++11 Memory Model》(或称C11)。《C++ 20 内存模型》虽内容繁杂,但其宗旨可简洁地概括为约定一种在

  • @程序员 追求的 代码正确实现
  • 编译器 追逐的 程序极限优化
  • 硬件 保留的自由裁量不确定性

之间寻找平衡点的机制。以便,使内存访问既要确保正确性,还得为随机应变的“调优”留有自由腾挪的余地。

《C++ 20 Memory Model》将程序对内存的读/写操作分为:

  • 普通非原子操作的数据访问data accesses。后文引用其为“非原子操作”。
  • 原子操作的原子访问atomic accesses。后文将引用其为“原子操作”。

然后,以原子操作执行结果的“可见性”为支点,规划“非原子操作”与“原子操作”共同的执行次序。

内存访问次序

《C++ 20 内存模型》将 @程序员 介入指令重排的方式方法标准化为(如下图)

  1. 先由内存序修饰符在原子操作之间建立“发生先于...”关系happens-before relationship,和明确原子操作间的执行次序。
  2. 再在非原子操作与原子操作之间建立“顺序先于...”关系sequenced-before relationship。进而,根据原子操作的执行顺序,排列同线程相邻非原子操作的执行次序。

输入图片说明

接着,编译器与CPU指令流水线就会

  • 一方面,对建立了“发生先于”和“顺序先于”关系的程序指令,执行保守调优策略。因此,程序指令的实际执行顺序就会更拟合于 @程序员 的代码设计预期。
  • 另一方面,对建立happens-beforesequenced-before关系的程序指令,执行激进调优策略。程序指令的执行顺序就带有更多不确定性。于是,程序模块间的耦合关系就不应该依赖于这类指令的执行结果

部分中文资料将术语sequenced-before翻译为“程序顺序”。我是不同意的,因为“程序顺序”给读者的第一字面含义错觉是“程序指令出现于代码源文件中的词法次序”。但,sequenced-before 原意是“经编译器与CPU指令流水线调优后,程序指令在单线程内的实际执行顺序”。甚至,在对同一复合表达式的重复求值过程中,其内部子表达式与操作数的中间计算顺序也是变化的

内存屏障汇编指令(memory fences)

《C++ 20 内存模型》将“原子操作”抽象描述为“原子操作行为 + 内存序修饰符”的程序接口,以隐藏“内存屏障汇编指令”的两个技术细节:

  1. 不同CPU架构的“内存屏障指令”会采用不同的汇编指令集。比如,
    1. x86/64架构的sfencemfencelfence指令
    2. ARM架构的DMBDSBISB指令
  2. 每条“内存屏障汇编指令”既是独立的,但又得搭配恰当的寄存器读/写指令才能完成一次同步读/写操作。

屏障指令与读写指令的各种组合方式不胜枚举,其中也不乏会导致 U.B. 的无效组合。但在遵循《C++ 20 内存模型》的系统编程语言中,屏障指令被降级为原子变量读/写 API 的(末位)形参,和变形为数量有限且语义明确的枚举值。举个例子,

  1. 下图左侧是ARM架构的汇编伪码片段;而右侧是Rust系统程序片段。
  2. 蓝框中的代码描述了一次内存同步操作;红框中的代码描述了一次内存同步操作。
  3. 在左侧汇编伪码中,蓝框与红框重叠的部分就是DMB内存屏障汇编指令。它虽是独立语句,但也需搭配完成具体功能的读/写指令才能起效。
  4. 在右侧Rust程序中,内存屏障汇编指令与寄存器读(/写)指令被合并为一条Rust有序原子操作语句。酷!

输入图片说明

总之,《C++ 20 内存模型》以store+releaseload+acquirefetch_add+acq_rel等程序指令表示带屏障的内存(同步)读写操作。其中,

名词解释

在进一步解释【有序原子操作指令】中的【内存序修饰符】前,先和读者约定条几条名词解释,以使后续的文字描述更简洁。

  1. 顺序先于Sequenced before

    单线程内,若指令 A 先于指令 B 执行,那么就称“指令 A 顺序先于指令 B”(记作A is sequenced before B)。再次强调它不是程序语句在代码源文件内定义的字面词法顺序,而是指令的单线程执行顺序。它的反概念是“顺序后于Sequenced-after”。

  2. 数据依赖于Carries dependency

    单线程内,若指令 A 顺序先于Sequenced-before指令 B,且 A 与 B 之间满足如下关联关系之一

    • 指令 A 的输出结果是指令 B 的输入实参。又或者说,指令 A 的求值结果是指令 B 表达式的操作数。但 A 结果又不能是 B 输入的
      • std::kill_dependency(..) 函数调用实参,和 B 表达式中
      • 逻辑与... && ...、逻辑或... || ...、三元操作符... ? ... : ...、逗号操作符... , ... 的左操作数
    • 指令 B 从变量 M 读取的值就是指令 A 向变量 M 写入的值
    • [依赖传递] 指令 B 数据依赖于指令 X,指令 X 又数据依赖于指令 A

    那么就称“指令 B 数据依赖于指令 A”(记作A carries a dependency into B)。Sequenced beforeCarries dependency的先决条件(记作Sequenced before ⇐ Carries dependency)。

  3. Release序列

    多线程上下文中,以对原子变量 M 的某个store+release(历史)原子操作 A 为起点,沿时间轴向后最长连续执行的一组操作。并且,在该组内的每个写操作

    • 若与指令 A 同线程,那么它就可以是对任意变量的任意写操作。例如,对 M 变量的原子写操作。
    • 若被执行于另一线程,那么它只能是对同一原子变量 M 的"读-改-写RMW"原子操作。

    所以,Release序列也被称为“以指令 A 开头的Release序列”(记作 Release Sequence headed by A)。后文有图应该会使定义更直观些。

  4. 同步于Synchronizes with

    多线程上下文中,若指令 B 从 M 原子变量load+acquire读出的值正是另一线程的指令 A 向同一 M 变量store+release写入的值,那么就称“指令 A 同步于指令 B”(记作A synchronizes with B)。

  5. 依赖序先于Dependency-ordered before

    多线程上下文中,线程 1 执行store+release原子操作 A 向原子变量 M 写入 X 值,线程 2 执行load+consume原子操作 B 从同一变量 M 读出 Y 值。虽然 B 读取的 Y 值不是 A 写入的 X 值,但只要 Y 值出自【以指令 A 开头的Release序列】内某个写操作的执行结果。那么就称“原子操作 A 依赖序先于原子操作 B”(记作A is dependency-ordered before B)。

    单线程的【数据依赖于Carries dependency】与多线程的【依赖序先于Dependency-ordered before】存在“依赖传递”关系。即,若指令 A (跨线程)依赖序先于指令 B(i.e. A < B),同时指令 C 又(线程内)数据依赖于指令 B(i.e. B < C),那么指令 A 就依赖序先于指令 C(i.e. A < C)。

    图示【同步于Synchronizes with】和【依赖序先于Dependency-ordered before】间的差别如下。该图也同时包含了对“Release序列”术语的图形化表述。

    输入图片说明

    虽然关键差异已于上图高亮出来了,但为突出要点,这里还是再做一段文字补充说明

    输入图片说明

  6. 发生先于happens before

    “发生先于...”是一簇关系,而不只是一个关系。它的细分子类包括:“简单发生先于”,“强发生先于”,“线程间发生先于”和“发生先于”。在下图中,

    • 以表格方式呈现了每种“先于关系”子类(表格的右四列)与它们具有的特征属性(表格的最左第一列)。
    • 表格最底一行表示每种先于关系都支持依赖传递。即,若 A 先于 B,B 先于 C,那 A 必先于 C。
    • 表格最左一列内的每个特征条件间是“或”的逻辑关系。即,只要满足条件之一,便属于右侧对应的先于关系子类。
    • 表格第七行的【顺序一致seq_cst同步于】是第三行【同步于】的一个特例情况。

    输入图片说明

    它的反概念即是“发生后于happens after”。

  7. “读-改-写”原子操作RMW

    它会在一个原子操作内对同一原子变量连续且不可中断地完成“读-改-写”三个处理动作,并将第一个“读”处理的执行结果作为整个原子操作的返回值。

    RMW是“读-改-写”操作名(Read-Modify-Write)的首字母缩写词。

    Rust中,常见的“读-改-写”原子操作有 fetch_add/compare_exchange/swap

内存序修饰符

《C++ 20 内存模型》将“内存屏障汇编指令”简化为数量有限且语义明确的内存序修饰符枚举值。

按 C++ 命名规范,这些枚举值可穷举为

  1. std::memory_order_relaxed
  2. std::memory_order_consume(从 C++26 起,将被废弃)
  3. std::memory_order_acquire
  4. std::memory_order_release
  5. std::memory_order_acq_rel
  6. std::memory_order_seq_cst

Rust 也遵循《C++ 20 内存模型》和按 Rust 命名规范,这些枚举值可穷举为

  1. std::sync::atomic::Ordering::Relaxed
  2. std::sync::atomic::Ordering::Release
  3. std::sync::atomic::Ordering::Acquire
  4. std::sync::atomic::Ordering::AcqRel
  5. std::sync::atomic::Ordering::SeqCst

无论 Rust 还是 C++ 都是将“内存序修饰符”作为原子操作成员方法位形参。格式为:原子变量 . 原子操作成员方法 (..., 内存序修饰符)。例如,

r1 = y.load(std::memory_order_relaxed); // 原子读 API 的唯一实参
y.store(1, std::memory_order_relaxed);  // 原子写 API 的末位实参
relaxed内存序的std::memory_order_relaxed/ std::sync::atomic::Ordering::Relaxed

relaxed内存序”不会对它所修饰的原子操作,及与该原子操作既同线程又相邻的非原子操作,做任何执行顺序的保证。这意味着

  • 该原子操作前的内存指令有可能被搬到该原子操作之后执行。所以,相邻指令的执行结果不一定对它可见。
  • 该原子操作后的内存指令也被允许搬到该原子操作之前执行。因此,它的执行结果也不一定对相邻指令可见。

甚至,只要【编译器】与【CPU指令流水线】认定对relaxed指令执行顺序的调整是无害且有利性能的,那么 @Rustacean 对这些程序语句执行顺序的任何假设就都是错的。举个例子,假设有

  • 跨线程共享原子变量xy
  • 左侧线程 1 从y读值和立即向x写入该值,
  • 右侧线程 2 从x读值和向y写入标量值42
  • r1r2分别是线程 1 与线程 2 内的局部变量

输入图片说明

然后,当线程 1 与线程 2 并发执行时,就有可能出现 r2 == x == r1 == y == 42 的结果,因为 A、B、C、D 四条指令会概率地被调度按如下次序执行。

输入图片说明

这天马行空般的指令调度行为,使relaxed内存序仅适用于“不关心过程正确,只注重结果正确”的应用场景。比如,采用RMW+relaxed(无序)原子操作的固定步长多线程计数器 [例程1]。

  • 一方面,RMW的原子性与排他性避免了在“读-改-写”处理中的脏读。
  • 另一方面,恒定的步长值使每次计数计算都不依赖于线程执行次序。
  • 最后,计数器呈现的总计数值永远都是对的。
use ::std::{ sync::{ Arc, atomic::{ AtomicI32, Ordering } }, thread };
fn main() {
    let counter = Arc::new(AtomicI32::new(0));
    let mut join_handles = Vec::new();
    for _ in 0..110 {
        let counter = Arc::clone(&counter);
        join_handles.push(thread::spawn(move || {
            // 因为每个线程的每次计数计算仅恒定地累加 1,所以谁先执行谁后执行并不重要。
            counter.fetch_add(1, Ordering::Relaxed);
        }));
    }
    for (index, join_handle) in join_handles.into_iter().enumerate() {
        join_handle.join().expect(&format!("第{}个线程提前崩溃了", index)[..]);
    }
    println!("最终的计数结果是:{}。它总是正确的", counter.load(Ordering::Relaxed));
}
release内存序的std::memory_order_release/std::sync::atomic::Ordering::Release

store+release有序原子操作会保证发生先于操作的原子或非原子操作都被重排至此“写”操作之后执行。此外,

  1. release内存序修饰符必须与原子操作指令搭配使用,无论该写操作是纯写操作store还是RMW类的swapfetch_add等操作。
  2. load+release原子读操作指令会导致编译失败。
  3. 孤用store+release原子操作指令起不到线程同步作用,它需与acquireconsumeacq_rel内存序的原子操作对同一个原子变量配对使用才能起效。

RMW+release会使“读-改-写”原子操作中的处理有release内存序,和处理有relaxed内存序。

acquire内存序的std::memory_order_acquire/std::sync::atomic::Ordering::Acquire

load+acquire有序原子操作会保证发生后于操作的原子或非原子操作都被重排至此“读”操作之前执行。此外,

  1. acquire内存序修饰符必须与原子操作指令搭配使用,无论该读操作是纯读操作load还是RMW类的swapfetch_add等操作。
  2. store+acquire写原子操作会导致编译失败
  3. 孤用load+acquire原子操作指令起不到线程同步作用,它需与releaseacq_rel内存序的原子操作对同一个原子变量配对使用才能起效。

RMW+acquire会使“读-改-写”原子操作中的处理有acquire内存序的语义,和处理有relaxed内存序的语义。

consume内存序的std::memory_order_consume

consume内存序是acquire内存序的减配版。具体地讲,load+consume有序原子操作仅保证

  • 发生后于操作(同acquire内存序的起效条件),
  • 数据依赖于该“读”操作执行结果(额外新条件)

的原子或非原子操作都被重排至此“读”操作之前执行。此外,

  1. consume内存序修饰符必须与原子操作指令搭配使用,无论该读操作是纯读操作load还是RMW类的swapfetch_add等操作。
  2. store+consume写原子操作会导致编译失败
  3. 孤用load+consume原子操作指令起不到线程同步作用,它需与releaseacq_rel内存序的原子操作对同一个原子变量配对使用才能起效。

RMW+consume会使“读-改-写”原子操作中的处理有consume内存序的语义,和处理有relaxed内存序的语义。

但 Rust 标准库并没有实现《C++ 20 内存模型》中的consume内存序。文章倒数第二节《Rust 未实现的 C++ 20 内存模型特征》会解释其中的原因。

acq_rel内存序的std::memory_order_acq_rel/std::sync::atomic::Ordering::AcqRel

RMW+acq_rel会使“读-改-写”原子操作中的处理有acquire内存序语义,和处理有release内存序语义。此外,

  1. load+acq_relstore+acq_rel的纯“读”和纯“写”原子操作指令都会导致编译失败。
  2. 孤用RMW+acq_rel原子操作指令起不到线程同步作用,它需与releaseacquireconsumeacq_rel内存序的其它原子操作对同一个原子变量配对使用才能起效。
seq_cst内存序的std::memory_order_seq_cst/std::sync::atomic::Ordering::SeqCst

seq_cst是顺序性最强的内存序,因为所有seq_cst内存序的原子操作都会被拎出来排列组合构建一个全局总顺序,即便这些seq_cst内存序原子操作

  • 正读/写着不同的原子变量,和
  • 隶属于不同的线程

这就好似存在一个全局 “时间线”,所有seq_cst内存序原子操作都在这个 “时间线” 上有序排列,和依次执行。同时,seq_cst内存序还兼具备

  • release内存序语义 — 阻止“发生于”seq_cst内存序原子操作的(原子或非原子)操作指令重排至该seq_cst原子操作之执行。
  • acquire内存序语义 — 禁止“发生于”seq_cst内存序原子操作的(原子或非原子)操作指令重排至该seq_cst原子操作之执行。

孤用一个seq_cst内存序原子操作指令起不到线程同步作用,它需至少与另一个seq_cst内存序原子操作指令配对使用才能起效。

正如硬币的两面,seq_cst“强排序”虽会给程序执行顺序带来前所未有的确定性并降低程序设计难度,但也会导致严重的性能惩罚

内存序修饰符的组合

“有序原子操作”比“操作系统线程锁”更难用的复杂点就是原子操作的内存序是非对称的。下面以“发布-订阅设计模式”为例,展开解释常见原子操作对的使用的套路。请见下表

输入图片说明

release-acquire的内存序组合用法

当对同一个原子变量 M 的store+releaseload+acquire原子操作相遇时,因为前者会保证“发生先于”store操作的指令不会被往后排(下图左侧红色阴影),后者会保证“发生后于”load操作的指令不会往前排(下图右侧蓝色阴影),于是就形成了一个以原子变量 M 为枢轴的线程同步点(下图中间垂直的灰色虚线)。所以,在左侧红色阴影内入的一定对右侧蓝色阴影内的读指令可见。重点来了(敲黑板),即便左侧红色阴影中的

  • 写操作是非原子的普通变量赋值,而不是原子操作
  • 被赋值的变量是(原子引用计数的)普通变量Arc<T> where T: Send + Sync,而不是原子变量
  • 普通变量与原子变量 M 之间没有任何数据依赖关系

,左侧红色阴影内生成的新值依旧对右侧蓝色阴影内的程序指令可见。

输入图片说明

这就给 @Rustacean 提供了一套“在线程间,绕过传统操作系统,同步传输大数据对象”的以小搏大的解决方案。即,将“大数据对象的跨线程同步”拆分为

  1. 线程同步
  2. 数据共享

两个子任务。然后,以简单数据类型的原子变量同步线程(低成本);以复杂数据结构的普通变量赋值传递数据(高收益)。举个例子,

  1. 在上图左侧线程 1 的红色阴影内,将复杂结构体的字符串实例写入一个(原子引用计数的)普通变量中。这一步会涉及一点儿unsafe代码。
  2. 在上图中间竖直灰色虚线位置,以单字节原子变量std::sync::atomic::AtomicU8为代价,同步线程 1 与线程 2。
  3. 在上图右侧线程 2 的蓝色阴影内,线程安全地读取出共享普通变量内的字符串值。

即便涉及更多线程(后有例程),“跨线程同步传输复杂结构体实例”依旧遵循相同的技术原理与处理步骤。于是,先贤们总结归纳出【指针介导发布pointer-mediated publication】的发布-订阅设计模式。

指针介导发布pointer-mediated publication的发布-订阅设计模式

首先,参与数据同步的每个子线程都须按如下三段式结构组织代码

  1. 【加(自旋)锁】while循环重复执行对共享原子变量的load+acquire指令
  2. 【临界区】线程安全地写~读复杂数据结构的普通共享变量值。
  3. 【解锁】执行对同一共享原子变量的store+release指令

然后,从主线程俯览全部线程,该设计模式完整的代码布局如下图

输入图片说明

强调上图中的若干要点

  1. 在子线程中,引入自旋锁spinlock阻塞程序执行和圈建临界区。关于“自旋锁”的技术细节,后续有专门的章节介绍。
  2. 在临界区内,使用unsafe Rust代码修改(原子引用计数的)普通共享变量值。虽未用操作系统互斥锁,但同样线程安全。
  3. 充当线程阻塞条件的原子变量也被称为“信号量 semaphore”。它一律采用简单的基本数据类型。
  4. 载荷复杂数据的普通变量既不必是原子变量,也无须加操作系统线程锁。
  5. 依据原子操作的顺序性,只要信号量的值变更对当前线程可见,那么“发生先于”它的普通共享变量修改也必能被当前线程观察到。

接下来举个[例程2],绕过操作系统互斥锁,实现四个线程对同一字符串实例的并发读-改-写操作。程序处理逻辑概括如下:

  1. 主线程初始化字符串变量值为字面量“以字符串模拟复杂数据结构”
  2. 线程 1 给字符串添加结束符“】”
  3. 线程 2 给字符串添加起始符“【”
  4. 线程 3 则打印输出修改后的字符串值“【以字符串模拟复杂数据结构】”
  5. 每个子线程都以自旋锁阻塞执行,以达成自线程 1 至线程 3 的串行执行。
  6. 每个子线程都采用std::sync::Arc<T>unsafe关联方法Arc::get_mut_unchecked()获取智能指针内的可修改引用&mut T,以绕过std::sync::Mutex<T>互斥锁。
  7. 因为标准库的std::string::String不支持跨线程传输,所以依照newtypes设计模式,为其定义了一个跨线程壳子UnsafeString结构体。
// 开启 feature-gate,以支持 get_mut_unchecked 方法 — 我懒和不想弯弯绕地摆弄原始指针。
#![feature(get_mut_unchecked)]
/// 以 newtyeps 设计模式,封装标准库的 String 为 unsafe_string::UnsafeString 结构体,以使其具备跨线程传输与引用的能力。
mod unsafe_string {
    use ::ambassador::{ Delegate, delegatable_trait_remote };
    use ::std::{ convert::{ AsRef, AsMut }, fmt::{ Display, Formatter, Result as IoResult }, ops::{ Deref, DerefMut } };
    #[delegatable_trait_remote]
    trait Display {
        fn fmt(&self, f: &mut Formatter<'_>) -> IoResult;
    }
    #[derive(Default, Delegate)]
    #[delegate(Display)]
    pub struct UnsafeString(pub String);
    unsafe impl Sync for UnsafeString { }
    unsafe impl Send for UnsafeString { }
    impl Deref for UnsafeString {
        type Target = String;
        fn deref(&self) -> &Self::Target { &self.0 }
    }
    impl DerefMut for UnsafeString {
        fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
    }
    impl AsRef<String> for UnsafeString {
        fn as_ref(&self) -> &String { &self.0 }
    }
    impl AsRef<str> for UnsafeString {
        fn as_ref(&self) -> &str { self.0.as_ref() }
    }
    impl AsMut<String> for UnsafeString {
        fn as_mut(&mut self) -> &mut String { &mut self.0 }
    }
    impl UnsafeString {
       pub fn new<T: AsRef<str>>(source: T) -> Self {
            Self(String::from(source.as_ref()))
       }
    }
}
use ::crossbeam::atomic::AtomicCell;
use ::std::{ sync::{ Arc, atomic::{ AtomicU8, Ordering } }, thread };
use unsafe_string::UnsafeString;
fn main() { // 主线程
    // 同步线程的信号量
    let semaphore = Arc::new(AtomicU8::new(0));
    // 跨线程共享的复杂数据结构普通变量
    let payload = Arc::new(UnsafeString::new("以字符串模拟复杂数据结构"));
    let mut join_handles = Vec::new();
    // 下面的子线程 1 ~ 3 将串行执行,和用【自旋锁】阻塞程序执行。
    { // 子线程 3 — 打印输出已完成两次拼接操作的字符串值
        let semaphore = Arc::clone(&semaphore);
        let payload = Arc::clone(&payload);
        join_handles.push(thread::spawn(move || {
            // 同步线程
            while semaphore.load(Ordering::Acquire) != 2 { /* 热轮询 - 等待信号量就位 */ }
            // 线程安全地读取普通变量的值,因为
            // 1. 线程 1 已经执行结束
            // 2. 线程 2 也已执行结束
            println!("3. 最后打印输出两次修改后的结果: {}", payload);
        }));
    }
    { // 子线程 2 — 给共享字符串拼接起始“【”字符。
        let semaphore = Arc::clone(&semaphore);
        let mut payload = Arc::clone(&payload);
        join_handles.push(thread::spawn(move || {
            // 同步线程
            while semaphore.load(Ordering::Acquire) != 1 { /* 热轮询 - 等待信号量就位 */ }
            // 调用 Arc<T> 的 unsafe API,线程安全地修改共享普通变量的值,因为
            // 1. 线程 1 已经执行结束
            // 2. 线程 3 还在阻塞中
            unsafe { Arc::get_mut_unchecked(&mut payload).insert_str(0, "【") };
            println!("2. 再添加(首)起始符: {}", payload);
            // 修改信号量的变量值,使其不再满足线程 3 自旋锁的阻塞条件。
            semaphore.store(2, Ordering::Release);
        }));
    }
    { // 子线程 1 — 给共享字符串拼接结束“】”字符。
        let semaphore = Arc::clone(&semaphore);
        let mut payload = Arc::clone(&payload);
        join_handles.push(thread::spawn(move || {
            // 调用 Arc<T> 的 unsafe API,线程安全地修改共享普通变量的值,因为
            // 1. 线程 2 还在阻塞中
            // 2. 线程 3 也还在阻塞中
            unsafe { Arc::get_mut_unchecked(&mut payload).push_str("】") };
            println!("1. 先添加(后)结束符: {}", payload);
            // 修改信号量的变量值,使其仅不再满足线程 2 自旋锁的阻塞条件。
            semaphore.store(1, Ordering::Release);
        }));
    }
    for (index, join_handle) in join_handles.into_iter().enumerate() {
        join_handle.join().expect(&format!("第{}个线程提前崩溃了", index)[..]);
    }
    // 全部子线程执行结束,所以主线程依旧是线程安全地打印输出共享变量的值。
    println!("4. 结束:{}", payload);
    println!("5. crossbeam 是否,对 UnsafeString 实例的跨线程共享,内部启用了操作系统线程锁:{}", !AtomicCell::<UnsafeString>::is_lock_free());
}

题外话,若非团队内人人都是资深 @Rustacean,我还是推荐优选操作系统线程锁,因为充满unsafe代码的程序是很“脆”的。团队内旦凡冒出个既菜又不擅沟通的 intern,上述看似巧妙的性能代码就立即化身内存泄漏点。所以,code review负担会有点儿重。

对比crossbeam crate

上述store+releaseload+acquire跨线程共享UnsafeString实例的设计方案要性能优于直接采用crossbeam::atomic::AtomicCell<UnsafeString>的办法。AtomicCell<T>的卖点是将原子变量的备选范围从基本数据类型扩展至复杂数据结构T,进而使跨线程共享大数据对象更易于代码实现。但AtomicCell<T>就是个银样镴枪头,因为该结构体内部还是依赖传统操作系统线程锁,而没有引入性能考量和技术创新。有兴趣亲自印证这个结论的读者

  • 既可以直接读源码
  • 也能在程序中直接打印输出AtomicCell::::is_lock_free()的返回值。
    • true 代表未启用操作系统线程锁做数据同步
    • false 代表启动了线程锁
release-consume的内存序组合用法

release-consume的工作原理与release-acquire大致相同,但多了一项额外的限制条件。即,发生先于store+release的执行结果仅对发生后于load+consume且对该load+consume执行结果有数据依赖关系的读操作指令可见。关键词是有数据依赖关系。于是,若上例配以release-consume内存序组合,[例程2]就不能执行出预期的字符串结果了,因为传输量payload: Arc<UnsafeString>对信号量semaphore: Arc<AtomicU8>没有数据依赖关系,线程 2 与线程 3 观察不到payload的最新修改值。鉴于 Rust 标准库并未实现consume内存序,这里就不提供例程了。

seq_cst-seq_cst的内存序组合用法

此既强排序又高开销的内存序组合适用于“生产者-消费者”的发布订阅设计模式,并保证从每个消费线程观察到的生产线程执行次序都相同。还是举个[例程3]更直观。先概括程序处理逻辑如下

  1. 主线程初始化原子变量z为零值
  2. 线程 1 修改原子变量xfalsetrue
  3. 线程 2 修改原子变量yfalsetrue
  4. 若线程 3 测试出“线程 1 先执行,但线程 2 还未执行”的中间态,原子变量z累加1
  5. 若线程 4 测试出“线程 2 先执行,但线程 1 还未执行”的中间态,原子变量z累加1
  6. 主线程断言z值是否还等于初始值零。

因为对原子变量x, y, z的修改操作都采用了seq_cst内存序(即,它们皆沿同一全局时间线执行),所以分别从消费端线程 3 和 4 观察到的生产端线程 1 和 2 的执行顺序总是相同的。这就意味着代码中

  • 线程 3 和 4 的互反测试条件永远都可能同时为。于是,
    • 要么,观察到线程 1 和 2 皆执行结束 — 原子变量z值最终等于2
    • 要么,观察到线程 1 和 2 以相同的顺序执行 — 原子变量z值最后等于1
  • 主线程结尾处的z值零断言则永远不会成立
use ::std::{ sync::{ Arc, atomic::{ AtomicBool, AtomicU8, Ordering } }, thread };
fn main() {
    let x = Arc::new(AtomicBool::new(false));
    let y = Arc::new(AtomicBool::new(false));
    let z = Arc::new(AtomicU8::new(0));
    let mut join_handles = Vec::new();
    { // 线程 1 赋值 x
        let x = Arc::clone(&x);
        join_handles.push(thread::spawn(move || {
            x.store(true, Ordering::SeqCst);
        }));
    }
    { // 线程 2 赋值 y
        let y = Arc::clone(&y);
        join_handles.push(thread::spawn(move || {
            y.store(true, Ordering::SeqCst);
        }));
    }
    { // 线程 3 不要只留意 z.fetch_add() 条件分支,而是要把注意力放在未出现的 else { ... } 分支上。
        let x = Arc::clone(&x);
        let y = Arc::clone(&y);
        let z = Arc::clone(&z);
        join_handles.push(thread::spawn(move || {
            while x.load(Ordering::SeqCst) == false { /* 热轮询 - 等待信号量就位 */ }
            if y.load(Ordering::SeqCst) == true {
                z.fetch_add(1, Ordering::SeqCst);
            } /* else {
                由此条件分支,线程 3 才能筛选出的“线程 1 先执行,而线程 2 未执行”的中间态
                原子变量 z 不累加 1 才是被线程 3 期待的结果
            } */
        }));
    }
    { // 线程 4 不要只留意 z.fetch_add() 条件分支,而是要把注意力放在未出现的 else { ... } 分支上。
        let x = Arc::clone(&x);
        let y = Arc::clone(&y);
        let z = Arc::clone(&z);
        join_handles.push(thread::spawn(move || {
            while y.load(Ordering::SeqCst) == false { /* 热轮询 - 等待信号量就位 */ }
            if x.load(Ordering::SeqCst) == true {
                z.fetch_add(1, Ordering::SeqCst);
            } /* else {
                由此条件分支,线程 4 才能筛选出的“线程 2 先执行,而线程 1 未执行”的中间态
                原子变量 z 不累加 1 才是被线程 4 期待的结果
            } */
        }));
    }
    // 线程 3 与线程 4 的隐含筛选条件是互反的。
    for (index, join_handle) in join_handles.into_iter().enumerate() {
        join_handle.join().expect(&format!("第{}个线程提前崩溃了", index)[..]);
    }
    let z = z.load(Ordering::SeqCst);
    // z 值永远不会是零值,因为线程 3 与线程 4 互反的筛选条件
    // * 要么,一个成立。
    // * 要么,两个都成立。
    assert_ne!(z, 0_u8);
    dbg!(z);
}

这个例子有点绕,需要读者结合例程中的注释多体会。

seq_cstrelease/acquire的内存序组合

release-seq_cstseq_cst-acquire两套内存序组合都会执行出不确定的结果,因为seq_cst的全局总顺序并不总一致于release-acquire内存序的happens-before关系。当它们冲突时,到底以谁为准并没有明确的约定。举个例子,

输入图片说明

即便指令 B 碰巧执行先于指令 C,指令 A 也会因其发生先于指令 B 也就一定总顺序先于指令 C 执行。这是因为在线程 1 中,就同线程的seq_cst原子操作 B 而言,seq_cst原子操作 A 依旧是可重排序的。也就是说,本例中release原子操作指令 B 不能凭借happens-before关系就约束词法顺序先于它的指令 A 不被重排至其后执行,因为指令 A 的内存序是seq_cst。于是,原子变量x, y横跨三个线程的六个原子操作指令的执行次序

  • 既有可能是指令 A 最先执行的 A➜B➜C➜E➜D➜F 和打印输出非原子变量值 r1 == 1,r2 == 3,r3 == 1
  • 也有可能是指令 A 最后执行的 B➜C➜E➜D➜F➜A 和打印输出非原子变量值 r1 == 1,r2 == 3,r3 == 0

没有唯一正确答案和执行次序的确定性。

自旋锁

前文几乎每个例程都采用自旋锁代替传统操作系统线程锁阻塞程序执行,和圈建线程安全临界区。但自旋锁中的空体while循环明显是CPU低效的。那是否有改进优化的空间呢?当然有。这就需从【自旋锁】的三种形态说起,按CPU费效比自高至低排序,它们依次是

  1. 空体while循环的轮询自旋锁
  2. while循环 + 让步原语(std::thread::yield_now())的轮询自旋锁
  3. 互斥锁(std::sync::Mutex<T>) + 条件变量(std::sync::Condvar)的条件轮询自旋锁

轮询自旋锁

在线程阻塞期间,自旋锁并没有释放CPU处理其它待办任务,而是持续独占CPU和什么都不干。于是,CPU浪费了一连串时间片,和付出了一个又一个机会成本。这样粗糙的线程阻塞机制也被称为“忙等待busy-wait” — 很形象的描述。标准库封装的休眠系统调用(nanosleep/Sleep)解决不了这类CPU浪费。

while semaphore.load(Ordering::Acquire) != 2 {
    // 休眠 1 秒就是连续空占CPU一秒。这是浪费,且毫无意义!
    thread::sleep(Duration::from_millis(1000)); 
}

相反,上面的代码还会使空耗变得更严重。即,休眠多长时间,CPU就空转多久。完整的代码请见[例程4]。

轮询自旋锁

CPU空转问题的解决需要比“休眠系统调用”更底层的技术手段:“向操作系统发送让步原语”。

while semaphore.load(Ordering::Acquire) != 2 {
    // yield 信号通知 OS 调用端线程主动放弃刚获得的 CPU 时间片。让CPU处理其它待办任务去。
    thread::yield_now(); 
}

让步原语表示当前线程愿意立即放弃刚分得的CPU时间片,和允许CPU马上去处理其它线程的待办任务。于是,while循环就不会持续空耗CPU和浪费CPU时间片。完整的代码请见[例程5]。

睿智的读者马上就会追问:“在放弃CPU时间片后,该线程会被何时唤起和接着执行剩余的计算任务呢?”。好问题!

答:在CPU时间片的下一轮分配周期,线程就会被再次唤起,和重新测试阻塞条件是否继续成立。随着操作系统中断信号的持续出现,CPU时间片分配处理是很频繁发生的。

至此,冷轮询自旋锁的缺点也呼之欲出

  1. 被阻塞线程会持续参与对后续CPU时间片的竞争,和陷入“既争抢获取又立即放弃”的怪诞循环中,因为该线程进入“临界区”的条件始终都未满足。反复地争抢CPU时间片的也是一种浪费。理想情况是,只要临界区准入条件不满足,线程最好既不要忙等待,也不要连续参与对CPU时间片的争抢,而就像“死过去”一样。
  2. 在线程向OS发送让步原语的那一刻,若没有另一线程恰好备有待办任务,那么“CPU忙等待于当前线程”仍不可避免。待办任务或早或晚出现一纳秒都将错过空忙(至少)一个时间片的CPU。所以,能让“冷轮询”起效的时间窗口很。理想情况是,被退回的CPU时间片最好托管给OS再调度,而不是直接转手交给另一线程。

条件轮询自旋锁

看似最佳解决方案就要华丽登场了。但对线程阻塞时间窗口的精细操控还是绕不开操作系统线程锁std::sync::Mutex<T>。这是由 Rust 标准库对【条件变量std::sync::Condvar】定义决定的。请见下面代码中,

mutex_guard = cvar.wait_while/* 条件阻塞成员方法 */( 
    /* 1. 互斥锁守卫 */mutex_guard, 
    /* 2. 判断阻塞条件是否成立的谓词函数 */|&mut value| value != 2
)?;

条件阻塞成员方法的

  1. 第一个实参就是互斥锁的智能指针std::sync::MutexGuard
  2. 第二个实参则是阻塞条件谓词闭包。

更贴近自旋锁教条定义的另一款程序接口也同样依赖操作系统互斥锁。请见代码

while *mutex_guard != 1/* 2. 判断阻塞条件是否成立的表达式 */{
    mutex_guard = cvar.wait/* 阻塞成员方法 */(
        /* 1. 互斥锁守卫 */mutex_guard
    )?;
    // 即便被唤起,线程也是先判断 #2 中的阻塞条件是否还继续成立。
    // 然后,才能接着执行后续待办任务
}

由上总结两点:

睿智的读者可能又要追问:“为什么不是if判断,而是while循环?”。好问题plus!

答:因为止步于相同阻塞条件的待办线程通常是多个。即便Condvar::notify_all(&self)将它们同时唤起,也只有其中一个线程能率先执行,且率先执行的线程被允许重置阻塞条件为再次成立,于是while循环就能保证被一同唤起的其它线程

  1. 先判断阻塞条件是否已被前面执行的线程“掰正”又成立了?
  2. 然后,再决定
    • 是再次阻塞,待下次唤起机会。
    • 还是接着执行剩余的待办任务。

所以,while循环是多线程同步的关键构件,且不可降级为if单次判断。

除了互斥锁引入了线程上下文切换的开销外,条件轮询自旋锁甚是完美。它使线程在阻塞期间真像死过去一样

  • 既不空耗CPU
  • 也不参与对CPU时间片的竞争

所以,尽管 std::sync::Mutex<T> + std::sync::Condvar 不是“原子内存排序”的最佳拍档,但其相关知识点依旧值得被积极分享。

在同步程序中条件变量struct Condvar唤起阻塞线程的处理流程十分酷似于异步程序中struct Waker唤起停滞异步任务的过程。我将结合接下来的一图,一表和三个例程对比着解释。首先,当某个计算密集任务或I/O操作出现时,

  • 同步程序会为计算密集任务另起一个Slave线程,或从线程池拾取某个闲置的Slave线程
  • 异步程序会为I/O操作实例化一个反应器Reactor

然后,将计算任务与I/O操作承包给Slave线程和反应器Reactor,并等待它们的计算结果反馈。反馈过程一般不是一蹴而就的,而是循环执行如下五个处理步骤,直至反馈结果满足预定的要求。图中每个步骤都被标以红色加粗的序号,和出现于左右两侧相同的位置。

输入图片说明

接下来的表格从更宏观的视角,横向对比

  • 热/冷轮询自旋锁的“原子内存排序”线程同步
  • 条件轮询自旋锁的“条件变量+互斥锁”线程程步
  • Reactor+Waker的异步任务调度

输入图片说明

最后,上表内各列纵向描述的每条技术路径也分别被准备了一个例程

  • 原子内存排序
    • 热轮询自旋锁线程同步[例程4]
    • 冷轮询自旋锁线程同步[例程5]
  • 条件变量+操作系统线程锁
    • 条件轮询自旋锁线程同步[例程6]
  • 异步程序的例程都出自于我早先的社区贡献deferred-future crate
    • 异步单线程[例程7] — 适用于 wasm 应用场景
    • 异步多线程[例程8] — 适用于云端服务程序

其中,[例程6]改造指针介导发布的[例程2]为该设计模式的条件轮询自旋锁实现版本。请注意例程中的注释,每个代码变更位置都伴有详细解释(共五处)。

// 可跨线程传输的不安全字符串结构体定义未变,所以该模块可直接跳过
mod unsafe_string {
    use ::ambassador::{ Delegate, delegatable_trait_remote };
    use ::std::{ convert::{ AsRef, AsMut }, fmt::{ Display, Formatter, Result as IoResult }, ops::{ Deref, DerefMut } };
    #[delegatable_trait_remote]
    trait Display {
        fn fmt(&self, f: &mut Formatter<'_>) -> IoResult;
    }
    #[derive(Default, Delegate)]
    #[delegate(Display)]
    pub struct UnsafeString(pub String);
    unsafe impl Sync for UnsafeString { }
    unsafe impl Send for UnsafeString { }
    impl Deref for UnsafeString {
        type Target = String;
        fn deref(&self) -> &Self::Target { &self.0 }
    }
    impl DerefMut for UnsafeString {
        fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
    }
    impl AsRef<String> for UnsafeString {
        fn as_ref(&self) -> &String { &self.0 }
    }
    impl AsRef<str> for UnsafeString {
        fn as_ref(&self) -> &str { self.0.as_ref() }
    }
    impl AsMut<String> for UnsafeString {
        fn as_mut(&mut self) -> &mut String { &mut self.0 }
    }
    impl UnsafeString {
       pub fn new<T: AsRef<str>>(source: T) -> Self {
            Self(String::from(source.as_ref()))
       }
    }
}
use ::std::{ sync::{ Arc, Condvar, Mutex }, thread };
use unsafe_string::UnsafeString;
fn main() {
    // 变更位置一:“互斥锁+条件变量”元组代替原子变量信号量
    let shared_data = Arc::new((Mutex::new(UnsafeString::new("以字符串模拟复杂数据结构")), Condvar::new()));
    let mut join_handles = Vec::new();
    { // 线程 3
        let shared_data = Arc::clone(&shared_data);
        join_handles.push(thread::spawn(move || {
            let (lock, cvar) = shared_data.as_ref();
            let _ = lock.lock().and_then(|mut guard| {
                // 变更位置二:以条件变量的阻塞成员方法代替“热轮询自旋锁”阻塞线程 3。
                guard = cvar.wait_while(guard, |guard| 
                    !guard.starts_with("【") && !guard.ends_with("】")
                )?;
                println!("3. 最后打印输出两次修改后的结果: {}", guard);
                Ok(())
            });
        }));
    }
    { // 线程 2
        let shared_data = Arc::clone(&shared_data);
        join_handles.push(thread::spawn(move || {
            let (lock, cvar) = shared_data.as_ref();
            let _ = lock.lock().and_then(|mut guard| {
                while !guard.ends_with("】") {
                    // 变更位置三:以条件变量的阻塞成员方法代替 load+acquire 原子操作
                    guard = cvar.wait(guard)?;
                }
                guard.insert_str(0, "【");
                println!("2. 再添加(首)起始符: {}", guard);
                // 变量位置四:手工唤起线程 3。“热轮询自旋锁”代码中没有这一步,因为线程 3 会主动
                //            轮询信号量的值是否发生了变更。
                cvar.notify_one();
                Ok(())
            });
        }));
    }
    {
        let shared_data = Arc::clone(&shared_data);
        join_handles.push(thread::spawn(move || {
            let (lock, cvar) = shared_data.as_ref();
            let _ = lock.lock().map(|mut guard| {
                guard.push_str("】");
                println!("1. 先添加(后)结束符: {}", guard);
                // 变量位置五:手工唤起线程 2 和 3。“热轮询自旋锁”代码中没有这一步,因为那些线程会主动
                //            轮询信号量的值是否发生了变更。
                cvar.notify_all();
            });
        }));
    }
    for (index, join_handle) in join_handles.into_iter().enumerate() {
        join_handle.join().expect(&format!("第{}个线程提前崩溃了", index)[..]);
    }
    println!("4. 结束:{}", shared_data.0.lock().map_or("".to_string(), |guard| guard.to_string()));
}

Rust 未实现的《C++ 20 内存模型》特征

Rust 对《C++20 内存模型》的实现是有选择性的。筛选标准至少包括

  1. 计划“废弃”的标准规范不实现。例如,在《consume 内存序的 std::memory_order_consume》章节介绍过的consume内存序修饰符。它自C++17起不再被推荐使用,并从C++26起将正式被标记为废弃。
  2. 既有损于代码可读性又使程序执行结果难以预测的规范不实现。比如,线程围栏thread_fence。它被允许临时改变已定义原子操作的顺序属性。于是,原子操作字面定义的内存序就不一定必然是它在运行时实际生效的内存序了。

举个例子,虽然乍看将一条原子操作指令(下图左列)拆分为两条指令(下图右列)没什么负作用。请仔细观察下图右侧的《线程围栏定义法》

  • 前置的release序线程围栏使后续的store+relaxed原子操作在运行时呈现出release内存序的顺序性。
  • 后置的acquire序线程围栏使前驱的load+relaxed原子操作在运行时呈现出acquire内存序的顺序性。

输入图片说明

但若将线程围栏语句std::atomic_thread_fence(...)与原子操作语句atomic_var.store(...)之间的距离拉远一点儿呢。假设将原子写操作隔离到一个独立函数do_work()里,人们就会有些不安了。这是因为do_work()函数不具有幂等性,和依赖于调用端main()函数对它的副作用。而调用端程序通常都是不可控,甚至不可知的。

void do_work()
{ // 不具备幂等性的函数。其执行结果同时依赖于调用端其它程序
  // 指令对它的副作用,而不仅决定于函数的实参。
    atomic_var.store(1, std::memory_order_relaxed);
}
int main()
{   // 调用端的线程围栏语句会左右 do_work() 函数的执行结果
    std::atomic_thread_fence(std::memory_order_release);
    do_work();
}

更糟的情况是,让线程围栏语句与原子操作语句之间的距离甚至再远些。进一步将do_work()函数搬到某个外部依赖软件包里。并且,程序主模块的作者与依赖软件包的开发者“背对背”编程。那么,程序的执行结果将变得完全不可预测。这类由“副作用”导致的“惊喜”一直都被追求安全的 Rust 系统编程语言所禁忌。所以,我非常理解 Rust 技术团队未实现《C++ 20 线程围栏》特征的利弊权衡与内心挣扎。

题外话,若真有必要让函数内原子操作的顺序性在运行时按需指定,那么给do_work()函数定义一个内存序修饰符枚举类的形参才是更优选择。

void do_work(std::memory_order order)
{
    atomic_var.store(1, order);
}
int main()
{
    do_work(std::memory_order_release);
}

结束语

目前我能自恰讲清楚的内容就是这么多。这次文章的创作冲动源自社区内的一个开放性问题“信号量在 Rust 中是怎么用的?”。我当时潦草地回答“原子内存排序+自旋锁”。这个回复将原本纵深兼具的问题过度简单化了,而且回复内容也不太对。所以,事后回顾工程实践经历(文章内例程的原型源于此),收集文献资料(使经验总结有些理论支撑),创作此文。希望该文对社区内关注此技术方向的前端小伙伴有帮助。另外,个人认为文章内面向异步编程的浅谈与对比对后端程序的开发者也会有些启发。

微信转载文章链接

评论区

写评论
ManonLoki 2025-06-04 09:48

文章很长,读起来比较耗时,但是清晰的解释了无锁编程的原理。之前一直觉得在并发/异步编程中,Channel和锁都属于魔法,只会用,不知道其原理。这篇文章读完之后写写代码回来再读,效果会更好

sanri 2025-06-02 19:23

精品贴啊,顶起!

怪物之星 2025-06-02 17:15

既从底层原理入手讲述了 memory ordering 又将话题拓展到了 rust 库中(例如著名的 crossbeam) 是如何使用 memory ordering 以及有何可改进之处。 可见作者功力之深研究之透彻,并且技术视野也非常广阔,绝对值得深入研究学习的一篇博客。

顺手推荐一下 https://marabos.nl/atomics/ Rust Atomics and Locks-Low-Level Concurrency in Practice

结合这本书的内容,应该可以更好地理解此文的背景技术和在 Rust 中的使用。

1 共 3 条评论, 1 页