基于 Tokio Notify 的 SingleFlight 场景实现
SingleFlight 模式及其应用场景
SingleFlight 模式可以直译为“单飞”模式,Golang 官方扩展同步包 (golang.org/x/sync/singleflight)[1] 中包含了该模式的典型实现。根据该库中的定义,该模式主要是提供了一种重复函数调用抑制的机制。 而根据该库的实现表明,SingleFlight 模式的主要作用就是将一组相同的业务请求进行合并处理之后确保该业务请求只会做一次,然后对所有的其他的业务请求返回相同的结果。其本质是对特定业务函数调用的结果进行复用。 为什么需要 SingleFlight 模式呢?或者说 SingleFlight 模式的应用场景是什么呢?首先从上面的描述可以看出,SingleFlight 可以避免对外暴露的业务函数或者接口在短时间内被大规模的重复调用,从而极大地缓解终端用户对服务的访问压力。事实上这种场景对很多互联网或者基础性的服务和应用都是非常有意义的。譬如用户对热点数据的应用访问场景下,我们除了将热点数据进行缓存这样的常规手段外,对相应的服务和数据访问提供 SingleFlight 的支持也是一种对业务系统稳定性和可靠性的技术追求。
本文中 Rust 版的 SingleFlight 应用场景实现
本文中作者遇到的项目场景描述: 在云原生微服务的应用场景中,集群中存在大量的微服务,每一个微服务都保持着独立的演进且支持相同微服务的不同版本同时在线。我们的场景中集群的每一个节点上都会存在一个 Rust 实现的 Proxy。这个 Proxy 需要感知到集群中的每一个微服务及其对应的 endpoint 的变化,这些变化包括微服务的创建、销毁和更新。可以简单的理解为:每一个微服务包含一个 domain,而微服务后面对应的 endpoint 则映射着相应的 IP 地址和端口。 每个节点上的 Proxy 需要保存这些映射关系,且对外提供这种映射关系的查询服务。 如果调用 Proxy 的查询服务找不到相关微服务的映射关系或者映射关系的数据已经失效时,则会通过 host resolve 服务去找到映射关系的数据,然后将其新增或者更新到 Proxy 存储 中。需要说明的是微服务和它对应的 endpoint 的变化会随着业务发生变化,且 Proxy 的查询服务也会被频繁的调用,对于关键热点的微服务的查询服务调用每秒可能达到数千次。
问题描述和解决方案: 项目中我发现,在通过 Proxy 对同一个关键热点微服务的映射关系进行新增或者更新操作时总会出现巨大的延时。经过排查发现,每当对微服务及其 endpoint 进行新增或者更新操作时,紧随其后的对该微服务的 Proxy 查询服务也会进行大量重复的新增或者更新操作。 于是我的解决方案便是在 Proxy 内部实现 SingleFlight 模式,确保同一时间内无论多少次查询微服务的映射关系且触发映射关系的新增或者更新的操作只会进行一次。
对于此我很自然的想到两个解决方案:
- 使用 Rust std::sync::mpsc[2] 的多线程通道来实现 SingleFlight
- 使用 Rust tokio::sync::Notify[3] 的通知机制来实现 SingleFlight
之所以想到第一种方案是因为考虑到每一个请求都可以表示为一个线程,于是对大量的微服务请求的同步控制就可以转化为对多线程的同步控制。 然而真实的情况是:第一种方案是“多生产者和单消费者”(Multiple Producer Single Consumer)的模式,这正与之前描述的 SingleFlight 模式的本质:“通过对特定业务函数调用的结果进行复用” 是相反的。因为这里的“结果”本身是单个生产者,而复用该“结果”的是多个消费者。事实证明,虽然想要通过 std::sync::mpsc 去实现 SingleFlight 模式也是可以的,但是实现过程和代码会非常复杂。在经过数次尝试后,我果断放弃第一种方案而采用了第二种方案。 相比之下,第二种方案实现的代码就非常的简洁明了。
我简化了一下场景和解决方案,并在下面贴出了关键部分的代码进行讲解,同时我也把样本代码放到了我的 github [4],欢迎各位读者去 cargo build
。
async fn resolve_host(&self, hostname: String) -> Option<ResolvedDns> {
let (notify, is_first) = self.get_or_create_notify(&hostname);
if is_first {
println!("first get into the host name resolving task!");
// Perform the resolution only if it's the first request.
self.resolve_on_demand_dns(&hostname).await;
// Notify all waiters after the DNS resolving task completed.
notify.notify_waiters();
// The resolution is complete. We can remove the in-progress notify object.
self.in_progress.lock().unwrap().remove(&hostname);
} else {
// Wait for the in-progress resolution to complete.
notify.notified().await;
}
// Serve from the local cache after resolution completes.
self.find_resolved_host(&hostname)
}
下面我来解释一下上面的代码。 resolve_host
中最主要是承接瞬时对 hostname
的大量 resolve host 请求。 首先根据 get_or_create_notify
来判断进入的线程是否是所有相同 hostname
resolve host 请求的第一个。 如果是,那么通过调用 resolve_on_demand_dns
去做真正的 resolve host 业务,最重要的是通过调用 notify.notify_waiters();
来让其他的请求等待 resolve host 业务完成。 如果不是,则直接进入 else
分支等待,直到第一个进程完成业务后唤醒。 最后所有的进程完成后都可以调用 find_resolved_host
得到结果。
结论
对于第二种方案,需要指出的是该方案并没有对 resolve_host
的所有请求进行合并,而是只处理第一个请求,让其他请求都等待直到第一个请求完成。 然后下面展示的是对并发请求数分别为 100、 1000、 10000 和 100000 的时候的测试数值。 由于样本代码中 resolve_on_demand_dns
是通过 sleep 2s 来模拟业务函数的实现, 所以测试结果显示,面对不同并发请求数的请求耗时几乎都是在两秒左右。
根据评论区的评论,从我个人的角度来说还是特别期待在 Rust 也有更加通用高效的 SingleFlight package实现。
zhlsunshine-1@zhlsunshine-1:~/rust/rust-singleflight-with-notify/target/debug$ ./rust-singleflight-with-notify --concurrency 100
first get into the host name resolving task!
Time taken: 2.000608821s
zhlsunshine-1@zhlsunshine-1:~/rust/rust-singleflight-with-notify/target/debug$ ./rust-singleflight-with-notify --concurrency 1000
first get into the host name resolving task!
Time taken: 2.008098564s
zhlsunshine-1@zhlsunshine-1:~/rust/rust-singleflight-with-notify/target/debug$ ./rust-singleflight-with-notify --concurrency 10000
first get into the host name resolving task!
Time taken: 2.018548614s
zhlsunshine-1@zhlsunshine-1:~/rust/rust-singleflight-with-notify/target/debug$ ./rust-singleflight-with-notify --concurrency 100000
first get into the host name resolving task!
Time taken: 2.162775567s
关于作者
曾任职于阿尔卡特朗讯,百度,IBM 和 Intel 等知名企业担任资深研发工程师。拥有16年的技术研发背景。作为 Istio 社区的维护者,除了聚焦于云原生微服务等技术领域的研发实践外,也致力于云原生与 LLM 技术的交叉应用创新和研发实践。曾在 OpenVINO 和 Kserve 等社区贡献 LLM 推理服务在云原生场景下的实践 PR,也是 OPEA [Open Platform for Enterprise AI] 社区的开发者和维护者。曾在 KubeCon、ServiceMeshCon、 IstioCon、GOTC、 InfoQ/Qcon 和 GOSIM 等大会上发表演讲。
参考文档:
评论区
写评论类似 group commit。用队列就得传个回调,用条件变量么怎么都可以。
这也是一个考量哈,个人觉得 Lazy + ArcSwap 会是另外一个思路。后面可以考虑尝试试验一下。另外诸如此类的方案,可能也需要整体考量。个人还是特别期待在 Rust 也有通用的 SingleFlight 模式的实现。
--
👇
OldBird: 后端有变化时,ArcSwap中切换一个新的Lazy
👇
OldBird: 我的理解,用Lazy+ArcSwap搞定,Lazy中存储查询结果,ArcSwap中切换一个新的Lazy。
我阅读了你贴出来的代码了哈。我理解一下,Leadlock中包裹了Cond(新的结构体通过Condvar和UnsafeCell构成)和Mutex 一起实现了single_flight。基于我的场景其实是比较具体的,所以RwLock + Notify 就完成了。无论如何,谢谢你分享的新思路!
--
👇
zylthinking: 这个我也弄过, 也是从 golang 开始的, 其实是实现了一个叫做 leadlock 的锁, 然后用这它再实现 single_flight.
后来也将我的写法移植到了 rust https://docs.rs/comn/latest/src/comn/leadlock.rs.html#116-125
但我现在严重怀疑里面有 bug, 因为这是我对 rust 还不是特别熟的时候写的; 现在有点没信心. 因为这代码一直没有再重新看过.
不过, 我相信就算有问题也是细微问题, 整体思路肯定可行. 同步异步的都有实现, 贴的那个是同步的, 异步的应该在 syna 下
这个我也弄过, 也是从 golang 开始的, 其实是实现了一个叫做 leadlock 的锁, 然后用这它再实现 single_flight.
后来也将我的写法移植到了 rust https://docs.rs/comn/latest/src/comn/leadlock.rs.html#116-125
但我现在严重怀疑里面有 bug, 因为这是我对 rust 还不是特别熟的时候写的; 现在有点没信心. 因为这代码一直没有再重新看过.
不过, 我相信就算有问题也是细微问题, 整体思路肯定可行. 同步异步的都有实现, 贴的那个是同步的, 异步的应该在 syna 下
后端有变化时,ArcSwap中切换一个新的Lazy
👇
OldBird: 我的理解,用Lazy+ArcSwap搞定,Lazy中存储查询结果,ArcSwap中切换一个新的Lazy。
我的理解,用Lazy+ArcSwap搞定,Lazy中存储查询结果,ArcSwap中切换一个新的Lazy。