阅读视图

发现新文章,点击刷新页面。

深入剖析 RxSwift 中的 Queue:环形队列的 Swift 实战

高性能事件调度背后的数据结构机密


目录

  1. 背景
  2. 为什么选择环形队列?
  3. 环形队列数据结构概览
  4. 关键实现细节
    1. 数据布局与索引计算
    2. 入队 enqueue
    3. 出队 dequeue
    4. 动态扩容与缩容 resize
  5. 性能分析
  6. 与其他数据结构对比
  7. 在 RxSwift 中的实际应用场景
  8. 示例:如何在项目中复用该实现
  9. 总结
  10. 参考链接

背景

RxSwift 的内部实现里,无论是事件传递还是调度缓冲,环形队列(circular queue) 都扮演着基石般的角色。

  • 事件先进先出 (FIFO):当前事件必须处理完才能继续下一个。
  • 背压/缓存:当上游过快、下游较慢时,诸如 observeOnflatMapconcatMap 等操作符会把事件暂存于队列,待消费端就绪后再逐一处理。

为什么选择环形队列?

特性 优势 适用场景
O(1) 入队/出队 仅索引递增,无需移动元素 高频、短生命周期任务
内存连续 优秀的缓存命中率 调度器、事件缓冲
动态扩缩容简单 比链表更高效 流量激增或骤减
低额外开销 仅两个索引变量 移动端资源受限

相比链表,环形数组几乎在所有维度都更贴合 RxSwift “大量、快速、短命事件”的特征。

  • 链表 vs 数组:链表虽不需扩容,但每个节点单独的创建和销毁,在高并发写入场景中频繁的创建和销毁节点反而更慢,且容易造成内存碎片。
  • 缓存命中(CPU cache locality):数据被访问的方式与缓存(cache)的效率之间的关系。它体现了程序在访问内存时是否能有效地利用 CPU 的高速缓存,以提高性能

环形队列数据结构概览

环形队列(Circular Queue)是什么?
环形队列是一种使用 固定大小或按需调整的顺序存储空间 来实现队列 (FIFO) 语义的数据结构。它将底层数组的首尾视为相连,通过“回环”或取模运算,让 最后一个槽位的下一个位置 重新映射到索引 0。这样无需搬移元素即可完成出队操作,同时保持内存连续性。

概念要点

关键词 说明
固定容量 & 动态容量 最简单实现容量固定;RxSwift 通过“两倍扩张 + 四分之一缩容”策略自动伸缩。
双指针/双索引 pushNextIndex 指向待写入槽位;dequeueIndex 指向待读取槽位。差值即为队列长度。
逻辑连续 vs 物理分裂 队列元素在逻辑上按时间顺序连续,但物理存储可能被分成“数组尾段 + 数组头段”两段。

下图演示了典型的环形队列状态变化(绿色为已占用单元):

image.png

image.png

  • pushNextIndex:下一个写入位置
  • dequeueIndex:通过计算得到的队头位置
  • 当写指针越过数组尾部,会从索引 0 重新开始形成“环”。

环形队列向外公开的 API

方法 / 属性 作用 时间复杂度
enqueue(_ element: T) 入队,将元素追加至尾部 O(1) 均摊
dequeue() -> T? 出队,弹出队头元素,队列为空返回 nil O(1)
peek() -> T 只读队头但不移除 O(1)
isEmpty: Bool 队列是否为空 O(1)
count: Int 当前元素数量 O(1)
makeIterator() Swift Sequence 迭代支持 O(n) 整体,但单步 O(1)

这些接口与标准队列抽象保持一致,开发者无需关心内部的环形实现即可完成常见操作。数组带来的 Cache 友好和一次性拷贝优化,使得这些 API 在实际运行中维持极低的延迟。


关键实现细节

数据布局与索引计算

private var storage: ContiguousArray<T?>
private var pushNextIndex = 0   // **写指针** —— 指向下一次写入的位置
private var innerCount    = 0   // 当前元素数量

private var dequeueIndex: Int { // **读指针** —— 通过计算得到
    let index = pushNextIndex - count
    return idx < 0 ? index + storage.count : index
}

为什么保存 pushNextIndex,而 计算 dequeueIndex

在 RxSwift 场景里,入队(enqueue)操作通常比出队(dequeue)更高频,在响应式流中,数据通常由外部事件(如网络响应、用户输入、定时器)快速产生,然后再异步地被消费者(观察者、订阅者)慢慢处理。因此,为了简化 入队(enqueue)操作,并保证 扩容后的数据迁移逻辑更加高效和清晰,因此选择保存和队尾元素相关索引。

计算公式拆解

  • 队头理论推导dequeueIndex = (pushNextIndex - innerCount + capacity) % capacity
  • 实际代码实现:先做减法再按需加 capacity

在编码过程中,尽量避免使用乘*、除/、模%、浮点数,效率低下,CPU做这些操作比较耗时;因此,RxSwift队头实际的计算转换成了加和减

这样既可保持常量级计算,又能在无需硬件除法指令的情况下完成取模。

入队 enqueue enqueue

mutating func enqueue(_ element: T) {
    if count == storage.count {        // 存满 → 扩容
        resizeTo(max(storage.count, 1) * resizeFactor)
    }
    storage[pushNextIndex] = element   // 写入
    pushNextIndex += 1
    innerCount += 1
    if pushNextIndex >= storage.count { // 环回
        pushNextIndex -= storage.count
    }
}
  1. 空间不足 → 扩容
  2. 数据写入
  3. 写索引自增,如越界则回环

出队 dequeue

mutating func dequeue() -> T? {
    if self.count == 0 {
        return nil
    }

    defer {
        let downsizeLimit = storage.count / (resizeFactor * resizeFactor)
        if count < downsizeLimit, downsizeLimit >= initialCapacity {
            resizeTo(storage.count / resizeFactor)  // 缩容
        }
    }
    return dequeueElementOnly()
}

 private mutating func dequeueElementOnly() -> T {
    precondition(count > 0)
    
    let index = dequeueIndex

    defer {
        storage[index] = nil
        innerCount -= 1
    }

    return storage[index]!
}

  • 先读后清 ,缩容操作之所以放在 defer 中,是为了确保在元素真正出队(dequeueElementOnly)之后再进行判断和可能的缩容操作;基于更新后的真实元素数量的“延后清理或操作”的模式。

动态扩容与缩容 resize

环形队列的数组在写满或低于一定数量时会触发容量调整,核心目标是:

  1. 保证逻辑顺序不变 —— 队列是 FIFO,调整后索引必须保持原先的出队顺序;
  2. 一次性线性拷贝 —— 尽可能利用批量拷贝,避免逐元素迁移;
  3. 简化后续索引运算 —— 调整后让队头落到 0pushNextIndex落到 count
mutating private func resizeTo(_ size: Int) {
    // 申请新数组
    var newStorage = ContiguousArray<T?>(repeating: nil, count: size)
    // 保存现有元素总数
    let count = self.count
    // 旧队头位置 & 尾段剩余空间
    let dequeueIndex = self.dequeueIndex
    let spaceToEndOfQueue = storage.count - dequeueIndex
    // **** 分段拷贝 ****
    //第一次拷贝的原来的尾段
    let countElementsInFirstBatch = Swift.min(count, spaceToEndOfQueue)
    // 第二次拷贝的原来的头段
    let numberOfElementsInSecondBatch = count - countElementsInFirstBatch
    // 原来的尾段放到新的数组开始的位置
    newStorage[0 ..< countElementsInFirstBatch] = storage[dequeueIndex ..< (dequeueIndex + countElementsInFirstBatch)]
    // 原来的头段追加到新数组中,此时队头就是新数组索引0的位置,pushNextIndex即为新数组的数量
    newStorage[countElementsInFirstBatch ..< (countElementsInFirstBatch + numberOfElementsInSecondBatch)] = storage[0 ..< numberOfElementsInSecondBatch]
    // 更新索引与存储
    self.innerCount = count
    pushNextIndex = count
    storage = newStorage

}

为什么要“两段复制”?

  • 可能的分裂:当队列发生环回后,逻辑上连续的元素会被物理地分成“尾段 + 头段”两段。若直接拷贝整个旧数组,元素顺序将错乱。
  • 维持 FIFO 顺序:先复制尾段(从 dequeueIndex 到旧数组末尾),再复制头段(索引 0 开始)的剩余部分,可在新数组 [0..<count) 重新拼接出正确的时间顺序。
  • 线性内存访问:每一段都是线性区域,且可被系统优化为块拷贝。

队头元素位置的变化

操作前 操作后
队头索引 = dequeueIndex (可能 > 0) 固定为 0
pushNextIndex (任意位置) 固定为 count

缩容与扩容共用同一逻辑,只是 size 参数不同,因为尾段和头段均是连续存储。当元素数量小于 capacity / 4 且不低于初始容量时触发缩容,确保内存占用与业务峰谷相匹配。

性能分析

  • 时间复杂度
    • 入队/出队:O(1)
    • 扩/缩容:均摊 O(1)(几何倍数增长,摊销成本极低)
  • 对比链表
    • 链表 enqueue 需分配节点;数组仅更新索引
    • 链表缺乏 缓存命中

虽然扩容是一次性 O(n),但每次扩容都是在上一次扩容后进行了很多次 O(1) 的插入之后才发生,触发扩容的代价会被之前的多次 O(1) 插入“摊销”掉。


与其他数据结构对比

特性 环形数组 单向链表 双端队列 Deque
内存布局 连续 离散 连续
入/出队时间 O(1) O(1) (指针操作) O(1)
扩容开销 copy (偶发) copy (偶发)
缓存命中
适合场景 高频、小对象 大对象、频繁插入删除中间节点 双端操作

在 RxSwift 中的实际应用场景

场景 描述 优势体现
调度器 (SerialDispatchQueueScheduler) 将任务缓存至队列,串行执行 线程安全 + 低延迟
操作符 observeOn 上游高速 → 队列缓存 → 下游消费 背压管理
合并流 (flatMap, concatMap) 子流事件临时缓冲 减少锁/条件变量

背压管理(Backpressure Management) : 是一种控制数据流速的机制,目的是防止生产者(数据发送方)发送数据过快,导致消费者(数据接收方)来不及处理,最终引发资源耗尽、缓冲区溢出或系统崩溃等问题。


示例:如何在项目中复用该实现

var q = Queue<Int>(capacity: 4)

// 写入
(1...10).forEach { q.enqueue($0) }

// 消费
while !q.isEmpty {
    print(q.dequeue()!)
}

总结

  • 环形队列 通过常数级操作与良好缓存局部性,完美契合 RxSwift 的事件驱动模型。
  • 精巧的 写索引 + 计算读索引 方案,简化了状态管理。
  • 扩缩容逻辑在数组层面“一劳永逸”,保持 API 简洁。

掌握并善用这一数据结构,不仅能帮助你更深入理解 RxSwift 的内部机理,也能在自己的高性能队列、调度器甚至网络层缓存中大显身手。


参考链接

  • RxSwift 源码 Source/Schedulers/Queue.swift
❌