普通视图

发现新文章,点击刷新页面。
昨天 — 2026年2月10日首页

全栈进阶-redis最佳入门实战项目篇

2026年2月10日 19:23

一、环境准备

后端采用Python + FastApiRedisMySQL都在云服务器上,通过docker安装的,向外暴露接口实现的远程连接。

服务部署

前面介绍过fastapi的基本语法,但是缺了部署这个环节,在这里补充下。

我的安装环境都是在阿里云的云服务器上,采用的是pm2来管理fastapi项目,首先在centos上安装相关依赖和环境:

# 1. 更新系统
sudo yum update -y

# 2. 安装 EPEL 源(Python 虚拟环境需要)
sudo yum install -y epel-release

# 3. 安装 Python3 和 pip
sudo yum install -y python3 python3-pip python3-venv python3-wheel

# 确认 Python 安装
python3 -V
pip3 -V

# 4. 安装 Node.js (使用官方源安装最新 LTS)
curl -fsSL https://rpm.nodesource.com/setup_lts.x | sudo bash -
sudo yum install -y nodejs

# 确认 Node 和 npm 安装
node -v
npm -v

# 5. 安装 PM2 全局
sudo npm install -g pm2
pm2 -v

准备测试脚本:

首先准备下虚拟环境:

python3 -m venv venv

venv\Scripts\activate

新建main.py

from datetime import datetime

from fastapi import FastAPI, HTTPException, Query
import pymysql


app = FastAPI(title="Simple MySQL Query Service")


def get_connection() -> pymysql.connections.Connection:
    """创建并返回一个 MySQL 连接。"""

    return pymysql.connect(
        host="118.31.xxx",
        port=3307,
        user="root",
        password="xxx",
        database="blog",
        cursorclass=pymysql.cursors.DictCursor,
        charset="utf8mb4",
    )


@app.get("/hello")
async def hello():
    """返回 你好 + 当前时间(年月日 时分秒)。"""

    now = datetime.now()
    current_time = now.strftime("%Y年%m月%d日 %H:%M:%S")
    return {"message": f"你好,当前时间是:{current_time}"}


@app.get("/student_score")
async def get_student_score(number: str = Query(..., description="学生编号")):
    """根据 number 查询 blog 库中 student_score 表的 subject 和 score。"""

    try:
        conn = get_connection()
        with conn:
            with conn.cursor() as cursor:
                sql = (
                    "SELECT subject, score "
                    "FROM student_score "
                    "WHERE number = %s"
                )
                cursor.execute(sql, (number,))
                rows = cursor.fetchall()
    except pymysql.MySQLError as exc:  # 数据库连接或执行出错
        detail = (
            exc.args[1]
            if len(exc.args) > 1
            else str(exc)
        )
        raise HTTPException(status_code=500, detail=f"数据库错误: {detail}") from exc

    if not rows:
        raise HTTPException(status_code=404, detail="未找到该 number 对应的成绩")

    return {"number": number, "results": rows}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

本地运行下python mian.py,然后测试下我们的接口,看来是没问题的。就着手开始部署到线上。

新建Gunicorn配置文件gunicorn_config.py

bind = "0.0.0.0:8011"          # 对外端口
workers = 2                    # worker 数量,看 CPU 调整
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 60
accesslog = "logs/access.log"
errorlog = "logs/error.log"
loglevel = "info"

开始新建项目依赖requirements.txt

fastapi
uvicorn[standard]
pymysql

同时我的nginx的代理转发的配置文件,是这样的

location /fastapi/query/ {
        proxy_pass http://127.0.0.1:8011/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_redirect off;
    }

接下来将项目文件压缩成zip包,拖到服务器的指定目录,运行unzip query.zip解压即可。

然后开始安装依赖:

首先开启虚拟环境:
python3 -m venv venv
source venv/bin/activate

然后安装依赖
pip install -r requirements.txt

这一步可能安装巨慢,建议切换阿里云的下载链接

这里巨坑,折腾了我一个多小时,阿里云的python版本默认是3.6.8,使用Gunicorn需要3.10以上的,一开始安装依赖一直卡住了,也不报错,换了好几个源,还是不行,我以为我服务器带宽小导致的,换成本地安装,然后拖到服务器上,这才发现是版本的原因,然后安装3.11.9后删除虚拟环境,重新安装才成功。

安装成功后,就可以啦。

测试下:运行gunicorn -c gunicorn_config.py main:app,在开启一个服务器连接,

输入:
curl "http://127.0.0.1:8011/hello"
{"message":"你好,当前时间是:2026年02月08日 15:38:53"}[root@iZbp1f9urggte5qz3ri1riZ ~]# 
curl "http://127.0.0.1:8011/student_score?number=20180101"
{"number":"20180101","results":[{"subject":"母猪的产后护理","score":78},{"subject":"论萨达姆的战争准备","score":88}]}[root@iZbp1f9urg

可以看到我们的服务器已经正常运行了,二级域名已经解析了,nginx代理也已经配置了,

输入curl http://api.jinxudong.com/fastapi/query/hello可以看到正常的接口输出的,说明我们的脚本已经部署成功了,但是这种部署方式还是比较原始的,而且后续服务肯定不止这个一个,我打算采用pm2来管理这些服务。

在项目的根目录写上配置文件ecosystem.config.js

module.exports = {
  apps: [
    {
      name: "query",
      script: "bash",
      args: "-c '/var/www/python/query/venv/bin/gunicorn -c gunicorn_config.py main:app'",
      cwd: "/var/www/python/query"
    }
  ]
};

运行脚本pm2 start ecosystem.config.js就可以看到我们的服务了,在设置下开机自启:

pm2 save
pm2 startup

我们的服务已经正常部署到线上啦,可以通过域名去访问

https://api.jinxudong.com/fastapi/query/student_score?number=20180101

{"number":"20180101","results":[{"subject":"母猪的产后护理","score":78},{"subject":"论萨达姆的战争准备","score":88}]}
压测工具介绍

一个系统上线后,是必须要经过压测的,就是模拟大量用户同时访问服务,找到系统的极限QPS,极限并发,找出性能瓶颈,避免线上事故。

接下来介绍两个常见的压测工具:

  1. ab

    这是Apache自带的压测工具,非常的简轻量,很适合小接口测试,做下简单的性能验证。

    首先安装下这个工具

    yum install httpd-tools
    

    使用也很简单

    ab -n 1000 -c 50 https://api.jinxudong.com/fastapi/query/student_score?number=20180101
    

    用50的并发完成1000次请求,看下ab的输出日志

    Server Software:        nginx/1.20.1
    Server Hostname:        api.jinxudong.com
    Server Port:            443
    SSL/TLS Protocol:       TLSv1.2,ECDHE-RSA-AES256-GCM-SHA384,2048,256
    Server Temp Key:        X25519 253 bits
    TLS Server Name:        api.jinxudong.com
    
    Document Path:          /fastapi/query/student_score?number=20180101
    Document Length:        133 bytes
    
    Concurrency Level:      50
    Time taken for tests:   17.659 seconds
    Complete requests:      1000
    Failed requests:        0
    Total transferred:      283000 bytes
    HTML transferred:       133000 bytes
    Requests per second:    56.63 [#/sec] (mean)
    Time per request:       882.956 [ms] (mean)
    Time per request:       17.659 [ms] (mean, across all concurrent requests)
    Transfer rate:          15.65 [Kbytes/sec] received
    
    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:       11   56 119.1     13    1471
    Processing:    32  810 212.6    807    1854
    Waiting:       30  810 212.6    806    1854
    Total:         48  866 258.9    828    2290
    
    Percentage of the requests served within a certain time (ms)
      50%    828
      66%    911
      75%    998
      80%   1041
      90%   1195
      95%   1295
      98%   1559
      99%   1798
     100%   2290 (longest request)
    

    看下几个核心指标

    1. Request per second,也就是常说的QPS,也就是每秒可以处理56个请求

    2. Time per request,平均响应时间

      Time per request:       882.956 [ms] (mean)
      Time per request:       17.659 [ms] (mean, across all concurrent requests)
      

      第一个时间单个请求平均耗时,这是用户真实的体验时间,第二个时间是平均耗时/并发数

    3. 延迟分布

      Percentage of the requests served within a certain time (ms)
        50%    828
        66%    911
        75%    998
        80%   1041
        90%   1195
        95%   1295
        98%   1559
        99%   1798
       100%   2290 (longest request)
      

      第一个数字是百分比,第二个是耗时,换成表格是这样

      百分位 含义
      P50 50%请求 < 828ms
      P90 90%请求 < 1195ms
      P95 95%请求 < 1295ms
      P99 99%请求 < 1798ms
      max 最慢请求 2290ms
    4. 连接时间分析

      Connection Times (ms)
                    min  mean[+/-sd] median   max
      Connect:       11   56 119.1     13    1471
      Processing:    32  810 212.6    807    1854
      Waiting:       30  810 212.6    806    1854
      Total:         48  866 258.9    828    2290
      

      这里的Connect就是TCP握手简历链接的时间,Processing就是后端处理的时间

  2. wrk

wrk目前是后端常用的压测工具,支持多线程和Lua脚本,可以模拟真实并发。

首先在服务器上安装下

#先安装编译工具
sudo yum groupinstall "Development Tools" -y
sudo yum install git -y
#下载
curl -L -o wrk.zip https://codeload.github.com/wg/wrk/zip/refs/heads/master
unzip wrk.zip
cd wrk-master

#编译
make

#复制到全局路径
sudo cp wrk /usr/local/bin/

安装成功以后,直接测试:

wrk -t4 -c100 -d30s --latency https://api.jinxudong.com/fastapi/query/student_score?number=20180101

意思就是四个线程,100的并发,测试30秒

  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.18s    66.72ms   1.42s    96.70%
    Req/Sec    36.35     26.08   100.00     54.00%
  Latency Distribution
     50%    1.18s 
     75%    1.19s 
     90%    1.20s 
     99%    1.23s 
  2483 requests in 30.07s, 698.34KB read
Requests/sec:     82.58
Transfer/sec:     23.22KB

可以看到,Requests/sec就是吞吐量82,每秒可以处理82个请求,平均延迟1.23秒,

压测还是比较简单的,就是利用工具查看qps和延迟时间,当qps不涨,延迟时间飞涨甚至于报错,那说明就到达极限了。

下面就开始实战环节了。

二、信息查询系统

这是一个查询商品信息的系统,就是查询商品详情,为了增加系统的吞吐量,常见的做法就是增加一个缓存层,流程就是:请求接口来了,先去查询redis缓存层,缓存命中就直接返回;如果未命中再去查库。这也是前面介绍的旁路缓存,用内存换取数据库压力。

系统架构图如下

客户端 (浏览器/APP)
        |
        v
    FastAPI 接口层
        |
    -----------------
    |               |
Redis 缓存         MySQL 数据库
环境准备

数据库设计

CREATE TABLE products (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    stock INT NOT NULL,
    description TEXT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

id 是主键

name, price, stock, description 是商品核心信息

updated_at 商品的更新时间,可以用于后面缓存过期策略(缓存失效时刷新)

然后插入一些测试数据

INSERT INTO products (name, price, stock, description)
SELECT
    CONCAT('商品-', t.num),
    ROUND(RAND() * 500 + 10, 2),
    FLOOR(RAND() * 100),
    CONCAT('这是商品 ', t.num, ' 的描述')
FROM (
    SELECT a.n + b.n * 10 + c.n * 100 + 1 AS num
    FROM
        (SELECT 0 n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
         UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) a,
        (SELECT 0 n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
         UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) b,
        (SELECT 0 n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
         UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) c
) t
WHERE t.num <= 1000;

准备工作已经就绪,接下来开始编写代码啦

系统编写

代码也比较简单,就是一个查询接口,和前面是实例基本一致

@app.get("/get_product")
async def get_product(id: int = Query(..., description="商品编号")):
    """根据商品 id 查询商品信息。"""
    try:
        conn = get_connection()
        with conn:
            with conn.cursor() as cursor:
                sql = (
                    "SELECT * "
                    "FROM products "
                    "WHERE id = %s"
                )
                cursor.execute(sql, (id,))
                rows = cursor.fetchall()
    except pymysql.MySQLError as exc:  # 数据库连接或执行出错
        detail = (
            exc.args[1]
            if len(exc.args) > 1
            else str(exc)
        )
        raise HTTPException(status_code=500, detail=f"数据库错误: {detail}") from exc

    if not rows:
        raise HTTPException(status_code=404, detail="未找到该 id 对应的商品信息")

    return {"id": id, "results": rows}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

然后运行python main.pyhttp://localhost:8000/get_product?id=22是可以看到数据返回的。

说明我们的代码是没问题的,接下来就开始部署,然后再服务器上做压测,看下当前的服务器的最大QPS是多大。

部署方案和前面介绍的一致,记得更改nginx配置和pm2的配置文件,通过公网访问下我们的接口

https://api.jinxudong.com/fastapi/queryDetail/get_product?id=122
可以看到如下输出:
{
  "id": 122,
  "results": [
    {
      "id": 122,
      "name": "商品-172",
      "price": 461.79,
      "stock": 72,
      "description": "这是商品 172 的描述",
      "updated_at": "2026-02-10T04:13:17"
    }
  ]
}

说明接口已经部署成功了,下面就开始压测,要知道这个接口的最大QPS,接下来使用ab来开始压测

第一次压测

首先使用20的并发,看下压测报告:

ab -n 1000 -c 20 https://api.jinxudong.com/fastapi/queryDetail/get_product?id=122

压测报告截取:
Requests per second:    54.99 [#/sec] (mean)  
Time per request:       363.704 [ms] (mean)  
50%    307ms
75%    417ms
90%    512ms
99%   1282ms

可以看到QPS54,平均时间363ms,延迟分布中50%的请求小于307ms

第二次压测

使用40的并发,看下压测报告:

ab -n 1000 -c 40 https://api.jinxudong.com/fastapi/queryDetail/get_product?id=122
 
压测报告截取:
Requests per second:    53.87 [#/sec] (mean)
Time per request:       742.486 [ms] (mean)
  50%    712
  66%    769
  75%    826
  80%    850
  90%    926

QPS53,延迟时间和平均请求时间都在增大,有一种步子迈大了的感觉,减少并发试试

第三次压测

使用10的并发,看下压测报告

 ab -n 1000 -c 10 https://api.jinxudong.com/fastapi/queryDetail/get_product?id=122
 
 压测报告截取:
 Requests per second:    55.53 [#/sec] (mean)
Time per request:       180.081 [ms] (mean)
  50%    135
  66%    154
  75%    173
  80%    181
  90%    332

QPS55,平均时间和延迟时间都降低了,但是QPS确实没怎么变化,可以断定当前系统的最大吞吐量就是55,下面增加一个缓存层,然后看下系统的吞吐量是否发生变化

存增加缓存层

首先回顾下旁路缓存,当查询时首先去缓存中查找,如果缓存命中就直接返回;如果未命中就去查询数据库,然后将值写到缓存中,如果数据库也没有,就写一个空值到缓存中,防止穿透。这就是下面要写的代码的逻辑。

直接看下接口代码

@app.get("/get_product")
async def get_product(id: int = Query(..., description="商品编号")):
    """根据商品 id 查询商品信息。"""

    cache_key = _make_product_cache_key(id)

    # 1. 先查 Redis 缓存
    try:
        cached = redis_client.get(cache_key)
        if cached is not None:
            rows = json.loads(cached)
            # 缓存命中空列表,说明数据库也没有,直接返回 404,防止缓存穿透
            if not rows:
                raise HTTPException(status_code=404, detail="未找到该 id 对应的商品信息")
            return {"id": id, "results": rows}
    except redis.RedisError:
        # 缓存异常时,降级为直接查数据库
        pass

    try:
        conn = get_connection()
        with conn:
            with conn.cursor() as cursor:
                sql = (
                    "SELECT * "
                    "FROM products "
                    "WHERE id = %s"
                )
                cursor.execute(sql, (id,))
                rows = cursor.fetchall()
    except pymysql.MySQLError as exc:  # 数据库连接或执行出错
        detail = (
            exc.args[1]
            if len(exc.args) > 1
            else str(exc)
        )
        raise HTTPException(status_code=500, detail=f"数据库错误: {detail}") from exc

    if not rows:
        # 数据库未命中,也写入空列表到缓存,防止穿透
        try:
            redis_client.setex(cache_key, 60, json.dumps([]))
        except redis.RedisError:
            pass
        raise HTTPException(status_code=404, detail="未找到该 id 对应的商品信息")

    # 查询到数据后写入缓存
    try:
        encoded_rows = jsonable_encoder(rows)
        redis_client.setex(cache_key, 300, json.dumps(encoded_rows, ensure_ascii=False))
    except (redis.RedisError, TypeError, ValueError):
        # 缓存写入失败不影响接口主流程
        pass

    return {"id": id, "results": rows}

首先在接口请求时,一开始就构建了缓存的key,_make_product_cache_key的方法是这样的

def _make_product_cache_key(product_id: int) -> str:
    """生成商品缓存 key。"""

    return f"product:{product_id}"

构建的key就是product:{product_id},然后就先查询缓存,如果缓存命中就直接返回,未命中就查库;命中时也有个非空判断,如果是写入的空数组,表明是数据库也没有的,直接返回404,防止穿透。查询数据库就是和前面一样的逻辑了,加了个写入缓存的逻辑:redis_client.setex(cache_key, 300, json.dumps(encoded_rows, ensure_ascii=False))。当数据库也没有时,就写个空,防止穿透:redis_client.setex(cache_key, 60, json.dumps([])),然后返回错误。

部署到线上后,https://api.jinxudong.com/fastapi/queryDetail/get_product?id=5接口有正常的返回,看下redis数据库,也有相关的product:5作为key,表明我们的更改是成功的。

要想证明我们的更改效果,就需要做压测了,

第一次压测

直接使用40的并发

ab -n 1000 -c 40 https://api.jinxudong.com/fastapi/queryDetail/get_product?id=122

Requests per second: 73.80 [#/sec] (mean) 
Time per request: 542.007 [ms] (mean)

我以为这个QPS会变得非常大,才是73,而没加redis是53,我查看了redis数据库,缓存是存在的,虽然说QPS增加了,但是并不是很明显,看来影响系统的吞吐量的并不只是查询数据库,还有TCP连接,因为这个接口多了一层nginx转发,

第二次压测

这次不走nginx转发了,直接走系统的服务

ab -n 1000 -c 40 http://127.0.0.1:8012/get_product?id=122

Requests per second:    365.20 [#/sec] (mean)
Time per request:       109.528 [ms] (mean)

QPS一下子就起来了,而且平均响应时间也降低了。

两次测试结果有点出乎意料,都说redis毫秒级别的响应,但是QPS并没有指数级别的上升,看来对于小型应用,提升QPS来说,redis的作用其实并没有那么大,我的服务器宽带只有2M,费这么大劲,还不如去增加点带宽效果来的明显。

昨天以前首页

全栈进阶-redis入门实战概念篇

2026年2月5日 20:00

第一阶段:redis基础

1. 简介

Redis是一款开源的、基于内存的键值对数据库,支持将内存持久化到磁盘,还提供了丰富的数据结构、事务、发布订阅等功能,被广泛的用于缓存、消息队列、会话存储等场景。

作为一个前端开发,对于Redis第一影响就是读写操作非常的快,常用于一些需要快速读写数据的场景,比如存储会话sessionRedis之所以这么快,在于Redis利用了内存操作、IO多路复用、避免线程切换开销三大核心优势,让单线程也足以支撑超高并发。

Redis并非纯单线程,只是在接受客户请求的核心处理流程是单线程的,在处理慢操作,比如持久化读写磁盘、异步删除大键、主从复制的网络同步,会启动多个辅助线程。为啥当初Redis不是设计成多线程呢,主要是单线程设计简单,核心逻辑都是串行执行的,后续的维护成本极低,同时也避免了多线程的死锁啊,数据一致性啊这些麻烦的问题;内存的操作足够快,多线程必然会涉及到线程切换和锁竞争,这些都会降低效率;IO的多路复用,Redis运行在网络层,使用的基于Unix系统的IO多路复用机制,就是主线程通过事件循环来监听所有客户端的IO操作,维护一个事件队列去处理IO操作,这种单线程非阻塞的IO多路复用让Redis可以同时管理上万的TCP连接。

2.redis数据基本结构

Redis基本数据结构主要五个,接下来挨个介绍下

首先安装下环境:

pip install redis

先创建一个虚拟环境,然后安装相应的依赖

import redis

# 连接到你的线上 Redis
r = redis.Redis.from_url(
    "redis://:xxx",
    decode_responses=True,  # 返回 str 而不是 bytes
)

# 设置一个 study:string 的 key
r.set("study:string", "hello redis from python")

# 读出来验证一下
value = r.get("study:string")
print("study:string =", value)
2.1 String

StringRedis最基础、最常用的数据结构,所有的键值对的value本质上都可以使用String来存储,单key最大容量512M,所有的操作都是原子性的,支持位运算和过期策略。

比如前面的案例r.set("study:string", "hello redis from python"),就是设置一个字符串

如果要设置一个过期时间的话,也比较简单

r.set("study:string", "hello redis from python", ex=60),这里的ex就是秒数,如果是px就是毫秒数,这里设置的时间就表明key的过期时间,如果过期了key就会被删除。

2.1 Hash

Hash是一个键对应多个键值对的结构,类似于Map和字典,一般用来存储结构化的对象。

r.hset("study:hash", mapping={
    "name": "张三",
    "age": "20",
})

data = r.hgetall("study:hash")
print(data)  # {'name': '张三', 'age': '20'}

如果要删除hash中的指定字段的话,可以使用这个方法hdel

r.hdel("study:hash", "age")
2.3 List

这里的List是按照插入顺序排序的字符串集合,支持两端搞笑增删,中间查询稍慢,是一个双向链表。

# 从右侧依次塞入几个元素
r.rpush("study:list", "apple", "banana", "orange")

# 从左侧再塞一个
r.lpush("study:list", "pear")   # list 现在是: ["pear", "apple", "banana", "orange"]

# 读取整个 list(0 到 -1 表示所有元素)
items = r.lrange("study:list", 0, -1)
print("study:list =", items)

# 弹出一个元素(比如从左边弹)
left = r.lpop("study:list")
print("lpop 之后取出的 =", left)
print("剩下的 =", r.lrange("study:list", 0, -1))

可以用来存储一些任务队列和消息队列

2.4 Set

Set 是无序、元素唯一的字符串集合,支持集合间的交、并、差运算,适合处理 “去重” 和 “关系匹配” 场景。

# 往 set 里加元素(去重)
r.sadd("study:set", "apple", "banana", "orange")
r.sadd("study:set", "banana")  # 再加一次不会重复

# 查看所有成员
members = r.smembers("study:set")
print("study:set =", members)

# 判断某个值是否在 set 中
print("是否包含 apple:", r.sismember("study:set", "apple"))

# 删除一个成员
r.srem("study:set", "banana")
print("删除 banana 后 =", r.smembers("study:set"))

# 给整个 set 设置过期时间 60 秒
r.expire("study:set", 60)
2.5 ZSet

ZSet 是 Set 的升级版,每个元素关联一个 “分数(score)”,Redis 会按分数从小到大排序,兼具 唯一性 和 有序性。

# 往 zset 里加数据:成员 + 分数
r.zadd("study:zset", {
    "Alice": 100,
    "Bob": 80,
    "Charlie": 95,
})

# 按分数从小到大取出所有成员
print("从小到大:", r.zrange("study:zset", 0, -1, withscores=True))

# 按分数从大到小取出前 2 名
print("从大到小前2名:", r.zrevrange("study:zset", 0, 1, withscores=True))

# 给某个人加分(比如 Alice +10)
r.zincrby("study:zset", 10, "Alice")
print("Alice 加分后:", r.zrevrange("study:zset", 0, -1, withscores=True))

# 删除一个成员
r.zrem("study:zset", "Bob")
print("删除 Bob 后:", r.zrange("study:zset", 0, -1, withscores=True))

有一个常见的面试题,HashString都可以用来存储对象,一般用那个来存储对象呢,使用String来存储对象,简单直观,但是它不支持局部更新,改一个字段需要覆盖这个字符串,适合一些整体读写、字段少的场景;Hash存储对象,他就支持局部更新,适合一些复杂对象的存储,比如高频更新字段。

3. redis基本命令

因为Redis都是键值对的存储,所以他的方法也很简单,看下面这个例子:

# 1. SET:设置一个字符串 key
r.set("study:string", "hello")

# 2. GET:读取这个 key
print("GET study:string =", r.get("study:string"))  # hello

# 3. INCR:自增一个数值型 key
# 如果这个 key 不存在,会从 0 开始加 1,变成 "1"
r.delete("study:count")  # 为了方便测试,先删掉
r.incr("study:count")    # 当前值 1
r.incr("study:count")    # 当前值 2
r.incr("study:count", 5) # 加 5 -> 当前值 7
print("study:count =", r.get("study:count"))  # 7

# 4. EXPIRE:给 key 设置过期时间(单位:秒)
r.expire("study:count", 5)  # 5 秒后过期

读、写、自增和设置过期时间,都比较简单。

因为Redis都是键值对,没有表的概念,所以Key管理就成了问题,社区有一个约定的规范:业务标识:模块名称:唯一标识[:子字段],比如ecom:user:1001:name,这就是电商业务:用户模块:用户id:用户名。还有一些额外的补充规范:

  1. 统一小写:避免大小写混乱,User:1user:1是两个key
  2. 简洁且语义化:看到名称基本就能了解存储的内容
  3. 避免特殊字符:比如空格、换行符、下划线

第二阶段:redis核心机制

4.redis内存模型

Redis是一个内存数据库,大多数数据都保存才内存中。它的内存可以分为两大部分,核心内存和辅助内存。其中核心内存存储的就是我们常用的键值对,也就是key内存和value内存,存储的都是我们所用到的数据;辅助内存放的都是非业务数据,就是Redis运行所需的额外内存,比如一些过期字典、进程本身的开销等。

Redis有一套完善的内存管理机制,主要有这么几步

  1. 基于jemalloc内存分配,将内存划分为不同大小的内存页,比如8B,16B,32B等,分配时匹配最接近的页,减少碎片;线程缓存,减少锁竞争,提升分配效率
  2. 内存回收:内存回收主要有两种,惰性删除和定期删除。惰性删除指的是访问key时检查是否过期,过期了就删除;定期删除,每100ms随机抽查过期的key,去删除已经过期的,但是这里有个问题,如果key过期了,但是没有被抽查到呢,为啥不扫描全量的key呢,这就是一个平衡了,全量扫描需要占用大量的CPU,会影响到业务的,这个就叫做延迟回收,也就是说可能一时半会回收不了,但是终归会被回收。

Redis的内存处理机制天然就有一种滞后性,可能就会出现内存满了的情况,这里的内存满了,并不是设置某个key的value大小超过512M,而是Redis进程占用的内存满了,这里的满有两个意思:主动设置的maxmemory,这个在生产环境上是必须要设置的

maxmemory 4GB  # 限制 Redis 最大使用内存为 4GB

还有一种满就是,如果不设置这个最大值,Redis就会无限制的占用服务器的物理内存,直到耗尽服务器所有可用的物理内存,这个时候操作系统会将Redis的内存数据交换到磁盘的swap分区,这个是磁盘模拟的内存,速度巨慢,最终导致Redis性能暴跌,也可能因为服务器内存耗尽而被系统杀死。

当内存满了后,Redis也有一套内存淘汰策略来处理这种情况,当Redis占用的内存超过设置的maxmemory后,然后再去执行写操作,就会去触发我们的内存淘汰策略,主要有这么几种策略:

  1. LRU

    最近最少使用,就是淘汰那些最近访问次数最少的key,标准的LRU需要维护一个访问时间链表,内存和cpu开销大,Redis实现的是近似LRU:维护一个候选池,触发淘汰时,从目标范围随机抽取key,也就是触发淘汰时,随机抽取一批key,然后比一比谁的访问时间最远,然后就淘汰它。

  2. LFU

    优先淘汰访问频率最低的key,Redis实现的LFU并不是简单的访问次数统计,而是通过概率递增的访问计数+时间衰减机制来近似的反应key的长期访问价值;在触发淘汰时,在通过随机采样的方式选择访问评率最低的key去淘汰。

当内存满了后,再去对数据库做读写操作,读的操作没有影响,但是在触发写的操作时,如果内存满了,会先根据maxmemory-policy设置的内存淘汰策略,在写操作触发的同时根据LFU、LRU去更新内存,直到内存会到安全区;如果内存淘汰策略味为noeviction或者无法淘汰,直接回报错。

5.持久化机制

前面也提到了,Redis是一种基于内存的键值数据库,内存的特性就是在服务器重启后会全部丢失,这就需要将数据做持久化,即使服务器重启了,也可以从磁盘中恢复数据至内存。

Redis提供了两种核心的持久化方式:RDB和AOF,快照持久化和追加文件持久化,接下来挨个介绍下:

快照持久化

RDB是定时对Redis内存中的全量数据做一次拍照,生成一个压缩的二进制文件,比如dump.rdb,保存到磁盘的指定目录。Redis重启时直接加载这个二进制文件,将数据恢复到内存中。

RDB有手动触发和自动触发两种方式:手动触发可以使用save来同步触发,同步触发会阻塞主进程,直到RDB文件生成完成,异步触发通过bgsave来触发,Redis会fork一个子进程来执行RDB文件的生成,主进程会继续处理客户端的请求;自动触发是在配置文件redis.conf中通过快照规则来配置的,满足条件就会自动执行bgsave

save 900 1      # 900 秒内至少 1 次写
save 300 10     # 300 秒内至少 10 次写
save 60 10000   # 60 秒内至少 10000 次写

这里就是自动触发RDB的规则:满足其中的任意条件就会触发一次,比如60s内写一次、300s内写10次等。

RDB优点就在于性能开销小,生成RDB由子进程负责,主进程仅做fork操作,几乎不影响业务;二进制文件直接加载到内存速度也很快。但是缺点也很明显,RDB是定时快照,如果Redis意外崩溃,比如服务器断电,就会丢掉最后一次快照前到崩溃前的所有数据。

追加日志持久化

AOF就是为了解决RDB数据库丢失而设计的持久化方式,就是将Redis的操作日志按照顺序记录下来,重启后通过重放AOF文件中所有的写命令去恢复内存数据。默认是关闭的,需要appendonly yes命令来手动重启。

AOF的相关配置在redis.conf文件中

appendonly yes # 开启AOF(默认no,关闭)
appendfilename "appendonly.aof" # AOF文件名,默认保存在Redis工作目录
dir ./ # 持久化文件(RDB/AOF)的保存目录,默认是Redis启动目录

AOF主要分为三个步骤:

  1. 命令追加

    Redis执行完一个写命令后,会将该命令按照协议追加到内存中的AOF缓冲区,避免直接写入磁盘,减少IO开销

  2. 文件写入

    Redis会定期将AOF缓冲区的数据写入到内核页缓存,这个操作是调用操作系统的write方法,属于异步操作,不会阻塞主线程。

  3. 文件同步

    将内核页缓存中的AOF数据写到测盘中,这个是调用的操作系统的同步方法,会阻塞主线程的,直到刷盘完成。

    将AOF缓冲区中的命令刷到磁盘的AOF文件中,有三种策略:

    # appendfsync 有三个取值:
    appendfsync always  # 每次写命令都立即刷盘(同步),数据最安全,性能最差
    appendfsync everysec# 每秒刷盘一次(默认值),平衡数据安全和性能
    appendfsync no      # 由操作系统决定何时刷盘,性能最好,数据丢失风险最高
    

由于AOF是日志追加的形式,会产生大量的中间态,比如set key 1set key 2set key 3 ,这种中间态其实是没有意义的,还会导致AOF文件变得很大,这就需要AOF重写机制了,重写就是遍历内存中的所有的数据,根据当前的键值对生成一套最简的写命令集来替换原有的AOF文件,重写的触发也分为手动触发和自动触发:手动触发需要执行bgrewriteaof命令;自动触发是通过配置文件,当文件的体积增长到达阙值时,自动触发`bgrewriteaof

auto-aof-rewrite-min-size 64mb  # AOF文件的最小体积,低于这个值不触发重写(默认64mb)
auto-aof-rewrite-percentage 100 # 重写触发的百分比,指当前AOF文件体积比上一次重写后的体积增长了多少(默认100%)

AOF的优点就是,可以通过刷盘策略来控制数据丢失的风险,默认的everysec仅丢失1s的数据,alway几乎无丢失;缺点就是AOF文件体积较大,恢复数据时加载较慢。

混合持久化

RDB和AOF单独使用都各有优缺点,在Redis 4.0之后,引入了混合持久化机制,融合恶RDB和AOF的优点,成为了目前生产环境的首选方案。

redis.conf配置文件中开启混合持久化:

aof-use-rdb-preamble yes  # 开启混合持久化(Redis 4.0+,默认no;Redis 6.0+ 部分版本默认yes)

开启后,AOF文件就不再是纯文本了,头部就成了RDB格式的全量数据快照,也就是二进制文件,尾部是AOF格式写的增量命令,记录从生成RDB快照到当前的所有写命令,是纯文本。

其工作流程主要有这么几个步骤:

当AOF触发重写时,

  1. redis主进程进入fork子进程,执行AOF重写
  2. 子进程首先将内存中的全量数据以RDB格式写入到临时的AOF文件头部
  3. 子进程完成RDB写入后,主进程将AOF重写缓冲区中所有的增量写命令,以AOF格式写入到临时的AOF文件尾部
  4. 主进程用临时AOF文件替换掉旧的AOF文件,完成混合持久化的重写。

混合持久化的优点就在于加载速度快,数据丢失风险小,而且文件的体积也不会很大。

下面推荐一个常见的生产环境的配置,开启混合持久化+RBD默认自动快照:

# ===================== RDB 核心配置 =====================
save 900 1
save 300 10
save 60 10000
rdbcompression yes  # 开启RDB压缩
dbfilename dump.rdb # RDB文件名
dir ./              # 持久化文件存储目录(建议修改为独立的磁盘目录)

# ===================== AOF 核心配置 =====================
appendonly yes      # 开启AOF(混合持久化的前提)
appendfilename "appendonly.aof" # AOF文件名
appendfsync everysec # 刷盘策略,生产首选
auto-aof-rewrite-min-size 64mb # AOF重写最小体积
auto-aof-rewrite-percentage 100 # AOF重写增长百分比
aof-use-rdb-preamble yes # 开启混合持久化(Redis 4.0+)
aof-load-truncated yes # 加载AOF时,若尾部损坏则忽略,继续加载(默认yes)

6. redis事务

Redis事务就是提供一种机制,将多个Redis命令打包成一个执行单元,保证这个单元内的命令会按照顺序、无中断的执行,同时支持对命令执行结果的统一处理,解决多命令批量执行的原子性需求。

Redis事务只依赖五个命令:

  1. MULTI 标记事务开始,后续所有的命令都会加入到事务队列中
  2. EXEC 执行事务队列中的所有命令,执行完成后结束事务,返回所有命令的执行结果
  3. DISCARD 放弃事务队列中的所有命令,清空队列结束事务,回到正常的执行模式
  4. WATCH KEY 对key加乐观锁,监控key是否修改,必须在MULTI之前修改
  5. UNWATCH 取消所有被watch监控的key,事务取消或者执行后会自动执行

看下这个最基础的实务流程:

MULTI
SET balance 100
INCR balance
EXEC

执行到MULTI时,会进入事务状态,后续的SETINCR会被放入到一个事务队列中,直行到EXEC时才会执行队列中的所有的命令。

传统的关系型数据库事务严格遵循ACID原则,原子性、一致性、隔离性和持久性,但是Redis事务为了极致的性能,并不是完全遵循ACID原则。接下来介绍下他的区别:

  1. 原子性

    原子性的定义就是事务中的所有的操作,要么全部执行,要么全部不执行,不会出现部分执行的情况,而Redis事务的原子性分为两种情况:

    1) 事务入队前出错,全不执行:当在MULTI后,EXEC前出现语法错误,Redis会立即返回错误,执行EXEC时会直接放弃整个事务

    2) 执行事务中出现错误,部分执行,没有回滚,命令入队时只会做语法检查,不会做逻辑检查,执行时如果出现了运行错误,Redis就会跳过这个命令,继续执行后续的命令,而且不会对已经执行的命令做回滚

    不支持回滚主要也是从性能考量,实现回滚需要记录每个命令的逆操作,比如SET的操作就是恢复原值,这个会增加Redis内核的复杂度,牺牲执行的性能。

  2. 一致性

    一致性就是事务执行的前后,数据库的状态始终保持合法,不会因为事务的执行而出现脏数据。Redis事务可以在所有的异常情况下,比如入队错误、执行错误、宕机,都可以保证数据的一致性。

  3. 隔离性

    隔离就是在多个事务并发执行时,一个事务的执行不会被其他的事务干扰,各个事务之间相互隔离。Redis是单线程处理客户端请求的,这就会导致事务的执行会按照队列中的顺序连续执行,不会被其他的命令打断

  4. 持久性

    持久性是指事务执行成功后,对数据的修改会被永久的保存到磁盘中,不会应为宕机而丢失。Redis事务本身并不保证持久性,持久性是由Redis的持久化机制来实现的,前面也介绍过

接下来写一个小的demo,利用watch来控制库存防止超卖

def try_purchase(stock_key: str, user: str, qty: int = 1) -> bool:
    """使用 WATCH + 事务进行扣库存,避免超卖。

    乐观锁思路:
    1. WATCH 库存 key,监听是否被别人改动;
    2. 读当前库存,判断是否足够;
    3. 使用 MULTI 开启事务,扣减库存;
    4. EXEC 提交,如果在这期间库存被别人改了,EXEC 会失败(抛 WatchError),然后重试。
    """

    with r.pipeline() as pipe:
        while True:
            try:
                # 1. 监听库存 key
                pipe.watch(stock_key)

                # 2. 读取当前库存
                current = pipe.get(stock_key)
                if current is None:
                    print(f"{user}: 商品不存在")
                    pipe.unwatch()
                    return False

                current = int(current)
                if current < qty:
                    print(f"{user}: 库存不足,当前库存={current}")
                    pipe.unwatch()
                    return False

                # 3. 开启事务,扣减库存
                pipe.multi()
                pipe.decrby(stock_key, qty)

                # 4. 提交事务
                pipe.execute()
                print(f"{user}: 抢购成功,扣减 {qty},扣减前库存={current}")
                return True

            except redis.WatchError:
                # 在 WATCH 之后、EXEC 之前,有其他客户端修改了 stock_key,
                # 这次事务会失败,需要重试。
                print(f"{user}: 检测到并发冲突,重试中...")
                continue

第三阶段:高并发&分布式

7. 缓存模式与一致性

Redis作为缓存的核心亮点就在于其高速的读写操作,来降低传统数据库的压力,基于此Redis推出了有大概四种主流的缓存策略,来将缓存融入业务读写流程,同时保证缓存与数据库的一致性。接下来挨个介绍下:

  1. 缓存穿透模式

    缓存和数据库分离,业务代码主动管理缓存和数据库的交互。在读操作时,先查询缓存,如果命中就直接返回,如果没有就查询数据库,同时将数据库的结果写入缓存;在写操作时,先更新数据库,在删除缓存。

    这种模式适合绝大多数的生产场景,是Redis作为缓存的首选模式。优点就是简单易实现,缺点就是需要额外处理缓存穿透、击穿雪崩等场景。

  2. 读写穿透

    业务代码只和缓存交互,不直接操作数据库,缓存作为中间层,主动管理数据库的读写。在读操作时,先查询缓存,如果命中就直接返回,未命中就查询数据库,将结果写入缓存,然后返回;写操作就更新缓存,然后再去更新数据库。

    这种模式的特点就是业务代码只专注于业务,数据库由缓存层来处理,简化了业务代码逻辑,缺点就是缓存层需要额外的代码开发,而且不支持新增数据,因为新增数据要先执行读操作,才能存入缓存,不太符合常规的业务逻辑。

  3. 写穿模式

    这种模式是读写穿模式的增强版,支持新增、更新数据。在读操作时,和读写穿透模式一样,命中返回,未命中查库更新缓存;写操作时和新增数据时,缓存同步更新数据库,然后返回给业务。

    这种模式的特点在于写操作时,缓存和数据库同步更新,缓存和数据库有着非常强的一致性,常用于支付业务的核心数据缓存。

  4. 写回模式

    这种模式是写穿模式的异步版,差异就在与写操作时是异步的。在读操作时,和读写穿透、写穿模式一样,命中就返回,未命中查库更新缓存后返回;在新增和更新的写操作时,缓存立即返回,然后异步去更新数据。

    这种模式适合并发高、一致性要求较低的场景,比如日志缓存等。

生产模式中比较常用和推荐的,就是缓存穿透模式,然后这种模式在一致性的问题上需要额外处理。比如有这么几个场景:

  1. 在写操作时,正常的流程是,更新数据库,然后删除缓存,更新缓存需要下次读的时候去查库更新。但是如果更新数据库后,遇到宕机或者网络异常,就会导致缓存未及时删除,这就出现了脏数据,知道缓存过期。这也是最常见的不一致场景,属于操作中断导致的缓存未更新。
  2. 在并发的场景下,客户端A和客户端B同时分别执行读写操作,A读操作时,缓存未命中就会去查库,B写操作时更新完数据库后,回去删除缓存,这时如果读操作的写缓存的动作晚于写操作的删除动作,就会产生数据库与缓存的不一致场景,数据库是新的数据,缓存中是旧的脏数据。

在数据一致性上有这样一个原则,最终数据一致性即可,而非强制一致性。Redis作为缓存,是没有办法实现和数据库的强一致性。因为缓存和数据库属于两个独立的存储系统,非要强一致性就需要加锁,这就会牺牲Redis的高性能,而且在实际业务中,带短暂的不一致,对于用户来说并没有感知的。

在缓存穿透模式中,有这么几个方案可以解决缓存与数据库的一致性问题

  1. 单实例低并发场景,在一些后台管理,小流量业务汇中,可以直接使用缓存穿透模式的基础逻辑,即读操作时先缓存后数据库,如果不存在就写一个缓存空值,加上过期时间;在写操作时,先更新数据库,在删除缓存,给所有的缓存加上过期时间。

    这里设置缓存空值,就是为了防止缓存杀手-穿透,比如查询一个数据库没有的值,这就会直接访问数据库,万一遇到恶意访问的脚本,就会导致数据库压力;而设置一个空值,在过期时间内,他是一个有效的缓存,虽然没有值,但减轻了数据库的压力,算是为了系统的稳定性做了一次兜底。

  2. 单实例高并发,在电商的商品详情、商品秒杀库存业务中,在基础方案上增加延迟删除缓存,来解决读操作写缓存晚于写操作删缓存的问题。 流程就是在写操作执行更新数据库后,延迟N毫秒删除缓存,让读操作查库、写缓存的动作先完成。

还有一个常见的八股文:缓存的三大杀手,穿透、击穿和雪崩。

  1. 穿透,在前面的穿透模式时介绍过了,就是查询一些数据库中不存在的值时,每次都会去查询数据库,导致缓存失效,数据库增加额外的压力。解决方案有下面几种:

    1)设置空值,前面也介绍过,这是最简通用的方案

    2)布隆过滤器,提前将数据库中所有的合法key存入过滤器,请求先过过滤器,判定不存在就会直接拒绝,就走不到缓存和数据库了

    3)IP/接口 限流熔断,对于穿透请求高频的IP限流,对查询接口做熔断保护。这里的限流就是限制单位时间内允许请求的数量,比如单位时间内某个IP大量请求不存在的ID,加了限流之后直接回报错429错误码,就走不到缓存、数据库了;熔断就是当下游服务器持续失败或者过慢时,暂时切断请求,防止雪崩扩散。

  2. 击穿,某个极高的热点key,恰好过期或者被删除,此时大量并发请求同时访问该key,全部缓存未命中,所有的请求都会访问到数据库。这里的解决方案有:互斥锁串行重建缓存,热点key永不过期,热点key主动更新

  3. 雪崩,大量缓存key在同一时间集体过期,或者Redis缓存集体宕机,导致请求绕过缓存直接访问数据库,导致数据库负载瞬间爆表,引发整个服务链路雪崩。解决方案有:过期时间随机化,Redis集群高可用,服务限流、熔断、降级。

8. 分布式锁

在分布式、微服务架构中,一个服务会运行在多台机器上,这就是多进程的概念,多台机器会共享一个资源,这个时候python的线程锁就会失效,因为这些锁的作用范围是当前进程的内存,只能管自己进程内的线程。这时就需要分布式锁了,分布式锁就是跨进程、跨服务的锁,要保证多个进程对共享资源的互斥访问。

分布式锁有四个核心的特性:

  1. 互斥性,同一时间只有一个客户端持有锁,其他的客户端必须等待
  2. 安全性,锁只能被其持有者释放,不能被其他客户端误删
  3. 避免死锁,即使持有锁的客户端崩溃、中断,也可以在一定时间后自动释放
  4. 可用性,Redis集群环境下,锁服务不能单点故障,要保证大部分节点可用

Redis做分布式锁的优点,就在于其性能极高,获取、释放锁都是毫秒级,而且部署也比较简单。

接下来介绍下Redis单节点分布式锁的几个命令:

  1. key 锁的唯一标识,比如要给ID=111的资源加锁
  2. value 客户端的唯一标识,保证锁只能被持有者释放
  3. NX 全程Not Exist,只有当key不存在时才会设置成功
  4. EX/PX 设置的过期时间,EX单位是秒,PX单位是毫秒
  5. timeout 锁的过期时间,避免死锁

比如这行代码:

SET lock:order:123 uuid:192.168.1.100 NX EX 30

就表明给资源key为lock:order:123的资源加锁,锁的持有者是uuid:192.168.1.100,30秒后过期

而释放锁,就回到刚刚的安全性了,必须只有锁的持有客户端才可以释放。流程就是先判断加锁的客户端是不是自己,如果是才可以去释放,看下相关的Iua脚本:

if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])  -- 标识匹配,删除锁
else
    return 0  -- 标识不匹配,不做任何操作
end

之前写过一个卖票的函数,就是使用Redis的分布式锁来控制库存,防止超卖:

async def sell_one_with_lock(r: Redis, window_name: str) -> bool:
    """使用 Redis 分布式锁保护“检查+扣减”关键区,成功卖出返回 True,售罄或失败返回 False。"""
    lock = r.lock(LOCK_KEY, timeout=5, blocking_timeout=1)  # 超时时间与获取等待时间可调
    acquired = await lock.acquire(blocking=True)
    if not acquired:
        # 未拿到锁,视为本次卖票失败(可重试)
        return False
    try:
        # 关键区:读取剩余、判定、扣减
        remaining_str = await r.get(TICKET_KEY)
        remaining = int(remaining_str) if remaining_str is not None else 0
        if remaining <= 0:
            return False
        # 扣减一张(原子自减命令)
        await r.decr(TICKET_KEY)
        return True
    finally:
        try:
            await lock.release()
        except Exception:
            # 若锁已过期或其他异常,忽略释放错误
            pass

代码第一行就创建了一个分布式锁,传入了一个过期时间防止死锁,lock.acquire是真正的加锁步骤。之前看到这里有个疑惑:每次调用这个方法,都会创建一个分布式锁,如何保证对一个资源加锁,在创建锁的时候传入了LOCK_KEY,这个就是要加锁的key,也就是要加锁的资源,这个方法每次执行都会创建一个锁,但是lock.acquire在资源没有释放的时候,返回的是false,也就是会走到if not这里的。

其实后续章节还有单点故障,主从、哨兵、 Redlock、 扩容、数据分片等,觉得分布式、缓存还需要消化下,后面就不在深入了,打算进入实战环节了,后续打算设计三个实战项目来进一步深入的学习下。

三个实战项目分别是

  1. 信息查询系统,使用MySQL存储用户信息,Redis作为缓存,巩固下前面学习的缓存模式,同时加上压测环节,通过QPS,响应时间更加直观的了解缓存的意义
  2. 抢票系统,学习下Lua脚本,
  3. 秒杀系统,学习下高并发处理、限流、防超卖策略
❌
❌