📋 目录
一、RxSwift框架使用详解
1. RxSwift框架概述
RxSwift 是 ReactiveX(Reactive Extensions)的 Swift 实现,是一个用于处理异步事件流的函数式响应式编程框架。
1.1 什么是RxSwift
RxSwift 基于观察者模式,允许你通过组合不同的操作符来处理异步事件序列。它提供了声明式的 API 来处理时间序列数据。
核心特点:
-
响应式编程:基于观察者模式的事件驱动编程
-
函数式编程:使用高阶函数和操作符组合
-
类型安全:充分利用 Swift 的类型系统
-
跨平台:基于 ReactiveX 标准,与其他平台一致
-
丰富的操作符:提供大量操作符处理各种场景
1.2 RxSwift vs Combine
| 特性 |
RxSwift |
Combine |
| 平台 |
跨平台(iOS、macOS、watchOS、tvOS) |
Apple 生态(iOS 13+) |
| 语言 |
Swift |
Swift |
| 官方支持 |
❌ 第三方(ReactiveX) |
✅ Apple 官方 |
| 最低版本 |
iOS 8.0+ |
iOS 13.0+ |
| API风格 |
ReactiveX 标准 |
Apple 风格 |
| 学习曲线 |
陡峭 |
中等 |
| 生态 |
丰富(RxCocoa、RxDataSources等) |
官方集成(SwiftUI) |
1.3 RxSwift生态系统
-
RxSwift:核心框架
-
RxCocoa:UIKit/AppKit 集成
-
RxDataSources:TableView/CollectionView 数据源
-
RxTest:测试工具
-
RxBlocking:阻塞操作符(用于测试)
1.4 安装方式
CocoaPods:
pod 'RxSwift', '~> 6.0'
pod 'RxCocoa', '~> 6.0'
SPM:
dependencies: [
.package(url: "https://github.com/ReactiveX/RxSwift.git", from: "6.0.0")
]
1.5 编程思想(背后的范式与理念)
为什么要先谈编程思想?
会用 RxSwift 的 API(Observable、subscribe、map、flatMap 等)不等于能写好响应式架构。很多「看起来能跑」的代码其实仍是用响应式语法写命令式逻辑(例如在 subscribe 里写满 if-else、嵌套请求),难以测试、难以复用。先理解背后的范式与理念,再写代码,才能做到「用对场景、写对抽象、边界清晰」。RxSwift 与 Combine 同属 ReactiveX 一脉,背后的编程思想高度一致;理解这些思想有助于写出更清晰、可维护的响应式代码。
范式定位:FRP(函数式响应式编程)
RxSwift 是 FRP(Functional Reactive Programming) 的一种实现:用函数式的组合与不可变方式,处理响应式的事件流。不是「要么函数式要么响应式」,而是两者结合——流用操作符做纯变换(函数式),用订阅对事件做出反应(响应式)。了解这一点,就不会把 Rx 单纯当成「另一种回调封装」,而是从「流 + 变换 + 订阅」的视角设计数据与 UI 的边界。
(1)响应式编程(Reactive Programming)
-
核心:将「数据与事件」视为随时间发生的事件序列,通过订阅对序列中的每一项做出反应,而不是主动轮询或层层回调。
-
在 RxSwift 中:
Observable 表示一条事件流,Observer 通过 subscribe 订阅后,在 onNext / onError / onCompleted 中响应;按钮点击、网络返回、定时器都可统一为 Observable,用同一套操作符处理。
-
思维转变:从「先调 A,等回调再调 B」变为「当流里出现某类事件时,执行 B」,逻辑由数据/事件驱动。
(2)声明式 vs 命令式
| 维度 |
命令式(Imperative) |
声明式(Declarative) |
| 关注点 |
「怎么做」:显式控制顺序与分支 |
「做什么」:描述结果与数据变换关系 |
| 典型写法 |
for 循环、if-else、嵌套回调 |
链式操作符:map / filter / flatMap / combineLatest |
| 在 RxSwift 中 |
手写「请求 → 回调里解析 → 再请求」 |
用 observable.map(...).flatMap(...).subscribe(...) 描述整条流水线 |
声明式让「数据从哪来、怎么变、到哪去」一目了然,便于阅读和单元测试。
从 OOP/命令式到响应式的思维转变:传统写法习惯「谁持有谁、谁调谁」——对象持有状态,方法里 if-else 控制流程,异步靠回调或 delegate。响应式则把「谁在什么时候产生什么」抽象成流,把「对数据的处理」抽象成操作符链,把「最终消费」放在订阅里。习惯后,你会先想「有哪些事件源」「它们如何组合、变换」,再写具体订阅逻辑,而不是一上来就写一堆属性和回调。
同一需求的两种写法对比(搜索框防抖 + 请求 + 只取非空):
命令式常见写法是:在文本回调里设 Timer、取消上一次请求、判断非空再发请求、在回调里更新 UI,逻辑分散在多处。用 RxSwift 可以写成一条「流」:
// 响应式:一条链描述「输入 → 防抖 → 非空过滤 → 请求 → 主线程更新」
searchTextField.rx.text.orEmpty
.debounce(.milliseconds(300), scheduler: MainScheduler.instance)
.filter { !$0.isEmpty }
.flatMapLatest { query in api.search(query) }
.observeOn(MainScheduler.instance)
.subscribe(onNext: { results in self.updateUI(results) })
.disposed(by: disposeBag)
这样,「防抖」「过滤空串」「只保留最后一次请求」「切回主线程」都体现在操作符上,阅读时一眼能看出数据流;单元测试时可以对 Observable 链单独测,而不必依赖 UI。
(3)函数式思想(组合与不可变)
-
组合(Composition):每个操作符只做一件事,通过
.map().filter().distinctUntilChanged() 等组合成完整逻辑,而不是在一个闭包里写尽所有逻辑。
-
不可变(Immutability):操作符不修改原 Observable,而是返回新的 Observable;原流不变,便于复用和推理。
-
副作用边界:纯变换放在操作符链中,副作用(UI 更新、写库、弹窗)集中在
subscribe 的闭包里,便于测试和并发安全。
(4)流与时间(Streams & Time)
- 把所有「会随时间产生的事件」都视为时间序列:next、next、…、completed/error。
- 时间相关操作符:
debounce(静默一段时间后取最新)、throttle(间隔内只取第一个/最后一个)、delay(延后发射),统一表达「何时」而不只是「何值」。
(5)观察者与发布-订阅
-
观察者模式:Observer 订阅 Observable,在事件发生时被通知。RxSwift 的
subscribe(onNext:onError:onCompleted:) 就是在注册观察者。
-
发布-订阅:生产端(Observable)与消费端(Observer)解耦,通过
Disposable 表示一次订阅的生命周期;Rx 的「热/冷」流、背压(部分算子)都是在这一模型上的扩展。
(6)设计原则在 Rx 中的体现
| 原则 |
在 RxSwift 中的体现 |
| 单一职责 |
每个操作符只做一种变换(map 只做映射,filter 只做过滤),复杂逻辑由链式组合完成。 |
| 关注点分离 |
数据获取与变换在 Observable 链中,线程切换用 subscribeOn/observeOn,副作用集中在 subscribe。 |
| 依赖倒置 |
业务依赖「Observable 流」的抽象,而不依赖具体如何产生事件(网络、本地、Mock 都可替换)。 |
| 开闭原则 |
通过新操作符或新 Observable 扩展行为,而不必修改已有链;原流不可变,易于复用。 |
小结:RxSwift 用声明式的事件流(Observable)和可组合的操作符,在观察者/发布-订阅模型下做响应式的异步与事件处理,并用 Scheduler 控制线程与时机。掌握这些思想后,再写「为什么用 map 而不是在 subscribe 里写一大段」「为什么需要 observeOn/subscribeOn」会更自然。
2. 核心概念
2.1 Observable(可观察序列)
Observable 是 RxSwift 的核心,表示可以观察的事件序列。
protocol ObservableType {
associatedtype Element
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element
}
特点:
- 可以发出零个或多个事件
- 可能以完成或错误结束
- 是值类型(struct)
- 不可变(每次操作返回新的 Observable)
事件类型:
enum Event<Element> {
case next(Element) // 下一个元素
case error(Swift.Error) // 错误
case completed // 完成
}
示例:
// 创建一个简单的 Observable
let observable = Observable<String>.just("Hello, RxSwift!")
observable.subscribe(onNext: { value in
print(value) // 输出: Hello, RxSwift!
}, onError: { error in
print("错误: \(error)")
}, onCompleted: {
print("完成")
})
.disposed(by: disposeBag)
// 使用数组创建 Observable
let arrayObservable = Observable.from([1, 2, 3, 4, 5])
arrayObservable.subscribe(onNext: { value in
print(value) // 依次输出: 1, 2, 3, 4, 5
})
.disposed(by: disposeBag)
2.2 Observer(观察者)
Observer 是接收 Observable 事件的协议。
protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
内置 Observer:
-
onNext:接收下一个元素
-
onError:接收错误
-
onCompleted:接收完成事件
示例:
let observable = Observable.from([1, 2, 3])
observable.subscribe(
onNext: { value in
print("收到值: \(value)")
},
onError: { error in
print("错误: \(error)")
},
onCompleted: {
print("完成")
}
)
.disposed(by: disposeBag)
2.3 Disposable(可释放资源)
Disposable 表示订阅关系,用于取消订阅和释放资源。
protocol Disposable {
func dispose()
}
DisposeBag:
class ViewController: UIViewController {
private let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
Observable.just("Hello")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag) // 自动管理生命周期
}
}
3. Observable与Observer
3.1 创建Observable
just
创建只发出一个元素的 Observable。
let observable = Observable.just("Hello")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
from
从数组或序列创建 Observable。
let observable = Observable.from([1, 2, 3])
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
of
从多个元素创建 Observable。
let observable = Observable.of(1, 2, 3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
create
自定义创建 Observable。
let observable = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onCompleted()
return Disposables.create()
}
observable.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
empty
创建不发出任何元素的 Observable。
let observable = Observable<Int>.empty()
.subscribe(
onNext: { print($0) },
onCompleted: { print("完成") }
)
.disposed(by: disposeBag)
// 输出: 完成
never
创建永不发出事件也永不完成的 Observable。
let observable = Observable<Int>.never()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 无输出
error
创建立即发出错误的 Observable。
enum MyError: Error {
case customError
}
let observable = Observable<Int>.error(MyError.customError)
.subscribe(
onNext: { print($0) },
onError: { print("错误: \($0)") }
)
.disposed(by: disposeBag)
// 输出: 错误: customError
range
创建发出指定范围内整数的 Observable。
let observable = Observable.range(start: 1, count: 5)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 1, 2, 3, 4, 5
repeatElement
重复发出指定元素。
let observable = Observable.repeatElement("Hello")
.take(3) // 只取前3个
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: Hello, Hello, Hello
interval
按指定时间间隔发出整数。
let observable = Observable<Int>.interval(
.seconds(1),
scheduler: MainScheduler.instance
)
.take(5)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 每秒输出: 0, 1, 2, 3, 4
timer
延迟指定时间后发出元素。
let observable = Observable<Int>.timer(
.seconds(2),
scheduler: MainScheduler.instance
)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 2秒后输出: 0
3.2 自定义Observable
struct CustomObservable<Element>: ObservableType {
typealias Element = Element
private let _subscribe: (AnyObserver<Element>) -> Disposable
init(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) {
self._subscribe = subscribe
}
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element {
let anyObserver = AnyObserver(observer)
return _subscribe(anyObserver)
}
}
// 使用
let custom = CustomObservable<Int> { observer in
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
custom.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
4. Operators操作符
4.1 转换操作符
map
转换每个元素。
Observable.from([1, 2, 3])
.map { $0 * 2 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 2, 4, 6
flatMap
将 Observable 发出的元素转换为 Observable,然后合并。
Observable.from(["A", "B", "C"])
.flatMap { letter in
Observable.from([1, 2]).map { "\(letter)\($0)" }
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: A1, A2, B1, B2, C1, C2
flatMapLatest
只保留最新的内部 Observable。
Observable.from(["A", "B", "C"])
.flatMapLatest { letter in
Observable.just(letter).delay(.seconds(1), scheduler: MainScheduler.instance)
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 只输出: C(A和B被取消)
scan
累积值。
Observable.from([1, 2, 3, 4, 5])
.scan(0, accumulator: +)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 1, 3, 6, 10, 15
buffer
缓冲元素。
Observable.from([1, 2, 3, 4, 5, 6, 7, 8])
.buffer(timeSpan: .seconds(1), count: 3, scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: [1, 2, 3], [4, 5, 6], [7, 8]
window
将 Observable 分割为多个 Observable。
Observable.from([1, 2, 3, 4, 5, 6])
.window(timeSpan: .seconds(1), count: 2, scheduler: MainScheduler.instance)
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
4.2 过滤操作符
filter
过滤元素。
Observable.from([1, 2, 3, 4, 5])
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 2, 4
distinctUntilChanged
移除连续重复的元素。
Observable.from([1, 1, 2, 2, 3, 3])
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 1, 2, 3
elementAt
获取指定索引的元素。
Observable.from([1, 2, 3, 4, 5])
.elementAt(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 3
first / last
获取第一个或最后一个元素。
Observable.from([1, 2, 3, 4, 5])
.first()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 1
Observable.from([1, 2, 3, 4, 5])
.last()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 5
take / takeLast
获取前几个或后几个元素。
Observable.from([1, 2, 3, 4, 5])
.take(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 1, 2, 3
Observable.from([1, 2, 3, 4, 5])
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 3, 4, 5
skip / skipLast
跳过前几个或后几个元素。
Observable.from([1, 2, 3, 4, 5])
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 3, 4, 5
debounce
防抖,等待指定时间后发出最新值。
let subject = PublishSubject<String>()
subject
.debounce(.milliseconds(500), scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("H") // 不输出
subject.onNext("He") // 不输出
subject.onNext("Hel") // 不输出
subject.onNext("Hell") // 不输出
subject.onNext("Hello") // 0.5秒后输出: Hello
throttle
节流,在指定时间间隔内只发出第一个值。
let subject = PublishSubject<String>()
subject
.throttle(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("A") // 立即输出: A
subject.onNext("B") // 不输出(1秒内)
subject.onNext("C") // 不输出(1秒内)
// 1秒后
subject.onNext("D") // 输出: D
4.3 组合操作符
startWith
在序列开始前插入元素。
Observable.from([1, 2, 3])
.startWith(0)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 0, 1, 2, 3
merge
合并多个 Observable。
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
Observable.merge(subject1, subject2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext(1) // 输出: 1
subject2.onNext(2) // 输出: 2
subject1.onNext(3) // 输出: 3
combineLatest
组合多个 Observable 的最新值。
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<Int>()
Observable.combineLatest(subject1, subject2)
.subscribe(onNext: { value1, value2 in
print("\(value1): \(value2)")
})
.disposed(by: disposeBag)
subject1.onNext("A") // 无输出(等待 subject2)
subject2.onNext(1) // 输出: A: 1
subject1.onNext("B") // 输出: B: 1
subject2.onNext(2) // 输出: B: 2
zip
按顺序组合多个 Observable。
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<Int>()
Observable.zip(subject1, subject2)
.subscribe(onNext: { value1, value2 in
print("\(value1): \(value2)")
})
.disposed(by: disposeBag)
subject1.onNext("A") // 等待 subject2
subject1.onNext("B") // 等待 subject2
subject2.onNext(1) // 输出: A: 1
subject2.onNext(2) // 输出: B: 2
withLatestFrom
当源 Observable 发出元素时,使用另一个 Observable 的最新值。
let button = PublishSubject<Void>()
let textField = PublishSubject<String>()
button
.withLatestFrom(textField)
.subscribe(onNext: { text in
print("按钮点击,文本: \(text)")
})
.disposed(by: disposeBag)
textField.onNext("Hello") // 无输出
textField.onNext("World") // 无输出
button.onNext(()) // 输出: 按钮点击,文本: World
switchLatest
切换到最新的内部 Observable。
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let source = PublishSubject<Observable<Int>>()
source
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
source.onNext(subject1)
subject1.onNext(1) // 输出: 1
subject1.onNext(2) // 输出: 2
source.onNext(subject2)
subject1.onNext(3) // 不输出(已切换)
subject2.onNext(4) // 输出: 4
4.4 错误处理操作符
catchError
捕获错误并返回备用 Observable。
enum MyError: Error {
case failure
}
let observable = Observable<String>.error(MyError.failure)
.catchError { error -> Observable<String> in
print("捕获错误: \(error)")
return Observable.just("备用值")
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 捕获错误: failure, 备用值
catchErrorJustReturn
用默认值替换错误。
let observable = Observable<String>.error(MyError.failure)
.catchErrorJustReturn("默认值")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: 默认值
retry
重试失败的 Observable。
var attempts = 0
let observable = Observable<String>.create { observer in
attempts += 1
if attempts < 3 {
observer.onError(MyError.failure)
} else {
observer.onNext("成功")
observer.onCompleted()
}
return Disposables.create()
}
.retry(2) // 最多重试 2 次
.subscribe(
onNext: { print($0) },
onError: { print("错误: \($0)") }
)
.disposed(by: disposeBag)
// 输出: 成功
retryWhen
根据条件重试。
let observable = Observable<String>.error(MyError.failure)
.retryWhen { errors in
errors.enumerated().flatMap { index, error -> Observable<Int> in
if index < 2 {
return Observable<Int>.timer(.seconds(index + 1), scheduler: MainScheduler.instance)
} else {
return Observable.error(error)
}
}
}
.subscribe(
onNext: { print($0) },
onError: { print("最终错误: \($0)") }
)
.disposed(by: disposeBag)
4.5 工具操作符
do
执行副作用操作。
Observable.from([1, 2, 3])
.do(onNext: { print("即将发出: \($0)") },
onError: { print("错误: \($0)") },
onCompleted: { print("完成") },
onSubscribe: { print("订阅") },
onDispose: { print("释放") })
.subscribe(onNext: { print("收到: \($0)") })
.disposed(by: disposeBag)
delay
延迟发出元素。
Observable.from([1, 2, 3])
.delay(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 1秒后依次输出: 1, 2, 3
delaySubscription
延迟订阅。
Observable.from([1, 2, 3])
.delaySubscription(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 1秒后开始输出: 1, 2, 3
materialize / dematerialize
将事件序列化/反序列化。
Observable.from([1, 2, 3])
.materialize()
.subscribe(onNext: { event in
print(event) // 输出: next(1), next(2), next(3), completed
})
.disposed(by: disposeBag)
timeout
超时处理。
Observable<Int>.never()
.timeout(.seconds(2), scheduler: MainScheduler.instance)
.subscribe(
onNext: { print($0) },
onError: { print("超时: \($0)") }
)
.disposed(by: disposeBag)
// 2秒后输出: 超时: RxError.timeout
5. Subjects
Subjects 既是 Observable 又是 Observer,可以手动发送事件。
5.1 PublishSubject
不保存当前值,只向订阅者发送订阅后的事件。
let subject = PublishSubject<String>()
// 订阅1
subject.subscribe(onNext: { print("订阅1: \($0)") })
.disposed(by: disposeBag)
subject.onNext("A") // 输出: 订阅1: A
// 订阅2
subject.subscribe(onNext: { print("订阅2: \($0)") })
.disposed(by: disposeBag)
subject.onNext("B") // 输出: 订阅1: B, 订阅2: B
subject.onCompleted()
5.2 BehaviorSubject
保存当前值,新订阅者会立即收到当前值。
let subject = BehaviorSubject<String>(value: "初始值")
// 订阅1
subject.subscribe(onNext: { print("订阅1: \($0)") })
.disposed(by: disposeBag)
// 输出: 订阅1: 初始值
subject.onNext("新值") // 输出: 订阅1: 新值
// 订阅2
subject.subscribe(onNext: { print("订阅2: \($0)") })
.disposed(by: disposeBag)
// 输出: 订阅2: 新值(立即收到当前值)
5.3 ReplaySubject
保存指定数量的最近值,新订阅者会收到这些值。
let subject = ReplaySubject<String>.create(bufferSize: 2)
subject.onNext("A")
subject.onNext("B")
subject.onNext("C")
// 订阅
subject.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出: B, C(最近2个值)
5.4 AsyncSubject
只发出最后一个值(在完成时)。
let subject = AsyncSubject<String>()
subject.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("A") // 不输出
subject.onNext("B") // 不输出
subject.onNext("C") // 不输出
subject.onCompleted() // 输出: C
6. Schedulers调度器
Schedulers 决定操作在哪个线程执行。
6.1 内置Scheduler
MainScheduler
主线程调度器。
Observable.just(1)
.observeOn(MainScheduler.instance)
.subscribe(onNext: { value in
// 在主线程执行
print(Thread.isMainThread) // true
})
.disposed(by: disposeBag)
SerialDispatchQueueScheduler
串行队列调度器。
let scheduler = SerialDispatchQueueScheduler(
qos: .userInitiated,
internalSerialQueueName: "custom.queue"
)
Observable.just(1)
.observeOn(scheduler)
.subscribe(onNext: { value in
// 在后台线程执行
})
.disposed(by: disposeBag)
ConcurrentDispatchQueueScheduler
并发队列调度器。
let scheduler = ConcurrentDispatchQueueScheduler(
qos: .background
)
Observable.from([1, 2, 3])
.observeOn(scheduler)
.subscribe(onNext: { value in
// 在后台线程执行
})
.disposed(by: disposeBag)
OperationQueueScheduler
操作队列调度器。
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 2
let scheduler = OperationQueueScheduler(operationQueue: queue)
Observable.from([1, 2, 3, 4, 5])
.observeOn(scheduler)
.subscribe(onNext: { value in
// 在操作队列执行
})
.disposed(by: disposeBag)
6.2 subscribeOn vs observeOn
-
subscribeOn:指定订阅在哪个线程执行
-
observeOn:指定后续操作在哪个线程执行
Observable.create { observer in
print("订阅线程: \(Thread.current)")
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { value in
print("接收线程: \(Thread.current)")
})
.disposed(by: disposeBag)
7. 错误处理
7.1 错误类型
enum RxError: Swift.Error {
case unknown
case disposed
case timeout
case noElements
case moreThanOneElement
}
enum NetworkError: Error {
case invalidURL
case noData
case decodingError
}
7.2 错误处理策略
func fetchData() -> Observable<String> {
return Observable.create { observer in
// 模拟网络请求
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
observer.onError(NetworkError.noData)
}
return Disposables.create()
}
}
fetchData()
.catchError { error -> Observable<String> in
// 捕获错误,返回备用 Observable
return Observable.just("默认数据")
}
.retry(3) // 重试 3 次
.subscribe(
onNext: { print($0) },
onError: { print("最终错误: \($0)") }
)
.disposed(by: disposeBag)
8. 内存管理
8.1 DisposeBag
自动管理订阅的生命周期。
class ViewController: UIViewController {
private let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
Observable.just("Hello")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag) // 自动管理
}
// viewController 释放时,disposeBag 会自动释放所有订阅
}
8.2 避免循环引用
class ViewModel {
private let disposeBag = DisposeBag()
func setup() {
Observable.just("Data")
.subscribe(onNext: { [weak self] value in
// 使用 weak self 避免循环引用
self?.process(value)
})
.disposed(by: disposeBag)
}
private func process(_ value: String) {
// 处理数据
}
}
8.3 takeUntil
在指定条件满足时自动取消订阅。
class ViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()
Observable.interval(.seconds(1), scheduler: MainScheduler.instance)
.takeUntil(self.rx.deallocated) // viewController 释放时自动取消
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
}
9. 与UIKit集成
9.1 RxCocoa基础
RxCocoa 提供了 UIKit 的 Rx 扩展。
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var textField: UITextField!
@IBOutlet weak var button: UIButton!
@IBOutlet weak var label: UILabel!
private let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
// 文本输入绑定
textField.rx.text
.bind(to: label.rx.text)
.disposed(by: disposeBag)
// 按钮点击
button.rx.tap
.subscribe(onNext: { [weak self] in
self?.handleButtonTap()
})
.disposed(by: disposeBag)
}
}
9.2 常用绑定
// UILabel
label.rx.text.onNext("Hello")
label.rx.attributedText.onNext(attributedString)
// UITextField
textField.rx.text
.subscribe(onNext: { text in
print("文本: \(text ?? "")")
})
.disposed(by: disposeBag)
// UIButton
button.rx.tap
.subscribe(onNext: {
print("按钮点击")
})
.disposed(by: disposeBag)
// UISwitch
switch.rx.isOn
.subscribe(onNext: { isOn in
print("开关: \(isOn)")
})
.disposed(by: disposeBag)
// UISlider
slider.rx.value
.subscribe(onNext: { value in
print("值: \(value)")
})
.disposed(by: disposeBag)
9.3 TableView绑定
import RxDataSources
class ViewController: UIViewController {
@IBOutlet weak var tableView: UITableView!
private let disposeBag = DisposeBag()
private let items = BehaviorSubject<[String]>(value: ["Item 1", "Item 2", "Item 3"])
override func viewDidLoad() {
super.viewDidLoad()
let dataSource = RxTableViewSectionedReloadDataSource<String> { dataSource, tableView, indexPath, item in
let cell = tableView.dequeueReusableCell(withIdentifier: "Cell", for: indexPath)
cell.textLabel?.text = item
return cell
}
items
.map { [SectionModel(model: "", items: $0)] }
.bind(to: tableView.rx.items(dataSource: dataSource))
.disposed(by: disposeBag)
}
}
10. 实际应用场景
10.1 网络请求
struct API {
static func fetchUser(id: Int) -> Observable<User> {
let url = URL(string: "https://api.example.com/users/\(id)")!
return URLSession.shared.rx.data(request: URLRequest(url: url))
.map { data in
try JSONDecoder().decode(User.self, from: data)
}
.observeOn(MainScheduler.instance)
}
}
API.fetchUser(id: 1)
.subscribe(
onNext: { user in
print("用户: \(user)")
},
onError: { error in
print("错误: \(error)")
}
)
.disposed(by: disposeBag)
10.2 用户输入处理
class SearchViewModel {
private let disposeBag = DisposeBag()
let searchText = BehaviorSubject<String>(value: "")
let results = BehaviorSubject<[String]>(value: [])
init() {
searchText
.debounce(.milliseconds(500), scheduler: MainScheduler.instance)
.distinctUntilChanged()
.filter { !$0.isEmpty }
.flatMapLatest { query -> Observable<[String]> in
return self.search(query: query)
.catchErrorJustReturn([])
}
.bind(to: results)
.disposed(by: disposeBag)
}
private func search(query: String) -> Observable<[String]> {
// 实现搜索逻辑
return Observable.just(["结果1", "结果2"])
}
}
10.3 组合多个数据源
class DashboardViewModel {
private let disposeBag = DisposeBag()
let user = BehaviorSubject<User?>(value: nil)
let posts = BehaviorSubject<[Post]>(value: [])
let isLoading = BehaviorSubject<Bool>(value: false)
func loadData() {
isLoading.onNext(true)
let userObservable = API.fetchUser(id: 1)
let postsObservable = API.fetchPosts()
Observable.zip(userObservable, postsObservable)
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { [weak self] user, posts in
self?.user.onNext(user)
self?.posts.onNext(posts)
self?.isLoading.onNext(false)
},
onError: { [weak self] error in
self?.isLoading.onNext(false)
print("错误: \(error)")
}
)
.disposed(by: disposeBag)
}
}
10.4 表单验证(多字段实时校验)
多字段表单:用户名、密码、确认密码实时校验,用 combineLatest 聚合多流,用 map 产出错误文案或是否可提交。
class FormViewModel {
private let disposeBag = DisposeBag()
let username = BehaviorRelay<String>(value: "")
let password = BehaviorRelay<String>(value: "")
let confirmPassword = BehaviorRelay<String>(value: "")
let usernameError = BehaviorRelay<String?>(value: nil)
let isFormValid = BehaviorRelay<Bool>(value: false)
init() {
// 用户名:非空 + 长度
username
.map { name in
if name.isEmpty { return "请输入用户名" }
if name.count < 3 { return "至少 3 个字符" }
return nil
}
.bind(to: usernameError)
.disposed(by: disposeBag)
// 三字段 combineLatest,任一变化都重新计算表单是否有效
Observable.combineLatest(username, password, confirmPassword)
.map { name, pwd, confirm in
if name.isEmpty || pwd.isEmpty { return false }
if pwd != confirm { return false }
if pwd.count < 6 { return false }
return true
}
.bind(to: isFormValid)
.disposed(by: disposeBag)
}
}
// VC 中绑定
viewModel.isFormValid
.bind(to: submitButton.rx.isEnabled)
.disposed(by: disposeBag)
viewModel.usernameError
.bind(to: usernameErrorLabel.rx.text)
.disposed(by: disposeBag)
10.5 NotificationCenter 转 Observable
系统通知或自定义通知转为 Observable,便于在链中 map、filter、observeOn。
// 键盘即将显示:取键盘 frame
let keyboardWillShow = NotificationCenter.default.rx
.notification(UIResponder.keyboardWillShowNotification)
.map { notification -> CGRect in
(notification.userInfo?[UIResponder.keyboardFrameEndUserInfoKey] as? NSValue)?.cgRectValue ?? .zero
}
.observeOn(MainScheduler.instance)
keyboardWillShow
.subscribe(onNext: { frame in
print("键盘高度: \(frame.height)")
})
.disposed(by: disposeBag)
// 自定义通知
extension Notification.Name {
static let myCustomEvent = Notification.Name("MyCustomEvent")
}
let customObservable = NotificationCenter.default.rx.notification(.myCustomEvent)
10.6 Timer 与周期任务
用 Observable.interval 做定时轮询,或用 Observable.timer 做延迟/单次任务。
// 每 1 秒发一个递增整数,主线程接收
let timerObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(10) // 只取 10 次
.subscribe(onNext: { tick in
print("tick: \(tick)")
})
.disposed(by: disposeBag)
// 延迟 2 秒后执行一次
Observable<Int>.timer(.seconds(2), scheduler: MainScheduler.instance)
.subscribe(onNext: { _ in
print("2 秒后执行")
})
.disposed(by: disposeBag)
// 轮询接口:每 5 秒请求一次,直到满足条件
Observable<Int>.interval(.seconds(5), scheduler: MainScheduler.instance)
.flatMapLatest { _ in API.pollStatus() }
.takeWhile { !$0.isDone }
.subscribe(onNext: { status in
print("状态: \(status)")
})
.disposed(by: disposeBag)
10.7 请求重试与超时
retry 在失败时重新订阅上游;timeout 超时未完成则发 error;配合 catchError 做兜底。
URLSession.shared.rx.data(request: request)
.timeout(.seconds(10), scheduler: MainScheduler.instance)
.retry(3)
.map { data in try JSONDecoder().decode(User.self, from: data) }
.catchError { _ in Observable.just(User.placeholder) }
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { user in
// 更新 UI
},
onError: { error in
print("错误: \(error)")
}
)
.disposed(by: disposeBag)
10.8 多源竞速(主备 / race)
主接口失败时切到备用接口,用 catchError 切流;或 merge + take(1) 实现「谁先完成用谁」。
// 主接口失败时用备用接口
func loadFromPrimaryOrFallback() -> Observable<Data> {
let primary = URLSession.shared.rx.data(request: primaryRequest)
let fallback = URLSession.shared.rx.data(request: fallbackRequest)
return primary.catchError { _ in fallback }
}
// 显式 race:两个请求谁先完成用谁
func race<Element>(_ a: Observable<Element>, _ b: Observable<Element>) -> Observable<Element> {
Observable.merge(a, b).take(1)
}
10.9 节流与防抖组合(搜索 + 按钮防重复点击)
搜索框用 debounce 减少请求频率;提交按钮用 throttle 防止连续点击重复提交。
// 搜索:防抖 + 去重 + 非空 + flatMapLatest 只保留最后一次请求
searchBar.rx.text.orEmpty
.debounce(.milliseconds(400), scheduler: MainScheduler.instance)
.distinctUntilChanged()
.filter { !$0.isEmpty }
.flatMapLatest { query in
API.search(query: query).catchErrorJustReturn([])
}
.observeOn(MainScheduler.instance)
.bind(to: results)
.disposed(by: disposeBag)
// 提交按钮:节流 1 秒内只响应一次
submitButton.rx.tap
.throttle(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { [weak self] in
self?.submit()
})
.disposed(by: disposeBag)
10.10 RxCocoa 进阶:UISearchBar、RefreshControl、DelegateProxy
UISearchBar:rx.text、rx.searchButtonClicked 组合做「点击搜索」或「实时搜索」。
// 点击搜索按钮时用当前文本请求
searchBar.rx.searchButtonClicked
.withLatestFrom(searchBar.rx.text.orEmpty)
.filter { !$0.isEmpty }
.flatMapLatest { API.search(query: $0) }
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] results in
self?.updateResults(results)
})
.disposed(by: disposeBag)
UIRefreshControl:下拉刷新与 isRefreshing 绑定。
refreshControl.rx.controlEvent(.valueChanged)
.flatMapLatest { [weak self] _ in
self?.loadData() ?? Observable.never()
}
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { [weak self] _ in
self?.refreshControl.endRefreshing()
},
onError: { [weak self] _ in
self?.refreshControl.endRefreshing()
}
)
.disposed(by: disposeBag)
DelegateProxy 示例(UITableView 点击):RxCocoa 已为常用控件提供 rx 扩展,如需自定义可继承 DelegateProxy。
// 使用 RxCocoa 内置的 itemSelected
tableView.rx.itemSelected
.subscribe(onNext: { indexPath in
print("选中: \(indexPath)")
})
.disposed(by: disposeBag)
tableView.rx.modelSelected(Item.self)
.subscribe(onNext: { item in
print("选中项: \(item)")
})
.disposed(by: disposeBag)
10.11 页面生命周期与 takeUntil
在 VC 中让订阅随页面消失而自动取消:用 rx.deallocating 或 takeUntil(self.rx.deallocated),避免重复订阅和泄漏。
class ViewController: UIViewController {
private let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
// 方式一:统一丢进 disposeBag,VC 释放时一起 dispose
someObservable
.subscribe(onNext: { })
.disposed(by: disposeBag)
// 方式二:显式「直到某事件发生就结束」(如直到页面即将消失)
someObservable
.takeUntil(rx.deallocated)
.subscribe(onNext: { })
.disposed(by: disposeBag)
}
}
10.12 CollectionView 与 RxDataSources
使用 RxDataSources 的 Section 模型驱动 UICollectionView,与 TableView 用法类似(Item 为业务模型类型,需与 Cell 一致)。
import RxDataSources
typealias Section = SectionModel<String, Item> // Item 为业务模型
let dataSource = RxCollectionViewSectionedReloadDataSource<Section> { dataSource, collectionView, indexPath, item in
let cell = collectionView.dequeueReusableCell(withReuseIdentifier: "Cell", for: indexPath) as! ItemCell
cell.configure(with: item)
return cell
}
items
.map { [Section(model: "列表", items: $0)] }
.bind(to: collectionView.rx.items(dataSource: dataSource))
.disposed(by: disposeBag)
10.13 双向绑定与 ControlProperty
RxCocoa 的 ControlProperty 支持双向绑定:一方是「用户输入」,一方是「模型/ViewModel」。
// 将 TextField 与 BehaviorRelay 双向绑定(需自己写绑定逻辑,或使用 RxCocoa 的 bind)
// 单向:ViewModel -> UI
viewModel.username
.bind(to: textField.rx.text)
.disposed(by: disposeBag)
// 单向:UI -> ViewModel
textField.rx.text.orEmpty
.bind(to: viewModel.username)
.disposed(by: disposeBag)
// 若需「初始值 + 用户修改都同步」,两行都写即可(Relay 与控件类型匹配时)
10.14 错误流与用户提示
将网络/业务错误统一转为「可展示的提示」,用 materialize() 或 catchError 转成另一种元素类型,再在 UI 层订阅。
API.fetchUser(id: 1)
.materialize()
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] event in
switch event {
case .next(let user):
self?.showUser(user)
case .error(let error):
self?.showToast("加载失败: \(error.localizedDescription)")
case .completed:
break
}
})
.disposed(by: disposeBag)
二、RxSwift框架源码解析
1. 架构设计
1.1 整体架构
RxSwift 采用协议导向的设计,核心是三个协议:
ObservableType (可观察类型)
↓
ObserverType (观察者类型)
↓
Disposable (可释放资源)
数据流:
Observable → Observer
↑ ↓
└── 反馈 ──┘
1.2 核心协议层次
// 第一层:ObservableType 协议
protocol ObservableType {
associatedtype Element
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element
}
// 第二层:ObserverType 协议
protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
// 第三层:Disposable 协议
protocol Disposable {
func dispose()
}
1.3 事件类型
enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
2. Observable协议实现
2.1 ObservableType协议定义
public protocol ObservableType {
associatedtype Element
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element
}
2.2 Observable实现
public class Observable<Element>: ObservableType {
public typealias Element = Element
internal init() {}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> {
return self
}
}
关键点:
-
Observable 是抽象类
-
subscribe 方法需要子类实现
- 使用
rxAbstractMethod() 防止直接实例化
2.3 Just实现分析
final private class Just<Element>: Producer<Element> {
private let element: Element
init(element: Element) {
self.element = element
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
where Observer.Element == Element {
let sink = JustSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class JustSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = Just<Element>
private let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
switch event {
case .next:
forwardOn(.next(parent.element))
forwardOn(.completed)
self.dispose()
case .error, .completed:
forwardOn(event)
self.dispose()
}
}
func run() -> Disposable {
forwardOn(.next(parent.element))
forwardOn(.completed)
return Disposables.create()
}
}
关键点:
-
Just 继承自 Producer
- 使用
JustSink 处理订阅逻辑
- 立即发出元素并完成
2.4 Create实现分析
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
private let subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
private let parent: Parent
init(observer: Observer, cancel: Cancelable) {
self.parent = AnonymousObservable(subscribeHandler: { observer in
// 包装观察者
return Disposables.create()
})
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
switch event {
case .next:
forwardOn(event)
case .error, .completed:
forwardOn(event)
self.dispose()
}
}
func run(_ parent: Parent) -> Disposable {
return parent.subscribeHandler(AnyObserver(self))
}
}
关键点:
-
AnonymousObservable 使用闭包创建
-
AnyObserver 包装观察者
- 支持自定义订阅逻辑
3. Observer协议实现
3.1 ObserverType协议定义
public protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
3.2 AnyObserver实现
public struct AnyObserver<Element>: ObserverType {
public typealias Element = Element
private let observer: AnyObserverBase<Element>
public init<Observer: ObserverType>(_ observer: Observer)
where Observer.Element == Element {
self.observer = ObserverBox(observer)
}
public func on(_ event: Event<Element>) {
observer.on(event)
}
}
private class AnyObserverBase<Element>: ObserverType {
func on(_ event: Event<Element>) {
rxAbstractMethod()
}
}
private final class ObserverBox<Observer: ObserverType>: AnyObserverBase<Observer.Element> {
private let observer: Observer
init(_ observer: Observer) {
self.observer = observer
}
override func on(_ event: Event<Observer.Element>) {
observer.on(event)
}
}
关键点:
-
AnyObserver 是类型擦除包装器
- 使用
ObserverBox 存储具体观察者
- 实现观察者的多态
3.3 Sink实现
class Sink<Observer: ObserverType>: Disposable {
typealias Element = Observer.Element
private let observer: Observer
private let cancel: Cancelable
private var disposed = false
init(observer: Observer, cancel: Cancelable) {
self.observer = observer
self.cancel = cancel
}
final func forwardOn(_ event: Event<Element>) {
if isDisposed {
return
}
observer.on(event)
}
final func forwardOn(_ event: Event<Element>, _ disposeHandler: @escaping () -> Void) {
if isDisposed {
return
}
observer.on(event)
disposeHandler()
}
func dispose() {
if !disposed {
disposed = true
cancel.dispose()
}
}
var isDisposed: Bool {
return disposed
}
}
关键点:
-
Sink 是观察者的基类
- 提供
forwardOn 方法转发事件
- 管理订阅的生命周期
4. Operators实现原理
4.1 Map操作符实现
extension ObservableType {
public func map<Result>(_ transform: @escaping (Element) -> Result) -> Observable<Result> {
return Map(source: self.asObservable(), transform: transform)
}
}
final private class Map<SourceType, ResultType>: Producer<ResultType> {
typealias Transform = (SourceType) -> ResultType
private let source: Observable<SourceType>
private let transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
self.source = source
self.transform = transform
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
where Observer.Element == ResultType {
let sink = MapSink(transform: transform, observer: observer, cancel: cancel)
let subscription = source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias ResultType = Observer.Element
typealias Transform = (SourceType) -> ResultType
private let transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self.transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try transform(element)
forwardOn(.next(mappedElement))
} catch {
forwardOn(.error(error))
dispose()
}
case .error(let error):
forwardOn(.error(error))
dispose()
case .completed:
forwardOn(.completed)
dispose()
}
}
}
关键点:
-
Map 是新的 Observable,包装源 Observable
- 创建
MapSink 进行转换
- 错误处理:转换失败时发出错误
4.2 Filter操作符实现
extension ObservableType {
public func filter(_ predicate: @escaping (Element) -> Bool) -> Observable<Element> {
return Filter(source: self.asObservable(), predicate: predicate)
}
}
final private class Filter<Element>: Producer<Element> {
typealias Predicate = (Element) -> Bool
private let source: Observable<Element>
private let predicate: Predicate
init(source: Observable<Element>, predicate: @escaping Predicate) {
self.source = source
self.predicate = predicate
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
where Observer.Element == Element {
let sink = FilterSink(predicate: predicate, observer: observer, cancel: cancel)
let subscription = source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Predicate = (Element) -> Bool
private let predicate: Predicate
init(predicate: @escaping Predicate, observer: Observer, cancel: Cancelable) {
self.predicate = predicate
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
switch event {
case .next(let element):
do {
let satisfies = try predicate(element)
if satisfies {
forwardOn(.next(element))
}
} catch {
forwardOn(.error(error))
dispose()
}
case .error, .completed:
forwardOn(event)
dispose()
}
}
}
关键点:
4.3 FlatMap操作符实现
extension ObservableType {
public func flatMap<Source: ObservableConvertibleType>(
_ selector: @escaping (Element) -> Source
) -> Observable<Source.Element> {
return FlatMap(source: self.asObservable(), selector: selector)
}
}
final private class FlatMap<SourceElement, SourceSequence: ObservableConvertibleType>: Producer<SourceSequence.Element> {
typealias Selector = (SourceElement) -> SourceSequence
private let source: Observable<SourceElement>
private let selector: Selector
init(source: Observable<SourceElement>, selector: @escaping Selector) {
self.source = source
self.selector = selector
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
where Observer.Element == SourceSequence.Element {
let sink = FlatMapSink(selector: selector, observer: observer, cancel: cancel)
let subscription = sink.run(source)
return (sink: sink, subscription: subscription)
}
}
final private class FlatMapSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>: MergeSink<SourceSequence, Observer>
where Observer.Element == SourceSequence.Element {
typealias Selector = (SourceElement) -> SourceSequence
private let selector: Selector
init(selector: @escaping Selector, observer: Observer, cancel: Cancelable) {
self.selector = selector
super.init(observer: observer, cancel: cancel)
}
override func on(_ event: Event<SourceElement>) {
switch event {
case .next(let element):
do {
let innerObservable = try selector(element).asObservable()
subscribeInner(innerObservable, group: group)
} catch {
forwardOn(.error(error))
dispose()
}
case .error(let error):
forwardOn(.error(error))
dispose()
case .completed:
groupCompleted()
}
}
}
关键点:
- 管理多个内部 Observable 订阅
- 使用
MergeSink 合并结果
- 需要复杂的生命周期管理
5. Subjects实现原理
5.1 PublishSubject实现
public final class PublishSubject<Element>: Observable<Element>, SubjectType, Cancelable, ObserverType, SynchronizedUnsubscribeType {
public typealias SubjectObserverType = PublishSubject<Element>
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
private let lock = RecursiveLock()
private var observers: Observers = Observers()
private var isDisposed = false
private var stopped = false
private var stoppedEvent: Event<Element>?
public override init() {
super.init()
}
public func on(_ event: Event<Element>) {
dispatch(synchronized_on(event), event)
}
func synchronized_on(_ event: Event<Element>) -> Observers {
lock.lock()
defer { lock.unlock() }
switch event {
case .next:
if isDisposed || stopped {
return Observers()
}
return observers
case .completed, .error:
if stoppedEvent == nil {
stoppedEvent = event
stopped = true
let observers = self.observers
self.observers.removeAll()
return observers
}
return Observers()
}
}
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element {
lock.lock()
defer { lock.unlock() }
if let stoppedEvent = stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if isDisposed {
observer.on(.error(RxError.disposed))
return Disposables.create()
}
let key = observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
lock.lock()
defer { lock.unlock() }
observers.removeKey(disposeKey)
}
}
关键点:
- 使用锁保护
observers 集合
- 不保存当前值,新订阅者不会收到历史值
- 使用
SubscriptionDisposable 管理订阅
5.2 BehaviorSubject实现
public final class BehaviorSubject<Element>: Observable<Element>, SubjectType, ObserverType, SynchronizedUnsubscribeType {
public typealias SubjectObserverType = BehaviorSubject<Element>
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
private let lock = RecursiveLock()
private var observers: Observers = Observers()
private var isDisposed = false
private var stoppedEvent: Event<Element>?
private var element: Element
public init(value: Element) {
self.element = value
super.init()
}
public var value: Element {
lock.lock()
defer { lock.unlock() }
return element
}
public func on(_ event: Event<Element>) {
dispatch(synchronized_on(event), event)
}
func synchronized_on(_ event: Event<Element>) -> Observers {
lock.lock()
defer { lock.unlock() }
if stoppedEvent != nil || isDisposed {
return Observers()
}
switch event {
case .next(let element):
self.element = element
case .error, .completed:
stoppedEvent = event
}
return observers
}
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element {
lock.lock()
defer { lock.unlock() }
if let stoppedEvent = stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if isDisposed {
observer.on(.error(RxError.disposed))
return Disposables.create()
}
let key = observers.insert(observer.on)
observer.on(.next(element)) // 立即发送当前值
return SubscriptionDisposable(owner: self, key: key)
}
}
关键点:
- 保存当前值
element
- 新订阅者立即收到当前值
- 使用锁保护状态
6. Schedulers实现原理
6.1 SchedulerType协议
public protocol SchedulerType {
var now: RxTime { get }
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable
func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable
func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable
}
6.2 MainScheduler实现
public final class MainScheduler: SerialDispatchQueueScheduler {
private let mainQueue: DispatchQueue
public static let instance = MainScheduler()
public static let asyncInstance = SerialDispatchQueueScheduler(
serialQueue: DispatchQueue.main
)
private init() {
mainQueue = DispatchQueue.main
super.init(serialQueue: mainQueue)
}
public static func ensureExecutingOnScheduler(errorMessage: String? = nil) {
if !DispatchQueue.isMain {
rxFatalError(errorMessage ?? "Executing on background thread. Please use `MainScheduler.instance.schedule` to schedule work on main thread.")
}
}
}
关键点:
- 使用
DispatchQueue.main
- 提供单例实例
- 提供线程检查方法
6.3 SerialDispatchQueueScheduler实现
public class SerialDispatchQueueScheduler: SchedulerType {
public typealias TimeInterval = Foundation.TimeInterval
public typealias Time = Date
private let configuration: DispatchQueueConfiguration
private let serialQueue: DispatchQueue
public var now: RxTime {
return Date()
}
public init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.serialQueue = serialQueue
self.configuration = DispatchQueueConfiguration(
queue: serialQueue,
leeway: leeway
)
}
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
serialQueue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return scheduleRelativeInternal(state, dueTime: dueTime, action: action)
}
func scheduleRelativeInternal<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
let deadline = now.addingTimeInterval(dueTime)
let cancel = SingleAssignmentDisposable()
serialQueue.asyncAfter(deadline: deadline) {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
}
关键点:
- 使用
DispatchQueue 执行任务
- 支持立即和延迟调度
- 使用
SingleAssignmentDisposable 管理取消
7. 背压处理机制
7.1 背压问题
当生产者产生数据的速度快于消费者处理数据的速度时,会产生背压问题。
7.2 背压处理策略
RxSwift 主要通过以下方式处理背压:
-
请求机制:Observer 可以控制请求的数据量
-
缓冲:使用
buffer 操作符缓冲数据
-
节流:使用
throttle、debounce 控制数据流速度
-
采样:使用
sample 采样数据
7.3 背压处理示例
class BackpressureObserver: ObserverType {
typealias Element = Int
private var buffer: [Int] = []
private let bufferSize: Int
private var subscription: Subscription?
init(bufferSize: Int = 10) {
self.bufferSize = bufferSize
}
func on(_ event: Event<Int>) {
switch event {
case .next(let element):
buffer.append(element)
// 处理缓冲区
processBuffer()
// 如果缓冲区未满,可以继续接收
if buffer.count < bufferSize {
// 继续接收
}
case .error, .completed:
// 处理完成
processRemaining()
}
}
private func processBuffer() {
while !buffer.isEmpty {
let value = buffer.removeFirst()
print("处理: \(value)")
}
}
private func processRemaining() {
processBuffer()
}
}
8. 性能优化策略
8.1 值类型优化
RxSwift 大量使用值类型(struct),避免堆分配:
// 值类型,零成本抽象
struct Just<Element>: ObservableType { }
struct Map<SourceType, ResultType>: ObservableType { }
struct Filter<Element>: ObservableType { }
8.2 类型擦除
使用 asObservable() 隐藏具体类型:
extension ObservableType {
public func asObservable() -> Observable<Element> {
return Observable.create { observer in
return self.subscribe(observer)
}
}
}
8.3 延迟执行
使用 deferred 延迟创建 Observable:
let deferred = Observable.deferred {
// 只在订阅时执行
return expensiveOperation()
}
8.4 共享订阅
使用 share() 共享 Observable:
let shared = expensiveObservable()
.share() // 多个订阅者共享同一个 Observable
shared.subscribe(onNext: { }) // 订阅1
shared.subscribe(onNext: { }) // 订阅2(共享执行)
8.5 内存优化
- 使用
DisposeBag 自动管理订阅
- 使用
weak self 避免循环引用
- 及时取消不需要的订阅
📚 总结
RxSwift 框架的核心优势
-
跨平台标准:基于 ReactiveX 标准,与其他平台一致
-
丰富的操作符:提供大量操作符处理各种场景
-
类型安全:充分利用 Swift 类型系统
-
性能优化:值类型、零成本抽象
-
生态丰富:RxCocoa、RxDataSources 等扩展
学习建议
-
从基础开始:理解 Observable、Observer、Disposable
-
实践操作符:熟悉常用操作符的使用
-
理解调度器:掌握
subscribeOn 和 observeOn
-
阅读源码:深入理解实现原理
-
实际应用:在项目中应用 RxSwift
RxSwift vs Combine
-
RxSwift:适合需要支持 iOS 8+ 的项目,API 更丰富
-
Combine:适合 iOS 13+ 项目,与系统深度集成
文档版本:v1.0
最后更新:2026年1月15日
参考文献:RxSwift GitHub Repository, ReactiveX Documentation