深入剖析 RxSwift 中的 Queue:环形队列的 Swift 实战
高性能事件调度背后的数据结构机密
目录
背景
在 RxSwift 的内部实现里,无论是事件传递还是调度缓冲,环形队列(circular queue) 都扮演着基石般的角色。
- 事件先进先出 (FIFO):当前事件必须处理完才能继续下一个。
-
背压/缓存:当上游过快、下游较慢时,诸如
observeOn
、flatMap
、concatMap
等操作符会把事件暂存于队列,待消费端就绪后再逐一处理。
为什么选择环形队列?
特性 | 优势 | 适用场景 |
---|---|---|
O(1) 入队/出队 | 仅索引递增,无需移动元素 | 高频、短生命周期任务 |
内存连续 | 优秀的缓存命中率 | 调度器、事件缓冲 |
动态扩缩容简单 | 比链表更高效 | 流量激增或骤减 |
低额外开销 | 仅两个索引变量 | 移动端资源受限 |
相比链表,环形数组几乎在所有维度都更贴合 RxSwift “大量、快速、短命事件”的特征。
- 链表 vs 数组:链表虽不需扩容,但每个节点单独的创建和销毁,在高并发写入场景中频繁的创建和销毁节点反而更慢,且容易造成内存碎片。
- 缓存命中(CPU cache locality):数据被访问的方式与缓存(cache)的效率之间的关系。它体现了程序在访问内存时是否能有效地利用 CPU 的高速缓存,以提高性能
环形队列数据结构概览
环形队列(Circular Queue)是什么?
环形队列是一种使用 固定大小或按需调整的顺序存储空间 来实现队列 (FIFO) 语义的数据结构。它将底层数组的首尾视为相连,通过“回环”或取模运算,让 最后一个槽位的下一个位置 重新映射到索引 0
。这样无需搬移元素即可完成出队操作,同时保持内存连续性。
概念要点
关键词 | 说明 |
---|---|
固定容量 & 动态容量 | 最简单实现容量固定;RxSwift 通过“两倍扩张 + 四分之一缩容”策略自动伸缩。 |
双指针/双索引 |
pushNextIndex 指向待写入槽位;dequeueIndex 指向待读取槽位。差值即为队列长度。 |
逻辑连续 vs 物理分裂 | 队列元素在逻辑上按时间顺序连续,但物理存储可能被分成“数组尾段 + 数组头段”两段。 |
下图演示了典型的环形队列状态变化(绿色为已占用单元):
-
pushNextIndex
:下一个写入位置 -
dequeueIndex
:通过计算得到的队头位置 - 当写指针越过数组尾部,会从索引
0
重新开始形成“环”。
环形队列向外公开的 API
方法 / 属性 | 作用 | 时间复杂度 |
---|---|---|
enqueue(_ element: T) |
入队,将元素追加至尾部 |
O(1) 均摊 |
dequeue() -> T? |
出队,弹出队头元素,队列为空返回 nil
|
O(1) |
peek() -> T |
只读队头但不移除 | O(1) |
isEmpty: Bool |
队列是否为空 | O(1) |
count: Int |
当前元素数量 | O(1) |
makeIterator() |
Swift Sequence 迭代支持 |
O(n) 整体,但单步 O(1)
|
这些接口与标准队列抽象保持一致,开发者无需关心内部的环形实现即可完成常见操作。数组带来的 Cache 友好和一次性拷贝优化,使得这些 API 在实际运行中维持极低的延迟。
关键实现细节
数据布局与索引计算
private var storage: ContiguousArray<T?>
private var pushNextIndex = 0 // **写指针** —— 指向下一次写入的位置
private var innerCount = 0 // 当前元素数量
private var dequeueIndex: Int { // **读指针** —— 通过计算得到
let index = pushNextIndex - count
return idx < 0 ? index + storage.count : index
}
为什么保存 pushNextIndex
,而 计算 dequeueIndex
?
在 RxSwift 场景里,入队(enqueue)操作通常比出队(dequeue)更高频,在响应式流中,数据通常由外部事件(如网络响应、用户输入、定时器)快速产生,然后再异步地被消费者(观察者、订阅者)慢慢处理。因此,为了简化 入队(enqueue)操作,并保证 扩容后的数据迁移逻辑更加高效和清晰,因此选择保存和队尾元素相关索引。
计算公式拆解
-
队头理论推导:
dequeueIndex = (pushNextIndex - innerCount + capacity) % capacity
-
实际代码实现:先做减法再按需加
capacity
在编码过程中,尽量避免使用乘*、除/、模%、浮点数,效率低下,CPU做这些操作比较耗时;因此,RxSwift队头实际的计算转换成了加和减
这样既可保持常量级计算,又能在无需硬件除法指令的情况下完成取模。
入队 enqueue
enqueue
mutating func enqueue(_ element: T) {
if count == storage.count { // 存满 → 扩容
resizeTo(max(storage.count, 1) * resizeFactor)
}
storage[pushNextIndex] = element // 写入
pushNextIndex += 1
innerCount += 1
if pushNextIndex >= storage.count { // 环回
pushNextIndex -= storage.count
}
}
- 空间不足 → 扩容
- 数据写入
- 写索引自增,如越界则回环
出队 dequeue
mutating func dequeue() -> T? {
if self.count == 0 {
return nil
}
defer {
let downsizeLimit = storage.count / (resizeFactor * resizeFactor)
if count < downsizeLimit, downsizeLimit >= initialCapacity {
resizeTo(storage.count / resizeFactor) // 缩容
}
}
return dequeueElementOnly()
}
private mutating func dequeueElementOnly() -> T {
precondition(count > 0)
let index = dequeueIndex
defer {
storage[index] = nil
innerCount -= 1
}
return storage[index]!
}
-
先读后清 ,缩容操作之所以放在
defer
中,是为了确保在元素真正出队(dequeueElementOnly)之后再进行判断和可能的缩容操作;基于更新后的真实元素数量的“延后清理或操作”的模式。
动态扩容与缩容 resize
环形队列的数组在写满或低于一定数量时会触发容量调整,核心目标是:
- 保证逻辑顺序不变 —— 队列是 FIFO,调整后索引必须保持原先的出队顺序;
- 一次性线性拷贝 —— 尽可能利用批量拷贝,避免逐元素迁移;
-
简化后续索引运算 —— 调整后让队头落到
0
,pushNextIndex
落到count
。
mutating private func resizeTo(_ size: Int) {
// 申请新数组
var newStorage = ContiguousArray<T?>(repeating: nil, count: size)
// 保存现有元素总数
let count = self.count
// 旧队头位置 & 尾段剩余空间
let dequeueIndex = self.dequeueIndex
let spaceToEndOfQueue = storage.count - dequeueIndex
// **** 分段拷贝 ****
//第一次拷贝的原来的尾段
let countElementsInFirstBatch = Swift.min(count, spaceToEndOfQueue)
// 第二次拷贝的原来的头段
let numberOfElementsInSecondBatch = count - countElementsInFirstBatch
// 原来的尾段放到新的数组开始的位置
newStorage[0 ..< countElementsInFirstBatch] = storage[dequeueIndex ..< (dequeueIndex + countElementsInFirstBatch)]
// 原来的头段追加到新数组中,此时队头就是新数组索引0的位置,pushNextIndex即为新数组的数量
newStorage[countElementsInFirstBatch ..< (countElementsInFirstBatch + numberOfElementsInSecondBatch)] = storage[0 ..< numberOfElementsInSecondBatch]
// 更新索引与存储
self.innerCount = count
pushNextIndex = count
storage = newStorage
}
为什么要“两段复制”?
- 可能的分裂:当队列发生环回后,逻辑上连续的元素会被物理地分成“尾段 + 头段”两段。若直接拷贝整个旧数组,元素顺序将错乱。
-
维持 FIFO 顺序:先复制尾段(从
dequeueIndex
到旧数组末尾),再复制头段(索引0
开始)的剩余部分,可在新数组[0..<count)
重新拼接出正确的时间顺序。 - 线性内存访问:每一段都是线性区域,且可被系统优化为块拷贝。
队头元素位置的变化
操作前 | 操作后 |
---|---|
队头索引 = dequeueIndex (可能 > 0) |
固定为 0 |
pushNextIndex (任意位置) |
固定为 count |
缩容与扩容共用同一逻辑,只是
size
参数不同,因为尾段和头段均是连续存储。当元素数量小于capacity / 4
且不低于初始容量时触发缩容,确保内存占用与业务峰谷相匹配。
性能分析
-
时间复杂度
- 入队/出队:
O(1)
- 扩/缩容:均摊
O(1)
(几何倍数增长,摊销成本极低)
- 入队/出队:
-
对比链表
- 链表
enqueue
需分配节点;数组仅更新索引 - 链表缺乏 缓存命中
- 链表
虽然扩容是一次性 O(n),但每次扩容都是在上一次扩容后进行了很多次 O(1) 的插入之后才发生,触发扩容的代价会被之前的多次 O(1) 插入“摊销”掉。
与其他数据结构对比
特性 | 环形数组 | 单向链表 | 双端队列 Deque
|
---|---|---|---|
内存布局 | 连续 | 离散 | 连续 |
入/出队时间 | O(1) | O(1) (指针操作) | O(1) |
扩容开销 |
copy (偶发)
|
无 |
copy (偶发)
|
缓存命中 | 高 | 低 | 中 |
适合场景 | 高频、小对象 | 大对象、频繁插入删除中间节点 | 双端操作 |
在 RxSwift 中的实际应用场景
场景 | 描述 | 优势体现 |
---|---|---|
调度器 (SerialDispatchQueueScheduler ) |
将任务缓存至队列,串行执行 | 线程安全 + 低延迟 |
操作符 observeOn |
上游高速 → 队列缓存 → 下游消费 | 背压管理 |
合并流 (flatMap , concatMap ) |
子流事件临时缓冲 | 减少锁/条件变量 |
背压管理(Backpressure Management) : 是一种控制数据流速的机制,目的是防止生产者(数据发送方)发送数据过快,导致消费者(数据接收方)来不及处理,最终引发资源耗尽、缓冲区溢出或系统崩溃等问题。
示例:如何在项目中复用该实现
var q = Queue<Int>(capacity: 4)
// 写入
(1...10).forEach { q.enqueue($0) }
// 消费
while !q.isEmpty {
print(q.dequeue()!)
}
总结
- 环形队列 通过常数级操作与良好缓存局部性,完美契合 RxSwift 的事件驱动模型。
- 精巧的 写索引 + 计算读索引 方案,简化了状态管理。
- 扩缩容逻辑在数组层面“一劳永逸”,保持 API 简洁。
掌握并善用这一数据结构,不仅能帮助你更深入理解 RxSwift 的内部机理,也能在自己的高性能队列、调度器甚至网络层缓存中大显身手。
参考链接
- RxSwift 源码
Source/Schedulers/Queue.swift