阅读视图

发现新文章,点击刷新页面。

线程池的类型和原理

参考文章: Java线程池的四种创建方式 - 绝不妥协绝不低头 - 博客园 (cnblogs.com) JAVA线程池原理详解一 - 冬瓜蔡 - 博客园 (cnblogs.com) 深度解读Java线

浅谈常用的设计模式

######参考文章: 单例模式 观察者模式 装饰模式 1.单例模式 (1):懒汉(线程不安全,在多线程中无法正常工作) (2):懒汉(线程··安全 synchronized同步静态方法,实际锁住了c

Dart - 全面认识Stream

第一章:Flutter 进阶——为什么你需要 Stream?从 Future 到流的思维跃迁 在 Flutter 和 Dart 的异步编程世界里,大多数开发者都是从 Future 开始入门的。我们习惯

20个例子掌握RxJS——第七章使用 shareReplay 实现 Token 刷新的并发控制

RxJS 实战:使用 shareReplay 实现 Token 刷新的并发控制

概述

在需要身份验证的 Web 应用中,Token 过期是一个常见问题。当多个请求同时发起,且 Token 都已过期时,如果每个请求都独立刷新 Token,会导致:

  1. 多个重复的 Token 刷新请求
  2. 资源浪费
  3. 可能的竞态条件

本章将介绍如何使用 RxJS 的 shareReplay 操作符来实现 Token 刷新的并发控制,确保多个请求共享同一个 Token 刷新请求。

问题场景

假设我们有 3 个 API 请求同时发起,且 Token 都已过期:

  1. 请求 A:检测到 Token 过期 → 发起 Token 刷新请求 1
  2. 请求 B:检测到 Token 过期 → 发起 Token 刷新请求 2
  3. 请求 C:检测到 Token 过期 → 发起 Token 刷新请求 3

问题:3 个请求都独立刷新 Token,导致重复请求和资源浪费。

期望:3 个请求共享同一个 Token 刷新请求,只刷新一次。

shareReplay 操作符简介

shareReplay 是 RxJS 中用于共享 Observable 结果的操作符。它会:

  1. 共享订阅:多个订阅者共享同一个源 Observable
  2. 缓存结果:缓存指定数量的最新值
  3. 避免重复执行:源 Observable 只执行一次

基本语法

source$.pipe(
  shareReplay(1) // 缓存最新的 1 个值
)

解决方案:使用 shareReplay 共享 Token 刷新

实现思路

  1. 创建一个 Token 刷新 Observable,使用 shareReplay 共享
  2. 当请求检测到 Token 过期时,订阅共享的 Token 刷新 Observable
  3. Token 刷新完成后,所有等待的请求都使用新的 Token 重试

核心代码

// 当前token(初始为过期token)
private currentToken = 'expired_token';

// token刷新Observable(使用shareReplay确保只有一个请求)
private tokenRefresh$: Observable<TokenResponse> | null = null;

// 刷新token(使用shareReplay确保并发请求时只有一个token刷新请求)
private refreshToken(): Observable<TokenResponse> {
  // 如果已经有正在进行的token刷新请求,直接返回该Observable
  if (this.tokenRefresh$) {
    return this.tokenRefresh$;
  }
  
  // 标记正在刷新token
  this.refreshingToken = true;
  this.cdr.detectChanges();
  
  // 创建新的token刷新请求,使用shareReplay确保多个订阅者共享同一个请求
  this.tokenRefresh$ = this.http.post<TokenResponse>(`${this.apiBaseUrl}/api/refresh-token`, {})
    .pipe(
      shareReplay(1), // 关键:使用shareReplay确保只有一个请求,所有订阅者共享结果
      catchError(error => {
        console.error('Token刷新失败:', error);
        this.tokenRefresh$ = null; // 重置,允许重试
        this.refreshingToken = false;
        this.cdr.detectChanges();
        return throwError(() => error);
      }),
      finalize(() => {
        // 请求完成后重置tokenRefresh$,允许下次刷新
        setTimeout(() => {
          this.tokenRefresh$ = null;
          this.refreshingToken = false;
          this.cdr.detectChanges();
        }, 100);
      })
    );
  
  return this.tokenRefresh$;
}

// 发起带token的请求(自动处理token刷新)
private makeRequestWithToken(apiName: string, apiUrl: string): Observable<ApiResponse> {
  // 先尝试使用当前token请求
  return this.http.get<ApiResponse>(`${this.apiBaseUrl}${apiUrl}`, {
    headers: new HttpHeaders({
      'Authorization': `Bearer ${this.currentToken}`
    })
  }).pipe(
    catchError((error) => {
      // 如果token过期(401错误)
      if (error.status === 401 && error.error?.code === 'TOKEN_EXPIRED') {
        console.log(`${apiName} token过期,等待token刷新...`);
        
        // 刷新token(如果已经有正在进行的刷新请求,会复用)
        return this.refreshToken().pipe(
          switchMap((tokenResponse) => {
            // token刷新成功,更新当前token
            this.currentToken = tokenResponse.data.token;
            this.currentTokenDisplay = tokenResponse.data.token;
            this.cdr.detectChanges();
            console.log(`${apiName} token刷新成功,使用新token重试请求`);
            
            // 使用新token重试请求
            return this.http.get<ApiResponse>(`${this.apiBaseUrl}${apiUrl}`, {
              headers: new HttpHeaders({
                'Authorization': `Bearer ${this.currentToken}`
              })
            });
          }),
          catchError((refreshError) => {
            console.error(`${apiName} token刷新失败:`, refreshError);
            return throwError(() => refreshError);
          })
        );
      }
      
      // 其他错误直接抛出
      return throwError(() => error);
    })
  );
}

关键点解析

1. shareReplay 的共享机制

当多个请求同时检测到 Token 过期时:

  1. 第一个请求:调用 refreshToken(),创建新的 Token 刷新 Observable,并订阅它(发起 HTTP 请求)
  2. 第二个请求:调用 refreshToken(),发现 tokenRefresh$ 已存在,直接返回共享的 Observable,并订阅它
  3. 第三个请求:调用 refreshToken(),发现 tokenRefresh$ 已存在,直接返回共享的 Observable,并订阅它

等待机制

  • 第二个、第三个请求通过订阅同一个 ObservabletokenRefresh$)来实现等待
  • 当它们调用 this.refreshToken().pipe(switchMap(...)) 时,switchMap等待上游 Observable(tokenRefresh$)发出值
  • 由于使用了 shareReplay,多个订阅者会共享同一个底层的 Observable 执行
  • 当第一个订阅者已经发起 HTTP 请求时,后续的订阅者会"加入"这个正在进行的执行
  • 当 HTTP 请求完成并发出 Token 值时,所有订阅者都会同时收到这个值
  • 然后 switchMap 才会执行下游的操作(使用新 token 重试请求)

结果:3 个请求共享同一个 Token 刷新请求,只刷新一次,并且都会等待 Token 刷新完成后才继续。

2. 缓存机制

shareReplay(1) 会缓存最新的 1 个值。这意味着:

  • 如果 Token 刷新已完成,后续订阅者会立即收到缓存的结果
  • 不需要重新发起请求

3. 错误处理

如果 Token 刷新失败:

  1. 重置 tokenRefresh$null,允许重试
  2. 抛出错误,让调用者处理

4. 清理机制

finalize 中重置 tokenRefresh$,确保下次 Token 过期时可以重新刷新。

执行流程示例

假设 3 个请求同时发起,且 Token 都已过期:

  1. 请求 A:检测到 Token 过期

    • 调用 refreshToken(),创建 Token 刷新 Observable(使用 shareReplay
    • 调用 this.refreshToken().pipe(switchMap(...))订阅 tokenRefresh$
    • 由于是第一个订阅者,开始执行底层 Observable,发起 HTTP 请求(Token 刷新)
  2. 请求 B:检测到 Token 过期(在请求 A 的 Token 刷新完成前,例如 1 秒后)

    • 调用 refreshToken(),发现 tokenRefresh$ 已存在,返回同一个 Observable
    • 调用 this.refreshToken().pipe(switchMap(...))订阅同一个 tokenRefresh$
    • 由于使用了 shareReplay加入正在进行的 HTTP 请求执行(不发起新请求)
    • switchMap 等待上游 Observable(tokenRefresh$)发出值
  3. 请求 C:检测到 Token 过期(在请求 A 的 Token 刷新完成前,例如 2 秒后)

    • 调用 refreshToken(),发现 tokenRefresh$ 已存在,返回同一个 Observable
    • 调用 this.refreshToken().pipe(switchMap(...))订阅同一个 tokenRefresh$
    • 由于使用了 shareReplay加入正在进行的 HTTP 请求执行(不发起新请求)
    • switchMap 等待上游 Observable(tokenRefresh$)发出值
  4. Token 刷新完成(例如 3 秒后)

    • HTTP 请求返回新的 Token
    • tokenRefresh$ Observable 发出值(新的 Token)
    • 所有订阅者(请求 A、B、C)同时收到新的 Token
    • 所有请求的 switchMap 同时执行,使用新 Token 重试各自的请求

关键点

  • 第二个、第三个请求通过订阅同一个 Observable 来实现等待
  • switchMap阻塞等待上游 Observable 发出值,然后才执行下游操作
  • shareReplay 确保多个订阅者共享同一个底层的 HTTP 请求执行

结果:只发起 1 次 Token 刷新请求,3 个请求共享结果,并且都会等待 Token 刷新完成后才继续 ✅

与其他方案的对比

方案 1:不使用 shareReplay(有问题)

// ❌ 错误示例:每个请求都独立刷新 Token
private refreshToken(): Observable<TokenResponse> {
  return this.http.post('/api/refresh-token', {}); // 可能发起多次请求
}

方案 2:使用标志位(复杂)

// ⚠️ 可行但复杂:需要手动管理标志位和 Promise
private isRefreshing = false;
private refreshPromise?: Promise<TokenResponse>;

private async refreshToken(): Promise<TokenResponse> {
  if (this.isRefreshing && this.refreshPromise) {
    return this.refreshPromise; // 等待正在进行的刷新
  }
  
  this.isRefreshing = true;
  this.refreshPromise = this.http.post('/api/refresh-token', {}).toPromise();
  // ...
}

方案 3:使用 shareReplay(推荐)✅

// ✅ 推荐:简洁、自动管理
private tokenRefresh$ = this.http.post('/api/refresh-token', {}).pipe(
  shareReplay(1)
);

实际应用场景

1. HTTP 拦截器中的 Token 刷新

// HTTP 拦截器
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
  return next.handle(req).pipe(
    catchError(error => {
      if (error.status === 401) {
        return this.refreshToken().pipe(
          switchMap(token => {
            // 使用新 Token 重试请求
            const cloned = req.clone({
              setHeaders: { Authorization: `Bearer ${token}` }
            });
            return next.handle(cloned);
          })
        );
      }
      return throwError(() => error);
    })
  );
}

2. 多个 API 请求的 Token 刷新

// 同时发起多个请求
forkJoin({
  user: this.getUser(),
  orders: this.getOrders(),
  products: this.getProducts()
}).subscribe(results => {
  // 所有请求共享同一个 Token 刷新请求
});

性能优化建议

1. 添加 Token 过期时间检查

在刷新 Token 前,检查 Token 是否真的过期:

private isTokenExpired(): boolean {
  // 检查 Token 是否过期
  return this.tokenExpiryTime < Date.now();
}

private refreshToken(): Observable<TokenResponse> {
  if (!this.isTokenExpired()) {
    return of({ token: this.currentToken }); // Token 未过期,直接返回
  }
  // ...
}

2. 添加重试机制

对于 Token 刷新失败的情况,可以添加重试:

this.http.post('/api/refresh-token', {}).pipe(
  retry(2), // 失败后重试 2 次
  shareReplay(1)
)

3. 添加超时处理

为 Token 刷新添加超时处理:

this.http.post('/api/refresh-token', {}).pipe(
  timeout(5000), // 5 秒超时
  shareReplay(1)
)

注意事项

  1. 内存泄漏:确保在组件销毁时取消订阅
  2. 错误处理:确保 Token 刷新失败时有适当的错误处理
  3. 并发控制shareReplay 确保只有一个请求,但需要正确管理状态
  4. Token 存储:刷新后的 Token 需要正确存储和更新

总结

使用 shareReplay 实现 Token 刷新的并发控制是一个优雅的解决方案,它通过共享 Observable 来确保:

  • 避免重复请求:多个请求共享同一个 Token 刷新请求
  • 资源节约:减少不必要的网络请求
  • 代码简洁:不需要手动管理标志位和 Promise
  • 自动管理:RxJS 自动处理订阅和取消

记住:当你需要多个订阅者共享同一个 Observable 结果时,使用 shareReplay 是最佳选择。

码云地址:gitee.com/leeyamaster…

❌