普通视图
type-challenges(ts类型体操): 8 - 对象部分属性只读
10分钟带你用Three.js手搓一个3D世界,代码少得离谱!
RBAC 权限系统实战(一):页面级访问控制全解析
JAVA基础之集合框架详解
【配置化 CRUD 01】搜索重置组件:封装与复用
专业指南:从核心概念到3D动效实现
AI全栈筑基:React Router DOM 路由配置
Vercel 团队 10 年 React 性能优化经验:10 大核心策略让性能提升 300%
Dart - 全面认识Stream
20个例子掌握RxJS——第十三章使用 interval 和 scan 实现定时器
20个例子掌握RxJS——第十二章使用 throttleTime 实现弹幕系统
20个例子掌握RxJS——第十章使用 RxJS 实现大文件分片上传
为什么程序员不自己开发一个小程序赚钱
20个例子掌握RxJS——第九章使用 exhaustMap 实现轮询机制
20个例子掌握RxJS——第八章使用 retryWhen 实现失败重试机制
20个例子掌握RxJS——第七章使用 shareReplay 实现 Token 刷新的并发控制
RxJS 实战:使用 shareReplay 实现 Token 刷新的并发控制
概述
在需要身份验证的 Web 应用中,Token 过期是一个常见问题。当多个请求同时发起,且 Token 都已过期时,如果每个请求都独立刷新 Token,会导致:
- 多个重复的 Token 刷新请求
- 资源浪费
- 可能的竞态条件
本章将介绍如何使用 RxJS 的 shareReplay 操作符来实现 Token 刷新的并发控制,确保多个请求共享同一个 Token 刷新请求。
问题场景
假设我们有 3 个 API 请求同时发起,且 Token 都已过期:
- 请求 A:检测到 Token 过期 → 发起 Token 刷新请求 1
- 请求 B:检测到 Token 过期 → 发起 Token 刷新请求 2
- 请求 C:检测到 Token 过期 → 发起 Token 刷新请求 3
问题:3 个请求都独立刷新 Token,导致重复请求和资源浪费。
期望:3 个请求共享同一个 Token 刷新请求,只刷新一次。
shareReplay 操作符简介
shareReplay 是 RxJS 中用于共享 Observable 结果的操作符。它会:
- 共享订阅:多个订阅者共享同一个源 Observable
- 缓存结果:缓存指定数量的最新值
- 避免重复执行:源 Observable 只执行一次
基本语法
source$.pipe(
shareReplay(1) // 缓存最新的 1 个值
)
解决方案:使用 shareReplay 共享 Token 刷新
实现思路
- 创建一个 Token 刷新 Observable,使用
shareReplay共享 - 当请求检测到 Token 过期时,订阅共享的 Token 刷新 Observable
- Token 刷新完成后,所有等待的请求都使用新的 Token 重试
核心代码
// 当前token(初始为过期token)
private currentToken = 'expired_token';
// token刷新Observable(使用shareReplay确保只有一个请求)
private tokenRefresh$: Observable<TokenResponse> | null = null;
// 刷新token(使用shareReplay确保并发请求时只有一个token刷新请求)
private refreshToken(): Observable<TokenResponse> {
// 如果已经有正在进行的token刷新请求,直接返回该Observable
if (this.tokenRefresh$) {
return this.tokenRefresh$;
}
// 标记正在刷新token
this.refreshingToken = true;
this.cdr.detectChanges();
// 创建新的token刷新请求,使用shareReplay确保多个订阅者共享同一个请求
this.tokenRefresh$ = this.http.post<TokenResponse>(`${this.apiBaseUrl}/api/refresh-token`, {})
.pipe(
shareReplay(1), // 关键:使用shareReplay确保只有一个请求,所有订阅者共享结果
catchError(error => {
console.error('Token刷新失败:', error);
this.tokenRefresh$ = null; // 重置,允许重试
this.refreshingToken = false;
this.cdr.detectChanges();
return throwError(() => error);
}),
finalize(() => {
// 请求完成后重置tokenRefresh$,允许下次刷新
setTimeout(() => {
this.tokenRefresh$ = null;
this.refreshingToken = false;
this.cdr.detectChanges();
}, 100);
})
);
return this.tokenRefresh$;
}
// 发起带token的请求(自动处理token刷新)
private makeRequestWithToken(apiName: string, apiUrl: string): Observable<ApiResponse> {
// 先尝试使用当前token请求
return this.http.get<ApiResponse>(`${this.apiBaseUrl}${apiUrl}`, {
headers: new HttpHeaders({
'Authorization': `Bearer ${this.currentToken}`
})
}).pipe(
catchError((error) => {
// 如果token过期(401错误)
if (error.status === 401 && error.error?.code === 'TOKEN_EXPIRED') {
console.log(`${apiName} token过期,等待token刷新...`);
// 刷新token(如果已经有正在进行的刷新请求,会复用)
return this.refreshToken().pipe(
switchMap((tokenResponse) => {
// token刷新成功,更新当前token
this.currentToken = tokenResponse.data.token;
this.currentTokenDisplay = tokenResponse.data.token;
this.cdr.detectChanges();
console.log(`${apiName} token刷新成功,使用新token重试请求`);
// 使用新token重试请求
return this.http.get<ApiResponse>(`${this.apiBaseUrl}${apiUrl}`, {
headers: new HttpHeaders({
'Authorization': `Bearer ${this.currentToken}`
})
});
}),
catchError((refreshError) => {
console.error(`${apiName} token刷新失败:`, refreshError);
return throwError(() => refreshError);
})
);
}
// 其他错误直接抛出
return throwError(() => error);
})
);
}
关键点解析
1. shareReplay 的共享机制
当多个请求同时检测到 Token 过期时:
-
第一个请求:调用
refreshToken(),创建新的 Token 刷新 Observable,并订阅它(发起 HTTP 请求) -
第二个请求:调用
refreshToken(),发现tokenRefresh$已存在,直接返回共享的 Observable,并订阅它 -
第三个请求:调用
refreshToken(),发现tokenRefresh$已存在,直接返回共享的 Observable,并订阅它
等待机制:
- 第二个、第三个请求通过订阅同一个 Observable(
tokenRefresh$)来实现等待 - 当它们调用
this.refreshToken().pipe(switchMap(...))时,switchMap会等待上游 Observable(tokenRefresh$)发出值 - 由于使用了
shareReplay,多个订阅者会共享同一个底层的 Observable 执行 - 当第一个订阅者已经发起 HTTP 请求时,后续的订阅者会"加入"这个正在进行的执行
- 当 HTTP 请求完成并发出 Token 值时,所有订阅者都会同时收到这个值
- 然后
switchMap才会执行下游的操作(使用新 token 重试请求)
结果:3 个请求共享同一个 Token 刷新请求,只刷新一次,并且都会等待 Token 刷新完成后才继续。
2. 缓存机制
shareReplay(1) 会缓存最新的 1 个值。这意味着:
- 如果 Token 刷新已完成,后续订阅者会立即收到缓存的结果
- 不需要重新发起请求
3. 错误处理
如果 Token 刷新失败:
- 重置
tokenRefresh$为null,允许重试 - 抛出错误,让调用者处理
4. 清理机制
在 finalize 中重置 tokenRefresh$,确保下次 Token 过期时可以重新刷新。
执行流程示例
假设 3 个请求同时发起,且 Token 都已过期:
-
请求 A:检测到 Token 过期
- 调用
refreshToken(),创建 Token 刷新 Observable(使用shareReplay) - 调用
this.refreshToken().pipe(switchMap(...)),订阅tokenRefresh$ - 由于是第一个订阅者,开始执行底层 Observable,发起 HTTP 请求(Token 刷新)
- 调用
-
请求 B:检测到 Token 过期(在请求 A 的 Token 刷新完成前,例如 1 秒后)
- 调用
refreshToken(),发现tokenRefresh$已存在,返回同一个 Observable - 调用
this.refreshToken().pipe(switchMap(...)),订阅同一个tokenRefresh$ - 由于使用了
shareReplay,加入正在进行的 HTTP 请求执行(不发起新请求) -
switchMap等待上游 Observable(tokenRefresh$)发出值
- 调用
-
请求 C:检测到 Token 过期(在请求 A 的 Token 刷新完成前,例如 2 秒后)
- 调用
refreshToken(),发现tokenRefresh$已存在,返回同一个 Observable - 调用
this.refreshToken().pipe(switchMap(...)),订阅同一个tokenRefresh$ - 由于使用了
shareReplay,加入正在进行的 HTTP 请求执行(不发起新请求) -
switchMap等待上游 Observable(tokenRefresh$)发出值
- 调用
-
Token 刷新完成(例如 3 秒后)
- HTTP 请求返回新的 Token
-
tokenRefresh$Observable 发出值(新的 Token) - 所有订阅者(请求 A、B、C)同时收到新的 Token
- 所有请求的
switchMap同时执行,使用新 Token 重试各自的请求
关键点:
- 第二个、第三个请求通过订阅同一个 Observable 来实现等待
-
switchMap会阻塞等待上游 Observable 发出值,然后才执行下游操作 -
shareReplay确保多个订阅者共享同一个底层的 HTTP 请求执行
结果:只发起 1 次 Token 刷新请求,3 个请求共享结果,并且都会等待 Token 刷新完成后才继续 ✅
与其他方案的对比
方案 1:不使用 shareReplay(有问题)
// ❌ 错误示例:每个请求都独立刷新 Token
private refreshToken(): Observable<TokenResponse> {
return this.http.post('/api/refresh-token', {}); // 可能发起多次请求
}
方案 2:使用标志位(复杂)
// ⚠️ 可行但复杂:需要手动管理标志位和 Promise
private isRefreshing = false;
private refreshPromise?: Promise<TokenResponse>;
private async refreshToken(): Promise<TokenResponse> {
if (this.isRefreshing && this.refreshPromise) {
return this.refreshPromise; // 等待正在进行的刷新
}
this.isRefreshing = true;
this.refreshPromise = this.http.post('/api/refresh-token', {}).toPromise();
// ...
}
方案 3:使用 shareReplay(推荐)✅
// ✅ 推荐:简洁、自动管理
private tokenRefresh$ = this.http.post('/api/refresh-token', {}).pipe(
shareReplay(1)
);
实际应用场景
1. HTTP 拦截器中的 Token 刷新
// HTTP 拦截器
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
return next.handle(req).pipe(
catchError(error => {
if (error.status === 401) {
return this.refreshToken().pipe(
switchMap(token => {
// 使用新 Token 重试请求
const cloned = req.clone({
setHeaders: { Authorization: `Bearer ${token}` }
});
return next.handle(cloned);
})
);
}
return throwError(() => error);
})
);
}
2. 多个 API 请求的 Token 刷新
// 同时发起多个请求
forkJoin({
user: this.getUser(),
orders: this.getOrders(),
products: this.getProducts()
}).subscribe(results => {
// 所有请求共享同一个 Token 刷新请求
});
性能优化建议
1. 添加 Token 过期时间检查
在刷新 Token 前,检查 Token 是否真的过期:
private isTokenExpired(): boolean {
// 检查 Token 是否过期
return this.tokenExpiryTime < Date.now();
}
private refreshToken(): Observable<TokenResponse> {
if (!this.isTokenExpired()) {
return of({ token: this.currentToken }); // Token 未过期,直接返回
}
// ...
}
2. 添加重试机制
对于 Token 刷新失败的情况,可以添加重试:
this.http.post('/api/refresh-token', {}).pipe(
retry(2), // 失败后重试 2 次
shareReplay(1)
)
3. 添加超时处理
为 Token 刷新添加超时处理:
this.http.post('/api/refresh-token', {}).pipe(
timeout(5000), // 5 秒超时
shareReplay(1)
)
注意事项
- 内存泄漏:确保在组件销毁时取消订阅
- 错误处理:确保 Token 刷新失败时有适当的错误处理
-
并发控制:
shareReplay确保只有一个请求,但需要正确管理状态 - Token 存储:刷新后的 Token 需要正确存储和更新
总结
使用 shareReplay 实现 Token 刷新的并发控制是一个优雅的解决方案,它通过共享 Observable 来确保:
- 避免重复请求:多个请求共享同一个 Token 刷新请求
- 资源节约:减少不必要的网络请求
- 代码简洁:不需要手动管理标志位和 Promise
- 自动管理:RxJS 自动处理订阅和取消
记住:当你需要多个订阅者共享同一个 Observable 结果时,使用 shareReplay 是最佳选择。