
Redis 8.0 已经发布,带来了诸多性能改进和新特性。在实际生产环境中,Redis 远不止是一个简单的键值缓存——它是消息队列、分布式协调器、实时数据处理引擎。本文将深入探讨 Redis 在四大核心场景下的高级用法:Stream 消息队列、Redlock 分布式锁、缓存三大问题的解决方案,以及 Redis Module 生态。
Redis 5.0 引入的 Stream 数据类型,在 8.0 中得到了进一步增强。相比传统的 Pub/Sub 模式,Stream 提供了消息持久化、消费者组(Consumer Group)和消息确认(ACK)机制,使其成为构建轻量级消息队列的理想选择。
Stream 的设计借鉴了 Kafka 的消费者组模型:
import redis
import json
import time
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ========== 生产者 ==========
class OrderProducer:
def __init__(self, stream_key='order:stream'):
self.stream_key = stream_key
def send_order(self, order_id: str, user_id: str, amount: float):
"""发送订单消息到 Stream"""
message = {
'order_id': order_id,
'user_id': user_id,
'amount': str(amount),
'timestamp': str(int(time.time() * 1000))
}
# MAXLEN 10000 限制 Stream 最大长度,防止内存溢出
result = r.xadd(
self.stream_key,
message,
maxlen=10000,
approximate=True # 近似裁剪,性能更好
)
print(f"[Producer] 订单 {order_id} 已发送, msg_id={result}")
return result
# ========== 消费者组 ==========
class OrderConsumer:
def __init__(self, stream_key='order:stream', group='order-processors', consumer_name=None):
self.stream_key = stream_key
self.group = group
self.consumer = consumer_name or f'worker-{threading.current_thread().ident}'
# 创建消费者组(如果不存在),$ 表示从最新消息开始
try:
r.xgroup_create(stream_key, group, id='0', mkstream=True)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
def process_orders(self, batch_size=10, block_ms=5000):
"""消费订单消息"""
while True:
# 读取未处理的消息,> 表示只读取新消息
messages = r.xreadgroup(
groupname=self.group,
consumername=self.consumer,
streams={self.stream_key: '>'},
count=batch_size,
block=block_ms
)
if not messages:
continue
for stream, msg_list in messages:
for msg_id, fields in msg_list:
try:
self._handle_message(msg_id, fields)
# 确认消息处理完成
r.xack(stream, self.group, msg_id)
except Exception as e:
print(f"[Consumer] 处理消息 {msg_id} 失败: {e}")
# 处理失败的消息会留在 PEL 中,等待重试
def _handle_message(self, msg_id, fields):
"""业务处理逻辑"""
order_id = fields['order_id']
amount = float(fields['amount'])
print(f"[Consumer] 处理订单: {order_id}, 金额: {amount}")
def claim_stale_messages(self, min_idle_ms=60000):
"""认领超时未处理的消息(实现死信队列逻辑)"""
pending = r.xpending_range(
self.stream_key, self.group,
min='-', max='+', count=100
)
for entry in pending:
if entry['idle'] > min_idle_ms:
# 将超时消息转移给当前消费者
r.xclaim(
self.stream_key, self.group, self.consumer,
min_idle_ms, entry['message_id']
)
print(f"[Consumer] 认领超时消息: {entry['message_id']}")
# 使用示例
if __name__ == '__main__':
producer = OrderProducer()
producer.send_order('ORD-001', 'USER-100', 299.99)
producer.send_order('ORD-002', 'USER-200', 159.50)
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
type OrderConsumer struct {
client *redis.Client
stream string
group string
consumer string
}
func NewOrderConsumer(addr string) *OrderConsumer {
client := redis.NewClient(&redis.Options{Addr: addr})
return &OrderConsumer{
client: client,
stream: "order:stream",
group: "order-processors",
consumer: fmt.Sprintf("go-worker-%d", time.Now().UnixNano()),
}
}
func (c *OrderConsumer) Start(ctx context.Context) error {
// 创建消费者组
err := c.client.XGroupCreateMkStream(ctx, c.stream, c.group, "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
streams, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.group,
Consumer: c.consumer,
Streams: []string{c.stream, ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil {
continue
}
return err
}
for _, stream := range streams {
for _, msg := range stream.Messages {
orderID := msg.Values["order_id"].(string)
amount := msg.Values["amount"].(string)
log.Printf("处理订单: %s, 金额: %s", orderID, amount)
// 确认消息
c.client.XAck(ctx, c.stream, c.group, msg.ID)
}
}
}
}
}
Redis 6.2+ 引入了 XAUTOCLAIM 命令,可以一次性扫描并认领超时消息,比 XPENDING_RANGE + XCLAIM 的两步操作更高效:
def auto_claim_stale_messages(self, min_idle_ms=60000):
"""使用 XAUTOCLAIM 批量认领超时消息"""
result = r.xautoclaim(
self.stream_key, self.group, self.consumer,
min_idle_ms, '0-0', count=100
)
# result 包含已认领的消息列表和下一个起始 ID
claimed = result[1] # 已认领的消息
next_id = result[2] # 下次查询的起始 ID
for msg_id, fields in claimed:
print(f"[Recovery] 重新处理消息: {msg_id}")
在分布式系统中,多个节点同时访问共享资源时需要协调机制。Redis 基于单线程执行和原子性操作,天然适合实现分布式锁。
单节点 Redis 锁存在单点故障风险:当持有锁的 master 节点宕机且锁尚未同步到 slave 时,slave 提升为 master 后其他客户端可以获取同一把锁。Redlock 算法通过在多个独立 Redis 实例上同时获取锁来解决这个问题。
package redlock
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"time"
"github.com/redis/go-redis/v9"
)
var (
ErrLockNotAcquired = errors.New("lock not acquired")
ErrLockNotHeld = errors.New("lock not held by this owner")
)
type DistributedLock struct {
clients []*redis.Client
quorum int
key string
value string
expiration time.Duration
retryDelay time.Duration
maxRetries int
}
func NewLock(clients []*redis.Client, key string, ttl time.Duration) *DistributedLock {
quorum := len(clients)/2 + 1
token := generateToken()
return &DistributedLock{
clients: clients,
quorum: quorum,
key: key,
value: token,
expiration: ttl,
retryDelay: 50 * time.Millisecond,
maxRetries: 3,
}
}
func generateToken() string {
n, _ := rand.Int(rand.Reader, big.NewInt(1<<62))
return n.String()
}
// Lock 尝试获取分布式锁
func (dl *DistributedLock) Lock(ctx context.Context) error {
for attempt := 0; attempt < dl.maxRetries; attempt++ {
acquired := 0
start := time.Now()
for _, client := range dl.clients {
// 使用 SET NX EX 原子命令
ok, err := client.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
if err != nil {
continue // 网络错误,跳过该节点
}
if ok {
acquired++
}
}
elapsed := time.Since(start)
// 有效时间 = 原始TTL - 获取锁耗时(留出安全余量)
validity := dl.expiration - elapsed - 10*time.Millisecond
if acquired >= dl.quorum && validity > 0 {
return nil
}
// 未达到法定人数,释放已获取的锁
dl.Unlock(ctx)
if attempt < dl.maxRetries-1 {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(dl.retryDelay):
}
}
}
return ErrLockNotAcquired
}
// Unlock 释放分布式锁(使用 Lua 脚本保证原子性)
var unlockScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
func (dl *DistributedLock) Unlock(ctx context.Context) {
for _, client := range dl.clients {
unlockScript.Run(ctx, client, []string{dl.key}, dl.value)
}
}
// 使用示例
func ExampleUsage() {
clients := []*redis.Client{
redis.NewClient(&redis.Options{Addr: "redis-node1:6379"}),
redis.NewClient(&redis.Options{Addr: "redis-node2:6379"}),
redis.NewClient(&redis.Options{Addr: "redis-node3:6379"}),
}
lock := NewLock(clients, "inventory:lock:SKU-001", 10*time.Second)
ctx := context.Background()
if err := lock.Lock(ctx); err != nil {
fmt.Println("获取锁失败:", err)
return
}
defer lock.Unlock(ctx)
fmt.Println("成功获取锁,执行业务逻辑...")
}
Martin Kleppmann 曾对 Redlock 提出质疑,主要围绕时钟漂移(GC pause)和锁安全性问题。在实际应用中,建议:
问题:恶意请求查询数据库中不存在的 key,缓存永远无法命中,所有请求打到数据库。
解决方案:
import redis
import hashlib
import json
from functools import wraps
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def cache_with_protection(prefix, ttl=3600, null_cache_ttl=300):
"""
带缓存穿透保护的装饰器
- 缓存空值(短TTL)防止穿透
- 使用布隆过滤器前置拦截(可选)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存 key
key_parts = [prefix] + [str(a) for a in args]
key_parts += [f"{k}={v}" for k, v in sorted(kwargs.items())]
cache_key = "cache:" + hashlib.md5(":".join(key_parts).encode()).hexdigest()
# 1. 查询缓存
cached = r.get(cache_key)
if cached is not None:
if cached == "__NULL__":
return None # 空值标记
return json.loads(cached)
# 2. 缓存未命中,查询数据库
result = func(*args, **kwargs)
# 3. 写入缓存(包括空值)
if result is None:
# 缓存空值,短TTL防止穿透
r.setex(cache_key, null_cache_ttl, "__NULL__")
else:
r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
return result
return wrapper
return decorator
# 使用示例
@cache_with_protection(prefix="user:info", ttl=1800, null_cache_ttl=120)
def get_user_info(user_id: int):
"""从数据库查询用户信息"""
# db.query(...)
return {"id": user_id, "name": "张三"}
问题:某个热点 key 在高并发下过期,大量请求同时穿透到数据库。
解决方案 - 互斥锁 + 逻辑过期:
import threading
class HotKeyCache:
def __init__(self):
self._locks = threading.Lock()
self._key_locks = {} # 细粒度 key 级别锁
def get_with_mutex(self, cache_key: str, db_query_fn, ttl: int = 3600):
"""
互斥锁方案:只允许一个线程重建缓存
"""
# 查缓存
data = r.get(cache_key)
if data is not None:
return json.loads(data)
# 获取 key 级别的锁
lock_key = f"lock:{cache_key}"
# 使用 SET NX 实现分布式互斥锁
acquired = r.set(lock_key, "1", nx=True, ex=10)
if acquired:
try:
# 双重检查:获取锁后再查一次缓存
data = r.get(cache_key)
if data is not None:
return json.loads(data)
# 查数据库并重建缓存
result = db_query_fn()
if result is not None:
r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
return result
finally:
r.delete(lock_key)
else:
# 未获取到锁,短暂等待后重试
time.sleep(0.1)
data = r.get(cache_key)
if data is not None:
return json.loads(data)
return db_query_fn() # 降级:直接查库
问题:大量缓存 key 在同一时间过期,或者 Redis 节点宕机,导致请求全部打到数据库。
多层防御策略:
import random
class AntiAvalancheCache:
"""
缓存雪崩防御策略:
1. TTL 加随机偏移,避免同时过期
2. 多级缓存(本地 + Redis)
3. 熔断降级
"""
def __init__(self):
self.local_cache = {} # 进程内缓存(生产环境用 LRU Cache)
def get(self, cache_key: str, db_query_fn, base_ttl: int = 3600):
# 第一层:本地缓存
if cache_key in self.local_cache:
return self.local_cache[cache_key]
# 第二层:Redis 缓存
data = r.get(cache_key)
if data is not None:
result = json.loads(data)
self.local_cache[cache_key] = result
return result
# 查询数据库
try:
result = db_query_fn()
if result is not None:
# TTL 加随机偏移(±20%),打散过期时间
jitter = random.randint(int(base_ttl * 0.8), int(base_ttl * 1.2))
r.setex(cache_key, jitter, json.dumps(result, ensure_ascii=False))
self.local_cache[cache_key] = result
return result
except Exception as e:
# 熔断降级:返回兜底数据
print(f"[Circuit Breaker] 数据库异常: {e}")
return self._get_fallback(cache_key)
def _get_fallback(self, cache_key: str):
"""返回兜底数据"""
fallback = r.get(f"fallback:{cache_key}")
if fallback:
return json.loads(fallback)
return None
Redis 的模块化架构允许开发者通过 C/Rust 编写扩展模块,为 Redis 添加新的数据结构和命令。
| 模块 | 用途 | 特点 |
|---|---|---|
| RediSearch | 全文搜索引擎 | 支持中文分词、聚合查询、二级索引 |
| RedisJSON | JSON 数据处理 | 原生 JSONPath 支持,比序列化存储高效 |
| RedisTimeSeries | 时序数据存储 | 降采样、聚合、数据保留策略 |
| RedisBloom | 概率型数据结构 | 布隆过滤器、Cuckoo Filter、Count-Min Sketch |
| RedisGraph | 图数据库 | Cypher 查询语言,知识图谱场景 |
import redis.commands.search.field as search_field
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
# 写入 JSON 文档
doc = {
"title": "Redis 8.0 高级用法实战",
"author": "WD Tech",
"tags": ["redis", "缓存", "分布式"],
"content": "本文深入探讨 Redis Stream、Redlock...",
"views": 1500
}
r.json().set("article:1001", "$", doc)
# 创建搜索索引
r.ft("article_idx").create_index(
fields=[
search_field.TextField("$.title", as_name="title"),
search_field.TextField("$.content", as_name="content"),
search_field.TagField("$.tags", as_name="tags"),
search_field.NumericField("$.views", as_name="views"),
],
definition=IndexDefinition(
prefix=["article:"],
index_type=IndexType.JSON
)
)
# 全文搜索
result = r.ft("article_idx").search("Redis 缓存")
for doc in result.docs:
print(f"标题: {doc.title}, 浏览量: {doc.views}")
from redisbloom.client import Client as BloomClient
bf = BloomClient(host='localhost', port=6379)
# 初始化布隆过滤器(预期100万条数据,误判率0.001)
bf.bfCreate("user:exists", 1000000, 0.001)
# 用户注册时添加到布隆过滤器
bf.bfAdd("user:exists", "USER-1001")
# 查询前先检查布隆过滤器
def get_user(user_id: str):
exists = bf.bfExists("user:exists", user_id)
if not exists:
return None # 100% 确定用户不存在,直接返回
# 可能存在,继续查缓存和数据库
return get_user_info(user_id)
Redis 8.0 的强大之处在于它不仅仅是一个缓存工具,而是一个多面手:
在实际项目中,建议根据业务规模选择合适的方案,避免过度设计。Redis 的简洁和高效正是其最大的优势。