Dart 并发编程详细总结1
结合官网API文档和实践经验的全面指南,深入探讨Dart异步编程的各个方面
前言
在现代应用开发中,并发编程已成为不可或缺的技能。无论是处理网络请求、文件I/O操作,还是维持应用的响应性,异步编程都扮演着关键角色。Dart语言提供了一套独特且强大的并发编程模型,通过事件循环、Future、Stream和Isolate等核心概念,让开发者能够编写高效、可维护的异步代码。
本文档将从基础概念出发,逐步深入到高级应用,涵盖了Dart并发编程的方方面面。无论你是Dart新手还是有经验的开发者,都能从中找到有价值的内容。
目录
1. 并发基础概念
1.1 Dart 并发模型深度解析
Dart 采用了事件循环(Event Loop) + **隔离区(Isolate)**的创新并发模型,这种设计有别于传统的多线程共享内存模型,具有以下核心特征:
🔄 单线程事件循环
每个隔离区内部运行在单一线程上,通过事件循环(Event Loop)机制处理所有异步操作。这种设计的优势包括:
-
无需担心线程同步问题:单线程环境下自然避免了数据竞争
-
简化编程模型:开发者无需考虑复杂的锁机制和临界区
-
高效的上下文切换:事件循环比线程切换开销更小
🏝️ 隔离区完全隔离
不同隔离区之间采用完全内存隔离的设计:
-
零共享内存:每个隔离区拥有独立的堆内存空间
-
消息传递通信:隔离区间仅能通过消息传递进行通信
-
并行执行能力:可以在多核CPU上真正并行运行
🚫 自然避免传统并发问题
由于内存隔离的特性,Dart自然避免了传统多线程编程中的常见问题:
-
死锁(Deadlock):无共享资源,不存在死锁
-
数据竞争(Race Condition):隔离区内单线程执行
-
内存一致性问题:每个隔离区独立管理内存
// 传统多线程模型(伪代码)- 需要锁保护
class TraditionalCounter {
int _count = 0;
final Lock _lock = Lock();
void increment() {
_lock.acquire(); // 获取锁
_count++; // 临界区操作
_lock.release(); // 释放锁
}
}
// Dart模型 - 无需锁保护
class DartCounter {
int _count = 0;
void increment() {
_count++; // 单线程内安全操作,无需加锁
}
}
1.2 事件循环机制深入理解
事件循环是Dart并发编程的核心,它负责协调和执行所有异步操作。理解事件循环的工作原理对于编写高效的Dart代码至关重要。
📋 事件循环的队列结构
Dart的事件循环维护两个主要队列,它们有着不同的优先级:
┌─────────────────┐
│ 微任务队列 │ ← 🔥 最高优先级(Future.microtask、then回调)
│ (Microtask) │ 立即执行,可能阻塞事件循环
├─────────────────┤
│ 事件队列 │ ← ⚡ 正常优先级(I/O、Timer、用户交互)
│ (Event Queue) │ 依次执行,保证响应性
└─────────────────┘
⚙️ 详细执行机制
事件循环遵循严格的执行顺序,这个顺序决定了异步代码的执行时机:
-
微任务队列清空:执行所有微任务,直到队列为空
-
事件队列处理:从事件队列取出一个任务执行
-
循环重复:重复步骤1和2,直到两个队列都为空
void demonstrateEventLoop() {
print('🚀 开始执行同步代码');
// 添加到事件队列(低优先级)
Future(() => print('📦 事件队列任务1 - 普通Future'));
// 添加到微任务队列(高优先级)
Future.microtask(() => print('⚡ 微任务1 - microtask'));
// 再次添加到事件队列
Future(() => print('📦 事件队列任务2 - 普通Future'));
// 再次添加到微任务队列
Future.microtask(() => print('⚡ 微任务2 - microtask'));
// Timer也是事件队列任务
Timer(Duration.zero, () => print('⏰ Timer任务 - 事件队列'));
print('✅ 同步代码执行完毕');
}
// 执行结果(严格按顺序):
// 🚀 开始执行同步代码
// ✅ 同步代码执行完毕
// ⚡ 微任务1 - microtask
// ⚡ 微任务2 - microtask
// 📦 事件队列任务1 - 普通Future
// 📦 事件队列任务2 - 普通Future
// ⏰ Timer任务 - 事件队列
🔍 深度案例:复杂执行顺序分析
让我们通过一个更复杂的例子来理解事件循环的细节:
void complexEventLoopDemo() {
print('1: 开始');
// 创建一个已完成的Future,它的then会成为微任务
Future.value('immediate')
.then((value) {
print('3: $value - then回调(微任务)');
// 在微任务中再添加微任务
Future.microtask(() => print('4: 嵌套微任务'));
});
// 直接创建微任务
Future.microtask(() {
print('5: 直接微任务');
// 在微任务中添加事件队列任务
Future(() => print('8: 微任务中的Future'));
});
// 创建延迟Future(事件队列)
Future(() {
print('7: Future任务');
// 在Future中添加微任务
Future.microtask(() => print('9: Future中的微任务'));
});
// 再次创建微任务
Future.microtask(() => print('6: 最后的微任务'));
print('2: 结束同步代码');
}
/* 详细执行分析:
1: 开始 - 同步执行
2: 结束同步代码 - 同步执行完毕,开始处理异步队列
--- 第一轮:清空微任务队列 ---
3: immediate - then回调(微任务)
4: 嵌套微任务 - 微任务中添加的微任务立即执行
5: 直接微任务 - 微任务顺序执行
6: 最后的微任务 - 所有微任务执行完毕
--- 第二轮:处理事件队列任务 ---
7: Future任务 - 处理第一个事件队列任务
9: Future中的微任务 - 事件任务中的微任务立即执行
--- 第三轮:继续处理事件队列 ---
8: 微任务中的Future - 处理剩余的事件队列任务
*/
⚠️ 微任务使用注意事项
微任务虽然优先级高,但使用时需要特别谨慎:
void microtaskCaution() {
// ❌ 危险:无限递归的微任务会阻塞事件循环
void badRecursiveMicrotask(int count) {
if (count > 0) {
Future.microtask(() => badRecursiveMicrotask(count - 1));
}
}
// ✅ 安全:使用Future而非microtask避免阻塞
void goodRecursiveFuture(int count) {
if (count > 0) {
Future(() => goodRecursiveFuture(count - 1));
}
}
// 示例:大量微任务可能导致UI卡顿
for (int i = 0; i < 1000; i++) {
// ❌ 这会创建1000个微任务,可能阻塞UI
Future.microtask(() => heavyComputation(i));
}
// ✅ 更好的方式:分批处理或使用isolate
batchProcessMicrotasks();
}
void heavyComputation(int value) {
// 模拟耗时计算
for (int i = 0; i < 100000; i++) {
value = value * 2 + 1;
}
}
// 分批处理微任务的示例
void batchProcessMicrotasks() {
const int batchSize = 10;
const int totalTasks = 1000;
void processBatch(int startIndex) {
// 处理一批任务
for (int i = 0; i < batchSize && startIndex + i < totalTasks; i++) {
heavyComputation(startIndex + i);
}
// 如果还有任务,使用Future安排下一批
if (startIndex + batchSize < totalTasks) {
Future(() => processBatch(startIndex + batchSize));
}
}
// 开始处理第一批
processBatch(0);
}
📊 性能对比总结
特性 |
传统多线程 + 锁 |
Dart Isolate |
内存安全 |
❌ 需要仔细管理锁 |
✅ 内存隔离,天然安全 |
死锁风险 |
⚠️ 存在死锁风险 |
✅ 无死锁可能 |
性能开销 |
🔄 锁争用和上下文切换 |
⚡ 消息传递,无锁开销 |
编程复杂度 |
🔴 高,需要仔细设计 |
🟢 低,线性思维 |
调试难度 |
🔴 竞态条件难以重现 |
🟢 确定性执行 |
可扩展性 |
⚠️ 锁争用限制扩展性 |
✅ 天然支持多核扩展 |
通过这个深入的对比,我们可以清楚地看到 Dart 并发模型的优势。它通过事件循环和隔离区的设计,优雅地避开了传统多线程编程的所有陷阱,让开发者可以专注于业务逻辑而不是底层的同步细节。
2. Future 和 async/await 深度指南
Future 是 Dart 异步编程的基础,代表一个可能在未来某个时刻完成的计算。理解 Future 的各种用法和最佳实践,是掌握 Dart 异步编程的关键。
2.1 Future 核心概念与状态管理
🎯 什么是 Future?
Future 代表一个异步操作的结果,它有三种状态:
-
Uncompleted(未完成):操作仍在进行中
-
Completed with a value(成功完成):操作成功,返回期望的值
-
Completed with an error(错误完成):操作失败,抛出异常
// Future的生命周期示例
class FutureLifecycleDemo {
// 演示Future从创建到完成的整个过程
Future<void> demonstrateLifecycle() async {
print('🚀 创建Future - 此时状态为Uncompleted');
// 创建一个需要时间完成的Future
final future = Future.delayed(Duration(seconds: 2), () {
// 这里可能成功返回值,也可能抛出异常
if (DateTime.now().millisecondsSinceEpoch % 2 == 0) {
return '✅ 成功结果';
} else {
throw '❌ 模拟错误';
}
});
print('⏳ Future已创建,等待完成...');
try {
// 等待Future完成
final result = await future;
print('🎉 Future成功完成: $result');
} catch (error) {
print('💥 Future错误完成: $error');
}
}
}
📝 基本创建方式详解
class FutureCreationMethods {
// 方法1:立即完成的Future
void immediateCompletion() {
// 创建一个立即成功的Future
final successFuture = Future.value(42);
print('立即成功Future创建: $successFuture');
// 创建一个立即失败的Future
final errorFuture = Future.error('Something went wrong');
print('立即失败Future创建: $errorFuture');
// 实际使用示例:缓存场景
Future<String> getCachedData(String key) {
final cachedValue = cache[key];
if (cachedValue != null) {
// 缓存命中,立即返回
return Future.value(cachedValue);
} else {
// 缓存未命中,需要异步获取
return fetchFromNetwork(key);
}
}
}
// 方法2:延迟执行的Future
void delayedExecution() {
// 简单延迟
final delayed = Future.delayed(
Duration(seconds: 2),
() => 'Completed after 2 seconds'
);
// 带计算的延迟
final delayedComputation = Future.delayed(
Duration(milliseconds: 500),
() {
// 模拟一些计算工作
final result = List.generate(1000, (i) => i * i)
.reduce((a, b) => a + b);
return 'Computation result: $result';
}
);
// 实际应用:模拟网络请求延迟
Future<Map<String, dynamic>> mockApiCall() {
return Future.delayed(Duration(seconds: 1), () => {
'status': 'success',
'data': {'userId': 123, 'userName': 'John Doe'},
'timestamp': DateTime.now().toIso8601String(),
});
}
}
// 方法3:异步执行的Future
void asyncExecution() {
// 基本异步执行
final asyncFuture = Future(() async {
// 模拟异步操作序列
await Future.delayed(Duration(milliseconds: 500));
final step1 = await performStep1();
await Future.delayed(Duration(milliseconds: 300));
final step2 = await performStep2(step1);
return 'Final result: $step2';
});
// 错误处理的异步执行
final robustAsyncFuture = Future(() async {
try {
final data = await riskyOperation();
return processData(data);
} catch (e) {
// 在异步块中处理错误
print('处理操作中的错误: $e');
return 'fallback_value';
}
});
}
// 辅助方法
final Map<String, String> cache = {};
Future<String> fetchFromNetwork(String key) async {
await Future.delayed(Duration(seconds: 1));
return 'network_data_for_$key';
}
Future<String> performStep1() async {
await Future.delayed(Duration(milliseconds: 100));
return 'step1_result';
}
Future<String> performStep2(String input) async {
await Future.delayed(Duration(milliseconds: 200));
return '${input}_step2_result';
}
Future<String> riskyOperation() async {
if (DateTime.now().millisecondsSinceEpoch % 3 == 0) {
throw Exception('Random failure');
}
return 'success_data';
}
String processData(String data) {
return 'processed_$data';
}
}
⚙️ 核心方法详解
class FutureMethodsDetailed {
// then() - 链式调用的核心
Future<void> thenChainExample() {
print('🔗 演示then()链式调用');
Future.value('Hello World')
.then((value) {
print('第一个then: $value');
return value.toUpperCase(); // 返回值传递给下一个then
})
.then((upperValue) {
print('第二个then: $upperValue');
return upperValue.split(' '); // 转换为List<String>
})
.then((words) {
print('第三个then: $words');
return words.length; // 返回单词数量
})
.then((count) {
print('最终结果: $count 个单词');
})
.catchError((error) {
print('捕获错误: $error');
});
}
// catchError() - 错误处理的多种方式
Future<void> errorHandlingExamples() async {
print('🚨 演示错误处理方式');
// 方式1:链式错误处理
Future.error('模拟错误')
.then((value) => print('这里不会执行'))
.catchError((error) {
print('链式错误处理: $error');
return 'error_handled'; // 错误恢复
})
.then((value) => print('错误恢复后继续: $value'));
// 方式2:特定类型错误处理
Future(() {
throw FormatException('格式错误');
})
.catchError(
(error) => print('处理格式错误: $error'),
test: (error) => error is FormatException, // 只处理特定类型
)
.catchError(
(error) => print('处理其他错误: $error'),
);
// 方式3:try-catch with async/await
try {
final result = await riskyAsyncOperation();
print('成功获取结果: $result');
} on FormatException catch (e) {
print('格式异常: $e');
} on TimeoutException catch (e) {
print('超时异常: $e');
} catch (e, stackTrace) {
print('未知错误: $e');
print('堆栈追踪: $stackTrace');
}
}
// whenComplete() - 清理资源的最佳实践
Future<void> cleanupExample() async {
print('🧹 演示资源清理');
FileResource? resource;
try {
resource = await openFile('important_data.txt');
final data = await processFile(resource);
print('文件处理结果: $data');
} catch (error) {
print('文件处理错误: $error');
} finally {
// 传统的finally块
await resource?.close();
print('资源已清理(finally块)');
}
// 使用whenComplete的优雅方式
await openFile('another_file.txt')
.then((fileResource) async {
final result = await processFile(fileResource);
return result;
})
.whenComplete(() async {
print('whenComplete: 无论成功失败都会执行清理');
// 这里执行清理工作
await cleanup();
})
.catchError((error) {
print('最终错误处理: $error');
});
}
// timeout() - 超时处理的高级用法
Future<void> advancedTimeoutExample() async {
print('⏰ 演示超时处理');
// 基本超时
try {
final result = await slowOperation()
.timeout(Duration(seconds: 3));
print('操作成功: $result');
} on TimeoutException {
print('操作超时,采用默认值');
}
// 带自定义超时行为
final resultWithFallback = await slowOperation()
.timeout(
Duration(seconds: 2),
onTimeout: () {
print('检测到超时,返回缓存数据');
return getCachedResult();
},
);
print('最终结果(可能来自缓存): $resultWithFallback');
// 超时重试机制
final retryResult = await retryWithTimeout(
() => unreliableOperation(),
maxRetries: 3,
timeout: Duration(seconds: 5),
);
print('重试后的结果: $retryResult');
}
// 辅助方法
Future<String> riskyAsyncOperation() async {
await Future.delayed(Duration(milliseconds: 100));
// 随机抛出不同类型的异常
switch (DateTime.now().millisecondsSinceEpoch % 3) {
case 0:
throw FormatException('数据格式错误');
case 1:
throw TimeoutException('操作超时', Duration(seconds: 5));
default:
return '成功结果';
}
}
Future<FileResource> openFile(String filename) async {
await Future.delayed(Duration(milliseconds: 50));
return FileResource(filename);
}
Future<String> processFile(FileResource resource) async {
await Future.delayed(Duration(milliseconds: 100));
return '${resource.filename}的处理结果';
}
Future<void> cleanup() async {
await Future.delayed(Duration(milliseconds: 30));
print('清理工作完成');
}
Future<String> slowOperation() async {
await Future.delayed(Duration(seconds: 10)); // 故意很慢
return '慢操作的结果';
}
String getCachedResult() {
return '来自缓存的数据';
}
Future<String> unreliableOperation() async {
await Future.delayed(Duration(seconds: 2));
if (DateTime.now().millisecondsSinceEpoch % 2 == 0) {
throw Exception('不可靠的操作失败');
}
return '不可靠操作成功';
}
// 超时重试的通用方法
Future<T> retryWithTimeout<T>(
Future<T> Function() operation, {
required int maxRetries,
required Duration timeout,
}) async {
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation().timeout(timeout);
} catch (error) {
if (attempt == maxRetries) {
rethrow; // 最后一次尝试失败,重新抛出异常
}
print('尝试 $attempt 失败: $error,将重试...');
await Future.delayed(Duration(seconds: attempt)); // 递增延迟
}
}
throw Exception('不应该到达这里'); // 理论上不会执行
}
}
// 辅助类
class FileResource {
final String filename;
FileResource(this.filename);
Future<void> close() async {
await Future.delayed(Duration(milliseconds: 10));
print('文件 $filename 已关闭');
}
}
2.2 async/await 语法糖深度解析
async/await 是 Dart 提供的语法糖,让异步代码看起来像同步代码一样直观。它极大地简化了 Future 的使用,是现代 Dart 异步编程的首选方式。
🎭 async/await 的工作原理
// 这两种写法是等价的,但async/await更易读
// 传统Future链式写法
Future<String> traditionalWay() {
return fetchUserData()
.then((userData) => processUserData(userData))
.then((processedData) => saveToDatabase(processedData))
.then((result) => 'Processing completed: $result')
.catchError((error) => 'Error occurred: $error');
}
// async/await写法 - 更直观易懂
Future<String> modernWay() async {
try {
final userData = await fetchUserData();
final processedData = await processUserData(userData);
final result = await saveToDatabase(processedData);
return 'Processing completed: $result';
} catch (error) {
return 'Error occurred: $error';
}
}
📚 基本用法与最佳实践
class AsyncAwaitAdvanced {
// ✅ 标准的async函数模式
Future<User> fetchUser(int id) async {
print('🔍 开始获取用户数据: $id');
// 网络请求
final response = await httpClient.get('/api/users/$id');
if (response.statusCode != 200) {
throw HttpException('用户不存在: ${response.statusCode}');
}
// 解析JSON
final Map<String, dynamic> userData = json.decode(response.body);
// 数据验证
if (!userData.containsKey('id') || !userData.containsKey('name')) {
throw FormatException('用户数据格式错误');
}
print('✅ 用户数据获取成功');
return User.fromJson(userData);
}
// ✅ 多步异步操作的最佳实践
Future<UserProfile> buildUserProfile(int userId) async {
print('🏗️ 开始构建用户档案');
// 第一步:获取基本用户信息
final user = await fetchUser(userId);
print('📋 基本信息获取完成: ${user.name}');
// 第二步:获取用户偏好设置
final preferences = await fetchUserPreferences(userId);
print('⚙️ 偏好设置获取完成');
// 第三步:获取用户活动历史
final activityHistory = await fetchActivityHistory(userId);
print('📊 活动历史获取完成: ${activityHistory.length} 条记录');
// 第四步:组装完整档案
final profile = UserProfile(
user: user,
preferences: preferences,
activityHistory: activityHistory,
lastUpdated: DateTime.now(),
);
print('✨ 用户档案构建完成');
return profile;
}
// 🔄 错误处理与重试机制
Future<T> robustAsyncOperation<T>(
String operationName,
Future<T> Function() operation,
) async {
const maxRetries = 3;
const baseDelay = Duration(seconds: 1);
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
print('🎯 执行 $operationName (尝试 $attempt/$maxRetries)');
final result = await operation();
print('✅ $operationName 成功完成');
return result;
} catch (error) {
print('❌ $operationName 失败: $error');
if (attempt == maxRetries) {
print('💥 $operationName 达到最大重试次数,放弃操作');
rethrow;
}
// 指数退避策略
final delay = baseDelay * (attempt * attempt);
print('⏳ 等待 ${delay.inSeconds} 秒后重试...');
await Future.delayed(delay);
}
}
throw Exception('不应该到达这里');
}
// 🔧 资源管理的完整示例
Future<String> processFileWithProperCleanup(String filePath) async {
FileHandle? fileHandle;
DatabaseConnection? dbConnection;
NetworkSocket? networkSocket;
try {
print('🚀 开始处理文件: $filePath');
// 1. 打开文件
fileHandle = await FileHandle.open(filePath);
print('📂 文件打开成功');
// 2. 建立数据库连接
dbConnection = await DatabaseConnection.connect();
print('🗄️ 数据库连接建立');
// 3. 建立网络连接
networkSocket = await NetworkSocket.connect('api.example.com', 443);
print('🌐 网络连接建立');
// 4. 读取和处理文件内容
final content = await fileHandle.readAsString();
final processedContent = await processContent(content);
// 5. 保存到数据库
await dbConnection.save(processedContent);
// 6. 发送到远程服务器
await networkSocket.send(processedContent);
print('✅ 文件处理完成');
return '处理成功: ${processedContent.length} 字符';
} catch (error) {
print('💥 文件处理失败: $error');
rethrow;
} finally {
// 确保资源清理(按相反顺序)
print('🧹 开始清理资源...');
try {
await networkSocket?.close();
print('🌐 网络连接已关闭');
} catch (e) {
print('⚠️ 网络连接关闭失败: $e');
}
try {
await dbConnection?.close();
print('🗄️ 数据库连接已关闭');
} catch (e) {
print('⚠️ 数据库连接关闭失败: $e');
}
try {
await fileHandle?.close();
print('📂 文件已关闭');
} catch (e) {
print('⚠️ 文件关闭失败: $e');
}
print('✅ 资源清理完成');
}
}
}
🚀 并行与串行执行策略
class ConcurrencyPatterns {
// 🔄 串行执行 - 任务依赖关系
Future<OrderResult> processOrderSequentially(Order order) async {
print('📦 开始串行处理订单: ${order.id}');
// 步骤1:验证订单(必须先完成)
final validation = await validateOrder(order);
if (!validation.isValid) {
throw Exception('订单验证失败: ${validation.reason}');
}
// 步骤2:扣减库存(依赖验证结果)
final inventory = await deductInventory(order.items);
// 步骤3:处理支付(依赖库存扣减)
final payment = await processPayment(order.paymentInfo, order.totalAmount);
// 步骤4:创建发货单(依赖支付成功)
final shipping = await createShippingOrder(order, payment.transactionId);
// 步骤5:发送确认邮件(依赖前面所有步骤)
await sendConfirmationEmail(order.customerEmail, shipping.trackingNumber);
return OrderResult(
orderId: order.id,
paymentId: payment.transactionId,
shippingId: shipping.id,
estimatedDelivery: shipping.estimatedDelivery,
);
}
// ⚡ 并行执行 - 无依赖关系的任务
Future<UserDashboard> loadDashboardParallel(int userId) async {
print('⚡ 并行加载用户仪表板数据');
// 启动所有异步操作(不等待完成)
final userFuture = fetchUser(userId);
final notificationsFuture = fetchNotifications(userId);
final statisticsFuture = fetchUserStatistics(userId);
final recentActivityFuture = fetchRecentActivity(userId, limit: 10);
final settingsFuture = fetchUserSettings(userId);
print('🚀 所有请求已发起,等待完成...');
// 同时等待所有操作完成
final results = await Future.wait([
userFuture,
notificationsFuture,
statisticsFuture,
recentActivityFuture,
settingsFuture,
]);
print('✅ 所有数据加载完成');
return UserDashboard(
user: results[0] as User,
notifications: results[1] as List<Notification>,
statistics: results[2] as UserStatistics,
recentActivity: results[3] as List<Activity>,
settings: results[4] as UserSettings,
);
}
// 🎯 混合策略 - 部分并行,部分串行
Future<ProjectStatus> updateProjectStatus(int projectId) async {
print('🎯 使用混合策略更新项目状态');
// 阶段1:并行获取基础数据
final (project, team, settings) = await Future.wait([
fetchProject(projectId),
fetchProjectTeam(projectId),
fetchProjectSettings(projectId),
]).then((results) => (
results[0] as Project,
results[1] as Team,
results[2] as ProjectSettings,
));
print('📊 基础数据获取完成');
// 阶段2:基于基础数据,并行执行分析任务
final (tasks, milestones, reports) = await Future.wait([
analyzeTasks(project, team),
analyzeMilestones(project, settings),
generateReports(project, team, settings),
]).then((results) => (
results[0] as TaskAnalysis,
results[1] as MilestoneAnalysis,
results[2] as List<Report>,
));
print('📈 分析任务完成');
// 阶段3:串行执行最终更新(需要所有分析结果)
final updatedProject = await updateProjectMetadata(project, tasks, milestones);
await saveReports(reports);
await notifyTeamMembers(team, updatedProject);
return ProjectStatus(
project: updatedProject,
taskAnalysis: tasks,
milestoneAnalysis: milestones,
reports: reports,
lastUpdated: DateTime.now(),
);
}
// 🎛️ 超时控制的并行执行
Future<List<T?>> parallelWithTimeout<T>(
List<Future<T> Function()> operations,
Duration timeout,
) async {
print('⏰ 执行带超时的并行操作 (${operations.length} 个任务)');
final futures = operations.map((op) =>
op().timeout(timeout).catchError((error) {
print('⚠️ 任务超时或失败: $error');
return null;
})
).toList();
final results = await Future.wait(futures);
final successCount = results.where((r) => r != null).length;
print('📊 并行执行完成: $successCount/${operations.length} 成功');
return results;
}
}
// 辅助类定义(示例)
class User {
final int id;
final String name;
final String email;
User({required this.id, required this.name, required this.email});
factory User.fromJson(Map<String, dynamic> json) {
return User(
id: json['id'],
name: json['name'],
email: json['email'],
);
}
}
class UserProfile {
final User user;
final Map<String, dynamic> preferences;
final List<Activity> activityHistory;
final DateTime lastUpdated;
UserProfile({
required this.user,
required this.preferences,
required this.activityHistory,
required this.lastUpdated,
});
}
// 其他辅助类的简化定义...
class Activity { final String type; final DateTime timestamp; Activity(this.type, this.timestamp); }
class UserStatistics { final Map<String, int> stats; UserStatistics(this.stats); }
class Notification { final String message; final bool isRead; Notification(this.message, this.isRead); }
class UserSettings { final Map<String, dynamic> settings; UserSettings(this.settings); }
class UserDashboard { final User user; final List<Notification> notifications; final UserStatistics statistics; final List<Activity> recentActivity; final UserSettings settings; UserDashboard({required this.user, required this.notifications, required this.statistics, required this.recentActivity, required this.settings}); }
// 模拟的异步函数
Future<User> fetchUser(int id) async => User(id: id, name: 'User$id', email: 'user$id@example.com');
Future<Map<String, dynamic>> fetchUserPreferences(int id) async => {'theme': 'dark', 'language': 'en'};
Future<List<Activity>> fetchActivityHistory(int id) async => [Activity('login', DateTime.now())];
Future<List<Notification>> fetchNotifications(int id) async => [Notification('Welcome!', false)];
Future<UserStatistics> fetchUserStatistics(int id) async => UserStatistics({'posts': 10, 'likes': 50});
Future<List<Activity>> fetchRecentActivity(int id, {int limit = 10}) async => [Activity('post', DateTime.now())];
Future<UserSettings> fetchUserSettings(int id) async => UserSettings({'notifications': true});
2.3 Future 组合操作高级指南
Future 组合操作是处理多个异步任务的核心技能。Dart 提供了多种组合模式,每种都有其特定的使用场景和优势。
⏳ Future.wait - 全部等待模式
class FutureWaitAdvanced {
// 🎯 基础等待所有任务完成
Future<void> basicWaitAll() async {
print('🚀 开始并行执行多个任务');
final stopwatch = Stopwatch()..start();
final results = await Future.wait([
fetchDataA(), // 1秒
fetchDataB(), // 2秒
fetchDataC(), // 3秒
]);
stopwatch.stop();
print('✅ 所有任务完成: $results');
print('⏱️ 总耗时: ${stopwatch.elapsedMilliseconds}ms'); // 约3000ms(最长任务的时间)
}
// 🔧 错误处理策略详解
Future<void> waitWithErrorHandling() async {
print('🛡️ 演示不同的错误处理策略');
// 策略1:eagerError=true(默认)- 遇到第一个错误立即失败
try {
await Future.wait([
Future.value('成功任务1'),
Future.delayed(Duration(seconds: 1), () => throw '任务2失败'),
Future.delayed(Duration(seconds: 2), () => '成功任务3'),
]); // eagerError 默认为 true
} catch (e) {
print('❌ eagerError=true: 第一个错误发生时立即失败: $e');
}
// 策略2:eagerError=false - 等待所有任务完成,但仍会抛出第一个错误
try {
final results = await Future.wait([
Future.value('成功任务1'),
Future.delayed(Duration(milliseconds: 500), () => throw '任务2失败'),
Future.delayed(Duration(seconds: 1), () => '成功任务3'), // 这个仍会执行完
], eagerError: false);
} catch (e) {
print('❌ eagerError=false: 所有任务执行完后抛出第一个错误: $e');
}
// 策略3:安全的并行执行 - 处理所有成功和失败
final safeResults = await Future.wait([
safeFuture(() => fetchDataA()),
safeFuture(() => fetchDataB()),
safeFuture(() => throw '故意失败'),
safeFuture(() => fetchDataC()),
]);
final successes = safeResults.where((r) => r.isSuccess).map((r) => r.value).toList();
final failures = safeResults.where((r) => !r.isSuccess).map((r) => r.error).toList();
print('✅ 成功任务: ${successes.length}');
print('❌ 失败任务: ${failures.length}');
}
// 🚀 分批并行处理大量任务
Future<List<T>> batchParallelProcessing<T>(
List<Future<T> Function()> operations,
int batchSize,
) async {
final List<T> allResults = [];
for (int i = 0; i < operations.length; i += batchSize) {
final batch = operations
.skip(i)
.take(batchSize)
.map((op) => op())
.toList();
print('📦 处理批次 ${(i / batchSize + 1).ceil()}/${(operations.length / batchSize).ceil()}');
final batchResults = await Future.wait(batch);
allResults.addAll(batchResults);
// 批次间的延迟,避免过载
if (i + batchSize < operations.length) {
await Future.delayed(Duration(milliseconds: 100));
}
}
return allResults;
}
// 辅助方法:安全的Future包装
Future<SafeResult<T>> safeFuture<T>(Future<T> Function() operation) async {
try {
final result = await operation();
return SafeResult.success(result);
} catch (error) {
return SafeResult.failure(error);
}
}
// 模拟异步操作
Future<String> fetchDataA() async {
await Future.delayed(Duration(seconds: 1));
return '数据A';
}
Future<String> fetchDataB() async {
await Future.delayed(Duration(seconds: 2));
return '数据B';
}
Future<String> fetchDataC() async {
await Future.delayed(Duration(seconds: 3));
return '数据C';
}
}
// 安全结果包装类
class SafeResult<T> {
final T? value;
final dynamic error;
final bool isSuccess;
SafeResult.success(this.value) : error = null, isSuccess = true;
SafeResult.failure(this.error) : value = null, isSuccess = false;
}
🏃 Future.any - 竞速模式
class FutureAnyAdvanced {
// 🎯 基础竞速 - 第一个完成者获胜
Future<void> basicRaceCondition() async {
print('🏁 开始竞速任务');
final stopwatch = Stopwatch()..start();
final winner = await Future.any([
slowButReliable(), // 3秒,但99%成功
fastButUnreliable(), // 1秒,但50%失败
mediumSpeed(), // 2秒,90%成功
]);
stopwatch.stop();
print('🏆 获胜者: $winner');
print('⏱️ 耗时: ${stopwatch.elapsedMilliseconds}ms');
}
// 🔄 容错竞速 - 处理所有任务都失败的情况
Future<String> faultTolerantRace() async {
try {
return await Future.any([
unreliableService('服务A'),
unreliableService('服务B'),
unreliableService('服务C'),
]);
} catch (error) {
print('❌ 所有服务都失败了: $error');
return '使用默认值';
}
}
// 🎯 智能超时竞速 - 超时后自动切换到备用方案
Future<T> smartTimeoutRace<T>(
Future<T> primary,
Future<T> fallback,
Duration timeout,
) async {
final timeoutFuture = Future.delayed(timeout).then((_) => throw TimeoutException('主任务超时', timeout));
try {
// 主任务与超时任务竞速
return await Future.any([primary, timeoutFuture]);
} catch (e) {
if (e is TimeoutException) {
print('⏰ 主任务超时,切换到备用方案');
return await fallback;
}
rethrow;
}
}
// 🌍 多服务器请求 - 从多个服务器获取相同数据,取最快响应
Future<Map<String, dynamic>> fetchFromMultipleServers(String endpoint) async {
final servers = [
'https://server1.example.com',
'https://server2.example.com',
'https://server3.example.com',
];
try {
final result = await Future.any(
servers.map((server) => httpRequest('$server/$endpoint'))
);
print('✅ 最快响应来自某个服务器');
return result;
} catch (error) {
print('❌ 所有服务器都无响应: $error');
throw Exception('所有服务器不可用');
}
}
// 模拟方法
Future<String> slowButReliable() async {
await Future.delayed(Duration(seconds: 3));
return '可靠但慢的结果';
}
Future<String> fastButUnreliable() async {
await Future.delayed(Duration(seconds: 1));
if (DateTime.now().millisecondsSinceEpoch % 2 == 0) {
throw '快但不可靠的服务失败';
}
return '快速结果';
}
Future<String> mediumSpeed() async {
await Future.delayed(Duration(seconds: 2));
return '中等速度结果';
}
Future<String> unreliableService(String serviceName) async {
await Future.delayed(Duration(milliseconds: 500 + DateTime.now().millisecond));
if (DateTime.now().millisecondsSinceEpoch % 3 == 0) {
return '$serviceName 成功响应';
}
throw '$serviceName 服务失败';
}
Future<Map<String, dynamic>> httpRequest(String url) async {
final delay = 500 + (DateTime.now().millisecondsSinceEpoch % 2000);
await Future.delayed(Duration(milliseconds: delay));
// 模拟网络失败
if (DateTime.now().millisecondsSinceEpoch % 5 == 0) {
throw 'HTTP请求失败: $url';
}
return {
'url': url,
'data': '响应数据',
'timestamp': DateTime.now().toIso8601String(),
'responseTime': '${delay}ms',
};
}
}
🔥 高级组合模式
class AdvancedFutureCombinations {
// 🎭 条件等待 - 根据条件动态决定等待策略
Future<List<String>> conditionalWait({
required bool waitForAll,
required List<Future<String> Function()> operations,
}) async {
if (waitForAll) {
// 等待所有任务完成
return await Future.wait(operations.map((op) => op()));
} else {
// 只要有一个成功就返回
try {
final first = await Future.any(operations.map((op) => op()));
return [first];
} catch (e) {
return ['所有操作都失败了'];
}
}
}
// ⚡ 渐进式加载 - 逐步显示结果
Stream<String> progressiveLoad(List<Future<String> Function()> operations) async* {
final futures = operations.map((op) => op()).toList();
final completed = <bool>List.filled(futures.length, false);
while (completed.contains(false)) {
// 检查每个Future的完成状态
for (int i = 0; i < futures.length; i++) {
if (!completed[i] && futures[i].isCompleted) {
completed[i] = true;
try {
final result = await futures[i];
yield '✅ 任务 ${i + 1} 完成: $result';
} catch (error) {
yield '❌ 任务 ${i + 1} 失败: $error';
}
}
}
// 短暂延迟避免过度检查
await Future.delayed(Duration(milliseconds: 50));
}
}
// 🎯 加权竞速 - 根据任务优先级和成功率选择
Future<T> weightedRace<T>(List<WeightedTask<T>> tasks) async {
// 按权重排序,优先级高的先开始
tasks.sort((a, b) => b.weight.compareTo(a.weight));
final List<Future<T>> futures = [];
for (final task in tasks) {
futures.add(task.operation());
// 高权重任务有更多时间独自竞争
if (task.weight > 0.8) {
await Future.delayed(Duration(milliseconds: 100));
}
}
return await Future.any(futures);
}
// 🔄 级联重试 - 任务失败时自动尝试下一个
Future<T> cascadeRetry<T>(List<Future<T> Function()> operations) async {
for (int i = 0; i < operations.length; i++) {
try {
print('🎯 尝试操作 ${i + 1}/${operations.length}');
return await operations[i]();
} catch (error) {
print('❌ 操作 ${i + 1} 失败: $error');
if (i == operations.length - 1) {
// 最后一个操作也失败了
throw Exception('所有级联操作都失败了');
}
// 短暂延迟后尝试下一个
await Future.delayed(Duration(milliseconds: 200));
}
}
throw Exception('不应该到达这里');
}
// 🎪 动态并发控制 - 根据系统负载调整并发数
Future<List<T>> dynamicConcurrencyLimit<T>(
List<Future<T> Function()> operations,
int maxConcurrency,
) async {
final List<T> results = [];
final semaphore = Semaphore(maxConcurrency);
final futures = operations.map((op) async {
await semaphore.acquire(); // 获取并发许可
try {
return await op();
} finally {
semaphore.release(); // 释放许可
}
});
return await Future.wait(futures);
}
}
// 加权任务类
class WeightedTask<T> {
final Future<T> Function() operation;
final double weight; // 0.0 - 1.0,权重越高优先级越高
WeightedTask(this.operation, this.weight);
}
// 简单信号量实现
class Semaphore {
final int maxCount;
int _currentCount;
final Queue<Completer<void>> _waitQueue = Queue();
Semaphore(this.maxCount) : _currentCount = maxCount;
Future<void> acquire() async {
if (_currentCount > 0) {
_currentCount--;
return;
}
final completer = Completer<void>();
_waitQueue.add(completer);
return completer.future;
}
void release() {
if (_waitQueue.isNotEmpty) {
final completer = _waitQueue.removeFirst();
completer.complete();
} else {
_currentCount++;
}
}
}
3. Stream 流式编程深度解析
Stream 是 Dart 中处理连续异步数据的强大工具,它就像一个水管,数据像水流一样连续不断地流过。无论是处理用户输入、网络数据流,还是实时数据更新,Stream 都能提供优雅的解决方案。
3.1 Stream 基础概念与核心原理
🌊 什么是 Stream?
Stream 代表一个异步数据序列,你可以把它想象成一条传送带,数据项一个接一个地出现。与 Future 不同的是:
-
Future:代表单一异步结果(如一次 API 调用)
-
Stream:代表多个异步数据(如用户点击事件、实时数据更新)
// Future - 单一异步结果
Future<String> fetchUserName() async {
// 返回一个用户名
return 'John Doe';
}
// Stream - 连续异步数据流
Stream<String> userActivityStream() async* {
// 持续产生用户活动数据
yield '用户登录';
await Future.delayed(Duration(seconds: 1));
yield '浏览商品';
await Future.delayed(Duration(seconds: 2));
yield '添加购物车';
await Future.delayed(Duration(seconds: 1));
yield '完成支付';
}
🎭 Stream 的两种类型
class StreamTypes {
// 🔒 单订阅流 (Single Subscription Stream)
// 特点:只能有一个监听器,像私人电话线
void demonstrateSingleSubscription() {
final controller = StreamController<String>();
final stream = controller.stream; // 默认是单订阅流
// 第一个监听器 - 正常工作
stream.listen((data) => print('监听器1: $data'));
// 第二个监听器 - 会报错!
try {
stream.listen((data) => print('监听器2: $data')); // ❌ 异常:Already listening
} catch (e) {
print('错误: $e');
}
// 发送数据
controller.add('测试数据');
controller.close();
}
// 📡 广播流 (Broadcast Stream)
// 特点:可以有多个监听器,像广播电台
void demonstrateBroadcastStream() {
final controller = StreamController<String>.broadcast();
final stream = controller.stream; // 广播流
// 多个监听器都能接收到数据
stream.listen((data) => print('监听器A: $data'));
stream.listen((data) => print('监听器B: $data'));
stream.listen((data) => print('监听器C: $data'));
// 发送数据 - 所有监听器都会收到
controller.add('广播消息1');
controller.add('广播消息2');
// 清理资源
controller.close();
}
// 🔄 转换单订阅流为广播流
void convertToBroadcast() {
// 原始单订阅流
final singleStream = Stream.fromIterable([1, 2, 3, 4, 5]);
// 转换为广播流
final broadcastStream = singleStream.asBroadcastStream();
// 现在可以多次监听
broadcastStream.listen((data) => print('监听器X: $data'));
broadcastStream.listen((data) => print('监听器Y: $data'));
}
}
🏗️ Stream 的生命周期与状态管理
class StreamLifecycle {
// 演示 Stream 完整的生命周期
Future<void> demonstrateLifecycle() async {
print('🚀 创建 StreamController');
final controller = StreamController<String>();
// 1. 监听阶段 - 设置监听器
print('👂 设置监听器');
late StreamSubscription<String> subscription;
subscription = controller.stream.listen(
(data) {
print('📨 接收数据: $data');
},
onError: (error) {
print('❌ 发生错误: $error');
},
onDone: () {
print('✅ Stream 已完成');
},
);
// 2. 数据发送阶段
print('📤 开始发送数据');
controller.add('第一条消息');
await Future.delayed(Duration(milliseconds: 500));
controller.add('第二条消息');
await Future.delayed(Duration(milliseconds: 500));
// 3. 错误处理演示
controller.addError('模拟错误情况');
await Future.delayed(Duration(milliseconds: 500));
controller.add('错误后的消息');
await Future.delayed(Duration(milliseconds: 500));
// 4. 暂停和恢复
print('⏸️ 暂停监听');
subscription.pause();
controller.add('暂停期间的消息'); // 这条消息会被缓存
await Future.delayed(Duration(milliseconds: 500));
print('▶️ 恢复监听');
subscription.resume(); // 缓存的消息会被处理
await Future.delayed(Duration(milliseconds: 500));
// 5. 关闭 Stream
print('🔚 关闭 Stream');
await controller.close();
// 6. 清理资源
print('🧹 清理订阅');
await subscription.cancel();
}
// 高级订阅管理
Future<void> advancedSubscriptionManagement() async {
final controller = StreamController<int>();
// 创建可管理的订阅
StreamSubscription<int>? subscription;
subscription = controller.stream.listen(
(data) => print('处理数据: $data'),
onError: (error) => print('处理错误: $error'),
onDone: () {
print('Stream 完成,自动清理订阅');
subscription = null; // 清空引用
},
cancelOnError: false, // 遇到错误不自动取消订阅
);
// 发送一些数据和错误
for (int i = 1; i <= 5; i++) {
if (i == 3) {
controller.addError('第 $i 个数据出错');
} else {
controller.add(i);
}
await Future.delayed(Duration(milliseconds: 300));
}
// 正确关闭
await controller.close();
// 确保订阅被清理
if (subscription != null) {
await subscription.cancel();
subscription = null;
}
}
// 资源清理的最佳实践
Future<void> resourceManagementBestPractices() async {
StreamController<String>? controller;
StreamSubscription<String>? subscription;
try {
// 创建资源
controller = StreamController<String>();
// 设置监听
subscription = controller.stream.listen(
(data) => processData(data),
onError: (error) => handleError(error),
);
// 模拟一些业务逻辑
await doSomeAsyncWork(controller);
} catch (error) {
print('业务逻辑出错: $error');
} finally {
// 确保资源被正确清理
print('🧹 开始资源清理');
// 1. 取消订阅
await subscription?.cancel();
subscription = null;
// 2. 关闭控制器
if (controller != null && !controller.isClosed) {
await controller.close();
}
controller = null;
print('✅ 资源清理完成');
}
}
// 辅助方法
void processData(String data) {
print('处理数据: $data');
}
void handleError(dynamic error) {
print('处理错误: $error');
}
Future<void> doSomeAsyncWork(StreamController<String> controller) async {
for (int i = 1; i <= 3; i++) {
controller.add('工作数据 $i');
await Future.delayed(Duration(milliseconds: 200));
}
}
}
3.2 Stream 创建方式详解
创建 Stream 有多种方式,每种方式都有其特定的使用场景。选择合适的创建方式可以让你的代码更高效、更易维护。
📋 基础创建方式
class StreamCreationBasics {
// 🔢 从集合创建 - 适合已知数据集
void fromIterableDemo() {
print('📋 演示从集合创建 Stream');
// 基础用法
final numbers = Stream.fromIterable([1, 2, 3, 4, 5]);
numbers.listen((number) => print('数字: $number'));
// 实际应用场景:处理配置列表
final configs = ['database', 'redis', 'elasticsearch'];
final configStream = Stream.fromIterable(configs);
configStream.listen((config) async {
print('正在初始化 $config 服务...');
await initializeService(config);
print('✅ $config 服务初始化完成');
});
}
// ⏰ 定期发射数据 - 适合定时任务、心跳检测
void periodicDemo() {
print('⏰ 演示定期发射数据');
// 基础用法:每秒递增计数
final counter = Stream.periodic(
Duration(seconds: 1),
(count) => count
).take(5); // 只取前5个
counter.listen(
(count) => print('计数: $count'),
onDone: () => print('计数完成'),
);
// 实际应用:系统监控
final systemMonitor = Stream.periodic(
Duration(minutes: 1),
(tick) => SystemStatus(
timestamp: DateTime.now(),
cpuUsage: getCpuUsage(),
memoryUsage: getMemoryUsage(),
tick: tick,
),
);
systemMonitor.listen((status) {
print('📊 系统状态更新: CPU ${status.cpuUsage}%, 内存 ${status.memoryUsage}%');
if (status.cpuUsage > 80 || status.memoryUsage > 90) {
print('⚠️ 系统资源使用率过高!');
}
});
}
// 🔮 从 Future 创建 - 适合异步结果转流
void fromFutureDemo() {
print('🔮 演示从 Future 创建 Stream');
// 基础用法
final futureStream = Stream.fromFuture(
Future.delayed(Duration(seconds: 2), () => 'Hello from Future!')
);
futureStream.listen(
(data) => print('接收到: $data'),
onDone: () => print('Future Stream 完成'),
);
// 实际应用:API 响应转流处理
final apiResultStream = Stream.fromFuture(fetchUserProfile(123));
apiResultStream.listen(
(profile) => print('用户资料: ${profile.name}'),
onError: (error) => print('获取失败: $error'),
);
}
// 🔗 从多个 Futures 创建
void fromFuturesDemo() {
print('🔗 演示从多个 Futures 创建 Stream');
// 将多个 Future 转换为 Stream
final futures = [
fetchUserData(1),
fetchUserData(2),
fetchUserData(3),
];
final userStream = Stream.fromFutures(futures);
userStream.listen(
(userData) => print('用户数据: $userData'),
onDone: () => print('所有用户数据加载完成'),
);
}
// 辅助方法
Future<void> initializeService(String service) async {
await Future.delayed(Duration(milliseconds: 500)); // 模拟初始化时间
}
double getCpuUsage() => 20 + (DateTime.now().millisecond % 60);
double getMemoryUsage() => 30 + (DateTime.now().millisecond % 50);
Future<UserProfile> fetchUserProfile(int id) async {
await Future.delayed(Duration(seconds: 1));
return UserProfile(id: id, name: 'User$id');
}
Future<Map<String, dynamic>> fetchUserData(int id) async {
await Future.delayed(Duration(milliseconds: 500 * id));
return {'id': id, 'name': 'User$id', 'email': 'user$id@example.com'};
}
}
// 辅助类
class SystemStatus {
final DateTime timestamp;
final double cpuUsage;
final double memoryUsage;
final int tick;
SystemStatus({
required this.timestamp,
required this.cpuUsage,
required this.memoryUsage,
required this.tick,
});
}
class UserProfile {
final int id;
final String name;
UserProfile({required this.id, required this.name});
}
🎛️ StreamController 高级用法
class StreamControllerAdvanced {
// 🎮 基础 StreamController 使用
void basicControllerDemo() {
print('🎮 演示基础 StreamController');
// 创建控制器
final controller = StreamController<String>();
// 设置监听器
controller.stream.listen(
(data) => print('控制器数据: $data'),
onError: (error) => print('控制器错误: $error'),
onDone: () => print('控制器完成'),
);
// 发送数据
controller.add('消息1');
controller.add('消息2');
// 发送错误
controller.addError('测试错误');
// 继续发送数据
controller.add('消息3');
// 关闭控制器
controller.close();
}
// 📡 广播控制器的实际应用
class EventBus {
final StreamController<Event> _controller =
StreamController<Event>.broadcast();
// 公开只读的流
Stream<Event> get events => _controller.stream;
// 发布事件
void publish(Event event) {
if (!_controller.isClosed) {
_controller.add(event);
}
}
// 订阅特定类型的事件
StreamSubscription<T> subscribe<T extends Event>(
void Function(T event) onEvent,
) {
return events
.where((event) => event is T)
.cast<T>()
.listen(onEvent);
}
// 清理资源
Future<void> dispose() async {
await _controller.close();
}
}
// 🔄 同步控制器 - 用于同步数据处理
void synchronousControllerDemo() {
print('🔄 演示同步控制器');
final controller = StreamController<int>.sync();
// 同步监听器会立即处理数据
controller.stream.listen((data) {
print('同步处理: $data');
});
// 数据会立即被处理
controller.add(1);
controller.add(2);
controller.add(3);
controller.close();
}
// 🎯 带回调的控制器 - 监控监听器状态
void controllerWithCallbacks() {
print('🎯 演示带回调的控制器');
late StreamController<String> controller;
controller = StreamController<String>(
onListen: () {
print('👂 有监听器开始监听');
// 可以在这里开始产生数据
controller.add('欢迎消息');
},
onCancel: () {
print('🛑 监听器取消监听');
// 可以在这里清理资源
},
onPause: () {
print('⏸️ 监听器暂停');
},
onResume: () {
print('▶️ 监听器恢复');
},
);
// 监听流
final subscription = controller.stream.listen(
(data) => print('接收: $data'),
);
// 发送数据
controller.add('正常数据');
// 暂停监听
subscription.pause();
controller.add('暂停期间数据'); // 这条数据会被缓存
// 恢复监听
subscription.resume();
// 取消监听
subscription.cancel();
controller.close();
}
}
// 事件基类
abstract class Event {
final DateTime timestamp;
Event() : timestamp = DateTime.now();
}
class UserLoginEvent extends Event {
final String username;
UserLoginEvent(this.username);
}
class MessageSentEvent extends Event {
final String message;
final String recipient;
MessageSentEvent(this.message, this.recipient);
}
class SystemErrorEvent extends Event {
final String error;
final String stackTrace;
SystemErrorEvent(this.error, this.stackTrace);
}
🌟 高级 Stream 生成器
class AdvancedStreamGenerators {
// 🏭 async* 生成器 - 创建自定义异步流
Stream<int> countdownGenerator(int from) async* {
print('🚀 开始倒计时从 $from');
for (int i = from; i >= 0; i--) {
// 每秒产生一个数字
await Future.delayed(Duration(seconds: 1));
yield i;
if (i == 0) {
print('🎉 倒计时结束!');
}
}
}
// 📊 数据生成器 - 模拟实时数据流
Stream<SensorData> sensorDataGenerator() async* {
print('📊 开始传感器数据流');
int dataCount = 0;
while (dataCount < 100) { // 生成100个数据点
final data = SensorData(
id: dataCount,
temperature: 20 + (dataCount % 30), // 20-50度变化
humidity: 40 + (dataCount % 40), // 40-80%变化
timestamp: DateTime.now(),
);
yield data;
dataCount++;
// 每200毫秒产生一个数据点
await Future.delayed(Duration(milliseconds: 200));
}
print('✅ 传感器数据生成完成');
}
// 🔄 无限流生成器 - 需要外部控制停止
Stream<HeartbeatData> heartbeatGenerator() async* {
print('💓 开始心跳数据流');
int beatCount = 0;
while (true) { // 无限循环
final heartbeat = HeartbeatData(
beatNumber: beatCount++,
timestamp: DateTime.now(),
bpm: 60 + (beatCount % 40), // 模拟心率变化
);
yield heartbeat;
// 每秒一个心跳
await Future.delayed(Duration(seconds: 1));
// 可以通过外部条件控制停止
if (beatCount > 3600) { // 1小时后自动停止
print('⏰ 心跳监控已运行1小时,自动停止');
break;
}
}
}
// 🎲 条件生成器 - 根据条件产生不同的数据
Stream<GameEvent> gameEventGenerator() async* {
print('🎮 开始游戏事件流');
final random = Random();
int eventId = 0;
for (int round = 1; round <= 10; round++) {
await Future.delayed(Duration(seconds: 1));
// 根据随机数决定事件类型
final eventType = random.nextInt(4);
switch (eventType) {
case 0:
yield PlayerJoinEvent(eventId++, 'Player${random.nextInt(100)}');
break;
case 1:
yield ScoreUpdateEvent(eventId++, random.nextInt(1000));
break;
case 2:
yield PowerUpEvent(eventId++, ['speed', 'strength', 'shield'][random.nextInt(3)]);
break;
case 3:
yield GameOverEvent(eventId++, 'Game Over - Round $round');
break;
}
}
print('🏁 游戏事件生成完成');
}
// 演示生成器使用
Future<void> demonstrateGenerators() async {
print('🌟 演示各种生成器');
// 1. 倒计时生成器
await for (int count in countdownGenerator(5)) {
print('倒计时: $count');
}
// 2. 传感器数据(只取前5个)
await for (SensorData data in sensorDataGenerator().take(5)) {
print('传感器: 温度${data.temperature}°C, 湿度${data.humidity}%');
}
// 3. 心跳数据(只监听10秒)
final heartbeatSubscription = heartbeatGenerator()
.timeout(Duration(seconds: 10))
.listen(
(heartbeat) => print('心跳: ${heartbeat.bpm} BPM'),
onError: (e) => print('心跳监控结束'),
);
// 4. 游戏事件
await for (GameEvent event in gameEventGenerator()) {
print('游戏事件: ${event.description}');
}
}
}
// 辅助数据类
class SensorData {
final int id;
final double temperature;
final double humidity;
final DateTime timestamp;
SensorData({
required this.id,
required this.temperature,
required this.humidity,
required this.timestamp,
});
}
class HeartbeatData {
final int beatNumber;
final DateTime timestamp;
final int bpm;
HeartbeatData({
required this.beatNumber,
required this.timestamp,
required this.bpm,
});
}
abstract class GameEvent {
final int id;
final DateTime timestamp;
GameEvent(this.id) : timestamp = DateTime.now();
String get description;
}
class PlayerJoinEvent extends GameEvent {
final String playerName;
PlayerJoinEvent(int id, this.playerName) : super(id);
@override
String get description => '玩家 $playerName 加入游戏';
}
class ScoreUpdateEvent extends GameEvent {
final int score;
ScoreUpdateEvent(int id, this.score) : super(id);
@override
String get description => '分数更新: $score';
}
class PowerUpEvent extends GameEvent {
final String powerType;
PowerUpEvent(int id, this.powerType) : super(id);
@override
String get description => '获得能力: $powerType';
}
class GameOverEvent extends GameEvent {
final String reason;
GameOverEvent(int id, this.reason) : super(id);
@override
String get description => reason;
}
3.3 Stream 操作符
class StreamOperators {
// 基本监听
Future<void> basicListen() async {
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
stream.listen(
(data) => print('Data: $data'),
onError: (error) => print('Error: $error'),
onDone: () => print('Stream completed'),
);
}
// 转换操作
Future<void> transformOperations() async {
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
// map - 转换每个元素
stream
.map((x) => x * 2)
.where((x) => x > 5) // where - 过滤
.take(3) // take - 取前3个
.listen(print); // 输出: 6, 8, 10
}
// 异步转换
Future<void> asyncTransform() async {
final stream = Stream.fromIterable(['a', 'b', 'c']);
await for (String letter in stream.asyncMap((letter) async {
await Future.delayed(Duration(milliseconds: 100));
return letter.toUpperCase();
})) {
print(letter); // A, B, C (每个间隔100ms)
}
}
// 累积操作
Future<void> reduceOperations() async {
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
final sum = await stream.reduce((a, b) => a + b);
print('Sum: $sum'); // 15
final list = await stream.toList();
print('List: $list'); // [1, 2, 3, 4, 5]
}
}
3.4 Broadcast Stream
class BroadcastStreamExample {
late StreamController<int> _controller;
late Stream<int> _stream;
BroadcastStreamExample() {
_controller = StreamController<int>.broadcast();
_stream = _controller.stream;
}
void multipleListeners() {
// 第一个监听器
_stream.listen((data) => print('Listener 1: $data'));
// 第二个监听器
_stream.listen((data) => print('Listener 2: $data'));
// 添加数据
_controller.add(1);
_controller.add(2);
}
void dispose() {
_controller.close();
}
}
4. Isolate 隔离区
4.1 Isolate 基础概念
每个 Dart 程序都在隔离区中运行:
-
主隔离区:运行 main() 函数
-
工作隔离区:处理CPU密集型任务
-
内存隔离:隔离区间不共享内存
-
消息传递:通过 SendPort/ReceivePort 通信
4.2 创建和管理 Isolate
import 'dart:isolate';
import 'dart:math';
class IsolateExample {
// 基础 Isolate 创建
static void isolateEntryPoint(String message) {
print('Isolate received: $message');
}
Future<void> basicIsolate() async {
await Isolate.spawn(isolateEntryPoint, 'Hello from main');
print('Main isolate continues...');
}
// 双向通信
static void calculatorIsolate(SendPort sendPort) {
// 创建接收端口
final receivePort = ReceivePort();
// 发送发送端口给主隔离区
sendPort.send(receivePort.sendPort);
// 监听来自主隔离区的消息
receivePort.listen((message) {
if (message is Map) {
final operation = message['operation'];
final numbers = message['numbers'] as List<int>;
int result;
switch (operation) {
case 'sum':
result = numbers.reduce((a, b) => a + b);
break;
case 'product':
result = numbers.reduce((a, b) => a * b);
break;
default:
result = 0;
}
sendPort.send({'result': result});
}
});
}
Future<int> calculateInIsolate(String operation, List<int> numbers) async {
final receivePort = ReceivePort();
// 启动计算隔离区
await Isolate.spawn(calculatorIsolate, receivePort.sendPort);
// 获取计算隔离区的发送端口
final sendPort = await receivePort.first as SendPort;
// 创建结果接收端口
final responsePort = ReceivePort();
// 发送计算任务
sendPort.send({
'operation': operation,
'numbers': numbers,
'responsePort': responsePort.sendPort,
});
// 等待结果
final result = await responsePort.first as Map;
return result['result'];
}
}
4.3 Compute 函数(推荐方式)
import 'dart:isolate';
import 'dart:math';
// 独立的计算函数(必须是顶级函数)
int heavyComputation(List<int> numbers) {
// 模拟CPU密集型计算
int result = 0;
for (int i = 0; i < 1000000; i++) {
for (int number in numbers) {
result += (number * sin(i.toDouble())).round();
}
}
return result;
}
class ComputeExample {
// 使用 compute 函数简化 Isolate 操作
Future<int> computeHeavyTask() async {
final numbers = List.generate(100, (i) => Random().nextInt(100));
// 在后台隔离区执行
final result = await compute(heavyComputation, numbers);
return result;
}
// 多个并行计算
Future<List<int>> parallelCompute() async {
final tasks = List.generate(4, (i) =>
compute(heavyComputation, List.generate(10, (j) => i + j))
);
return Future.wait(tasks);
}
}
5. 实践最佳案例
5.1 网络请求优化
import 'dart:convert';
import 'dart:io';
class NetworkService {
static const String baseUrl = 'https://api.example.com';
static const Duration timeout = Duration(seconds: 10);
// 并发网络请求
Future<List<Map<String, dynamic>>> fetchMultipleUsers(
List<int> userIds
) async {
final futures = userIds.map((id) => fetchUser(id));
return Future.wait(futures, eagerError: false);
}
Future<Map<String, dynamic>> fetchUser(int id) async {
try {
final client = HttpClient();
final request = await client.getUrl(Uri.parse('$baseUrl/users/$id'))
..headers.add('Accept', 'application/json');
final response = await request.close().timeout(timeout);
final body = await response.transform(utf8.decoder).join();
client.close();
return json.decode(body);
} catch (e) {
return {'error': 'Failed to fetch user $id: $e'};
}
}
// 带重试机制的请求
Future<T> requestWithRetry<T>(
Future<T> Function() request, {
int maxRetries = 3,
Duration delay = const Duration(seconds: 1),
}) async {
int attempts = 0;
while (attempts < maxRetries) {
try {
return await request();
} catch (e) {
attempts++;
if (attempts >= maxRetries) rethrow;
await Future.delayed(delay * attempts);
}
}
throw Exception('Max retries exceeded');
}
}
5.2 文件处理和 I/O
import 'dart:io';
import 'dart:convert';
class FileProcessingService {
// 异步文件读取
Future<String> readFileAsync(String path) async {
final file = File(path);
return await file.readAsString();
}
// 流式处理大文件
Stream<String> readFileByLines(String path) async* {
final file = File(path);
final stream = file.openRead();
await for (String line in stream
.transform(utf8.decoder)
.transform(LineSplitter())) {
yield line;
}
}
// 批量文件处理
Future<void> processMultipleFiles(List<String> filePaths) async {
const int concurrency = 3; // 限制并发数
for (int i = 0; i < filePaths.length; i += concurrency) {
final batch = filePaths
.skip(i)
.take(concurrency)
.map((path) => processFile(path));
await Future.wait(batch);
}
}
Future<void> processFile(String path) async {
await for (String line in readFileByLines(path)) {
// 处理每一行
await processLine(line);
}
}
Future<void> processLine(String line) async {
// 模拟异步处理
await Future.delayed(Duration(milliseconds: 10));
}
}
5.3 响应式编程模式
class DataRepository {
final StreamController<List<User>> _usersController =
StreamController<List<User>>.broadcast();
final StreamController<String> _searchController =
StreamController<String>();
Stream<List<User>> get users => _usersController.stream;
Sink<String> get searchSink => _searchController.sink;
DataRepository() {
// 响应搜索请求
_searchController.stream
.debounce(Duration(milliseconds: 300)) // 防抖
.distinct() // 去重
.asyncMap((query) => searchUsers(query)) // 异步搜索
.listen((users) {
_usersController.add(users);
});
}
Future<List<User>> searchUsers(String query) async {
await Future.delayed(Duration(milliseconds: 500)); // 模拟网络延迟
// 实际搜索逻辑
return mockSearchResults(query);
}
List<User> mockSearchResults(String query) {
// 模拟数据
return [];
}
void dispose() {
_usersController.close();
_searchController.close();
}
}
// Stream 扩展 - 防抖功能
extension StreamExtensions<T> on Stream<T> {
Stream<T> debounce(Duration duration) {
StreamController<T> controller = StreamController<T>();
Timer? timer;
listen((data) {
timer?.cancel();
timer = Timer(duration, () {
controller.add(data);
});
}, onDone: () {
timer?.cancel();
controller.close();
});
return controller.stream;
}
}
5.4 错误处理和监控
class ConcurrencyErrorHandler {
// 全局错误处理
void setupErrorHandling() {
// 捕获未处理的异常
runZonedGuarded(() {
// 应用代码
runApp();
}, (error, stack) {
// 记录错误
logError(error, stack);
});
}
// Future 错误恢复
Future<T> withFallback<T>(
Future<T> primary,
T fallbackValue, {
bool Function(dynamic error)? shouldFallback,
}) async {
try {
return await primary;
} catch (error) {
if (shouldFallback?.call(error) ?? true) {
return fallbackValue;
}
rethrow;
}
}
// Stream 错误恢复
Stream<T> streamWithErrorRecovery<T>(
Stream<T> source,
T Function(dynamic error) onError,
) async* {
await for (T value in source) {
try {
yield value;
} catch (error) {
yield onError(error);
}
}
}
void logError(dynamic error, StackTrace stack) {
print('Error: $error');
print('Stack: $stack');
}
void runApp() {
// 应用入口
}
}
总结
核心要点
-
事件循环优先级:微任务 > 事件任务
-
Future vs Stream:单次异步 vs 多次异步数据流
-
Isolate 使用场景:CPU密集型计算、避免阻塞UI
-
错误处理:始终考虑异常情况和超时处理
性能优化建议
- 合理使用
Future.wait()
进行并发操作
- CPU密集型任务使用
compute()
函数
- 大数据处理优先考虑 Stream
- 避免在主隔离区执行长时间运行的同步操作
调试技巧
// 性能监控
void measureAsyncPerformance<T>(Future<T> future, String name) async {
final stopwatch = Stopwatch()..start();
try {
await future;
} finally {
stopwatch.stop();
print('$name took ${stopwatch.elapsedMilliseconds}ms');
}
}
// Stream 调试
Stream<T> debugStream<T>(Stream<T> source, String name) {
return source.map((data) {
print('$name: $data');
return data;
});
}
这份总结涵盖了 Dart 并发编程的核心概念和实际应用,结合官方API文档和最佳实践,帮助开发者构建高效、可靠的异步应用程序。