超级爱敏 發表於 2025-9-30 09:18:20

Redis 实现消息队列实际案例

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">一、为什么选择 Redis 做消息队列?</a></li><ul class="second_class_ul"><li><a href="#_lab2_0_0">1.1 Redis 消息队列的核心优势</a></li><li><a href="#_lab2_0_1">1.2 适用场景与不适用场景</a></li><ul class="third_class_ul"><li><a href="#_label3_0_1_0">适用场景</a></li><li><a href="#_label3_0_1_1">不适用场景</a></li></ul></ul><li><a href="#_label1">二、Redis 实现消息队列的 3 种核心方案</a></li><ul class="second_class_ul"><li><a href="#_lab2_1_2">方案一、基于 Redis List 的简单消息队列实现</a></li><ul class="third_class_ul"><li><a href="#_label3_1_2_2">1. 方案概述</a></li><li><a href="#_label3_1_2_3">2. 代码实战(Java + Jedis)</a></li><li><a href="#_label3_1_2_4">3. 方案优化与问题解决</a></li><li><a href="#_label3_1_2_5">4. 适用场景分析</a></li><li><a href="#_label3_1_2_6">5. 生产环境建议</a></li></ul><li><a href="#_lab2_1_3">方案二、基于 Pub/Sub 的广播式消息队列方案详解</a></li><ul class="third_class_ul"><li><a href="#_label3_1_3_7">Redis Pub/Sub 模型介绍</a></li><li><a href="#_label3_1_3_8">核心命令及功能详解</a></li><li><a href="#_label3_1_3_9">2.1 代码实战(Java + Jedis)</a></li><li><a href="#_label3_1_3_10">2.2 方案深度分析与应用场景</a></li><li><a href="#_label3_1_3_11">使用建议</a></li></ul><li><a href="#_lab2_1_4">方案 3:基于 Stream 的可靠消息队列(Redis 5.0+)</a></li><ul class="third_class_ul"><li><a href="#_label3_1_4_12">3.1 Stream 核心概念</a></li><li><a href="#_label3_1_4_13">3.2 核心命令详解</a></li><li><a href="#_label3_1_4_14">3.3 代码实战(Java + Jedis)</a></li><li><a href="#_label3_1_4_15">3.4 最佳实践建议</a></li></ul></ul><li><a href="#_label2">三、三种方案的选型对比与最佳实践</a></li><ul class="second_class_ul"><li><a href="#_lab2_2_5">3.1 方案选型对比表:</a></li><ul class="third_class_ul"></ul><li><a href="#_lab2_2_6">3.2 最佳实践建议</a></li><ul class="third_class_ul"></ul></ul><li><a href="#_label3">四、实际应用案例:电商订单异步处理</a></li><ul class="second_class_ul"><li><a href="#_lab2_3_7">4.1 业务流程详解</a></li><ul class="third_class_ul"></ul><li><a href="#_lab2_3_8">4.2 核心代码实现(生产级优化版)</a></li><ul class="third_class_ul"><li><a href="#_label3_3_8_16">订单服务(生产者)增强实现</a></li><li><a href="#_label3_3_8_17">通知服务(消费者)完整实现</a></li><li><a href="#_label3_3_8_18">库存服务(消费者)完整实现</a></li></ul><li><a href="#_lab2_3_9">4.3 监控与运维设计</a></li><ul class="third_class_ul"></ul></ul></ul></div><p class="maodian"><a name="_label0"></a></p><h2>一、为什么选择 Redis 做消息队列?</h2>
<p class="maodian"><a name="_lab2_0_0"></a></p><h3>1.1 Redis 消息队列的核心优势</h3>
<p><strong>轻量级部署</strong>:无需单独部署 RabbitMQ、Kafka 等消息队列服务,可以直接复用现有 Redis 集群。例如一个电商系统可能已经使用 Redis 做缓存,现在只需增加消息队列功能,无需额外维护其他中间件,显著降低运维成本;</p>
<p><strong>高性能</strong>:基于内存操作,单节点 QPS 可达 10 万级,满足高吞吐场景。实测表明,在标准服务器配置下,Redis 处理简单消息的延迟可低至 0.1ms,远优于传统磁盘存储的消息队列;</p>
<p><strong>API 简洁</strong>:依托 Redis 原生命令即可实现完整队列功能:</p>
<ul><li>LPUSH/RPUSH 用于生产者推送消息</li><li>BLPOP/BRPOP 实现消费者阻塞式拉取</li><li>PUBLISH/SUBSCRIBE 支持发布订阅模式</li><li>XADD/XREAD 提供 Stream 类型支持 开发人员无需学习复杂的新 API,显著降低开发成本;</li></ul>
<p><strong>支持多语言</strong>:所有主流语言的 Redis 客户端(Java/Jedis、Python/redis-py、Go/redigo 等)均原生支持消息队列相关命令。例如 Java 开发者可以直接使用 Jedis 的 lpush() 方法发送消息,无需额外依赖;</p>
<p><strong>可扩展性</strong>:通过 Redis Cluster 可以轻松实现消息队列的横向扩展。例如可以将不同业务的消息分配到不同分片,同时利用 Redis Sentinel 实现高可用,确保消息服务不间断。</p>
<p class="maodian"><a name="_lab2_0_1"></a></p><h3>1.2 适用场景与不适用场景</h3>
<p class="maodian"><a name="_label3_0_1_0"></a></p><h4>适用场景</h4>
<ul><li><strong>轻量级异步通信</strong>:如电商系统中的订单状态变更通知、APP 的日志上报等。例如用户下单后,系统可以通过 Redis 队列异步通知库存系统扣减库存,而不影响主流程响应速度;</li><li><strong>高吞吐但允许少量重复的场景</strong>:如用户行为数据同步、监控数据采集等。例如一个短视频平台需要将用户的观看记录同步到推荐系统,即使偶尔出现重复消息也不影响业务逻辑;</li><li><strong>中小型系统的解耦需求</strong>:当系统规模尚未达到需要引入 Kafka 等重量级组件时。例如一个初创公司的支付系统与通知系统之间使用 Redis 队列解耦,避免系统间直接依赖。</li></ul>
<p class="maodian"><a name="_label3_0_1_1"></a></p><h4>不适用场景</h4>
<ul><li><strong>金融级事务消息</strong>:如银行转账、证券交易等需要强一致性和零丢失的场景。Redis 的持久化机制(RDB/AOF)无法保证 100% 不丢失消息,且缺乏事务消息的回查机制;</li><li><strong>复杂路由需求</strong>:如需要死信队列、优先级队列、延迟队列等高级特性时。虽然 Redis 可以通过 Sorted Set 实现简单延迟队列,但相比 RabbitMQ 的专业实现功能有限;</li><li><strong>海量消息存储</strong>:如需要保存数月历史消息的聊天系统。Redis 作为内存数据库,存储容量受服务器内存限制,且长期存储成本过高。例如一个日均百万消息的客服系统,使用 Redis 存储一周消息就可能需要上百 GB 内存。</li></ul>
<p class="maodian"><a name="_label1"></a></p><h2>二、Redis 实现消息队列的 3 种核心方案</h2>
<p class="maodian"><a name="_lab2_1_2"></a></p><h3>方案一、基于 Redis List 的简单消息队列实现</h3>
<p class="maodian"><a name="_label3_1_2_2"></a></p><h4>1. 方案概述</h4>
<p>Redis 的 List 数据结构是一个双向链表,具有以下特性使其非常适合实现消息队列:</p>
<ul><li>支持从两端(O(1)时间复杂度)插入和删除元素</li><li>天然支持&quot;生产者-消费者&quot;模型</li><li>提供阻塞式获取消息的命令</li><li>内存存储,性能极高(每秒可处理数万次操作)</li></ul>
<h5>1.1 核心命令详解</h5>
<table><thead><tr><th>角色</th><th>核心命令</th><th>作用说明</th><th>时间复杂度</th></tr></thead><tbody><tr><td>生产者</td><td>LPUSH key value1 value2</td><td>从 List 左侧插入消息(头部插入),支持批量插入,返回插入后 List 的长度</td><td>O(1)</td></tr><tr><td>生产者</td><td>RPUSH key value1 value2</td><td>从 List 右侧插入消息(尾部插入),支持批量插入</td><td>O(1)</td></tr><tr><td>消费者</td><td>BLPOP key timeout</td><td>从 List 左侧阻塞获取消息(头部取出),若 List 为空则等待timeout秒</td><td>O(1)</td></tr><tr><td>消费者</td><td>BRPOP key timeout</td><td>从 List 右侧阻塞获取消息(尾部取出),若 List 为空则等待timeout秒</td><td>O(1)</td></tr><tr><td>监控</td><td>LLEN key</td><td>获取当前队列的消息数量</td><td>O(1)</td></tr><tr><td>监控</td><td>LRANGE key start end</td><td>查看队列中从start到end的消息(如LRANGE queue 0 9查看前10条)</td><td>O(S+N)</td></tr></tbody></table>
<p class="maodian"><a name="_label3_1_2_3"></a></p><h4>2. 代码实战(Java + Jedis)</h4>
<h5>2.1 环境准备</h5>
<p>首先引入 Jedis 依赖(Maven):</p>
<div class="jb51code"><pre class="brush:plain;">&lt;dependency&gt;
    &lt;groupId&gt;redis.clients&lt;/groupId&gt;
    &lt;artifactId&gt;jedis&lt;/artifactId&gt;
    &lt;version&gt;4.4.3&lt;/version&gt; &lt;!-- 建议使用最新稳定版 --&gt;
&lt;/dependency&gt;</pre></div>
<h5>2.2 生产者实现</h5>
<div class="jb51code"><pre class="brush:java;">import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class ListMQProducer {
    // 队列key命名规范:业务域:组件类型:数据结构:具体业务
    private static final String QUEUE_KEY = "redis:mq:list:order";
    // 使用连接池提高性能
    private static final JedisPool jedisPool = new JedisPool(
      new JedisPoolConfig(),
      "localhost",
      6379,
      2000,// 连接超时时间
      null   // 密码
    );
    public static void main(String[] args) throws InterruptedException {
      try (Jedis jedis = jedisPool.getResource()) {
            // 模拟发送10条订单消息
            for (int i = 1; i &lt;= 10; i++) {
                // 消息内容格式:业务标识_序号_时间戳
                String message = String.format("order_%d_%d", i, System.currentTimeMillis());
                // LPUSH命令将消息放入队列头部
                long queueLength = jedis.lpush(QUEUE_KEY, message);
                System.out.printf("生产者发送消息:%s,当前队列长度:%d%n", message, queueLength);
                // 模拟业务处理间隔
                Thread.sleep(500);
            }
      } catch (Exception e) {
            System.err.println("生产者异常:" + e.getMessage());
      } finally {
            jedisPool.close();
      }
    }
}</pre></div>
<h5>2.3 消费者实现</h5>
<div class="jb51code"><pre class="brush:java;">import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
public class ListMQConsumer {
    private static final String QUEUE_KEY = "redis:mq:list:order";
    private static final JedisPool jedisPool = new JedisPool(
      new JedisPoolConfig(),
      "localhost",
      6379
    );
    public static void main(String[] args) {
      System.out.println("消费者启动,等待接收消息...");
      while (true) {
            try (Jedis jedis = jedisPool.getResource()) {
                // BRPOP命令参数:
                // 1. 超时时间3秒(避免空轮询消耗CPU)
                // 2. 可以监听多个队列
                List&lt;String&gt; messages = jedis.brpop(3, QUEUE_KEY);
                if (messages != null) {
                  // BRPOP返回结果格式:
                  // 第一个元素是队列key
                  // 第二个元素是消息内容
                  String message = messages.get(1);
                  System.out.println("消费者接收消息:" + message);
                  // 业务处理逻辑示例
                  processMessage(message);
                } else {
                  System.out.println("队列暂无消息,继续等待...");
                }
            } catch (Exception e) {
                System.err.println("消费者处理消息异常:" + e.getMessage());
                // 异常处理策略:
                // 1. 记录错误日志
                // 2. 重试机制
                // 3. 告警通知
                try {
                  Thread.sleep(5000); // 出错后暂停5秒
                } catch (InterruptedException ie) {
                  Thread.currentThread().interrupt();
                }
            }
      }
    }
    private static void processMessage(String message) throws InterruptedException {
      // 模拟业务处理
      System.out.println("处理消息:" + message);
      // 解析消息内容
      String[] parts = message.split("_");
      String orderId = parts;
      // 模拟业务处理耗时
      Thread.sleep(1000);
      System.out.println("订单" + orderId + "处理完成");
    }
}</pre></div>
<p class="maodian"><a name="_label3_1_2_4"></a></p><h4>3. 方案优化与问题解决</h4>
<h5>3.1 标准方案的局限性</h5>
<ul><li><strong>消息丢失风险</strong>:
<ul><li>消费者获取消息后,如果处理过程中崩溃,消息将永久丢失</li><li>无消息确认机制</li></ul></li><li><strong>功能限制</strong>:<ul><li>不支持广播模式(多个消费者同时消费同一条消息)</li><li>无优先级队列</li><li>无延迟队列功能</li></ul></li><li><strong>监控缺失</strong>:<ul><li>缺乏消息处理状态跟踪</li><li>无死信队列处理机制</li></ul></li></ul>
<h5>3.2 消息可靠性优化方案</h5>
<p>3.2.1 消息确认机制实现</p>
<div class="jb51code"><pre class="brush:java;">private static final String CONFIRM_QUEUE_KEY = "redis:mq:list:order:confirm";
private static final String DEAD_QUEUE_KEY = "redis:mq:list:order:dead";
private static final int MAX_RETRY = 3;
// 优化后的消费者处理逻辑
List&lt;String&gt; messages = jedis.brpop(3, QUEUE_KEY);
if (messages != null) {
    String message = messages.get(1);
    // 1. 将消息移到待确认队列(使用RPUSH保持顺序)
    jedis.rpush(CONFIRM_QUEUE_KEY, message);
    try {
      // 2. 处理业务逻辑
      processMessage(message);
      // 3. 处理成功,从待确认队列删除
      jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
    } catch (Exception e) {
      System.err.println("处理消息失败:" + message);
      // 4. 检查重试次数
      long retryCount = jedis.incr("retry:" + message);
      if (retryCount &lt;= MAX_RETRY) {
            // 放回主队列重试
            jedis.lpush(QUEUE_KEY, message);
      } else {
            // 超过重试次数,放入死信队列
            jedis.rpush(DEAD_QUEUE_KEY, message);
      }
      // 无论重试还是加入死信队列,都要从待确认队列删除
      jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
    }
}</pre></div>
<p>3.2.2 定时补偿任务</p>
<div class="jb51code"><pre class="brush:java;">// 定时检查待确认队列(每分钟执行)
public void checkConfirmQueue() {
    try (Jedis jedis = jedisPool.getResource()) {
      // 获取待确认队列所有消息
      List&lt;String&gt; pendingMessages = jedis.lrange(CONFIRM_QUEUE_KEY, 0, -1);
      for (String message : pendingMessages) {
            // 检查消息滞留时间
            long createTime = Long.parseLong(message.split("_"));
            long currentTime = System.currentTimeMillis();
            long delay = currentTime - createTime;
            // 超过30秒未处理则重试
            if (delay &gt; 30000) {
                jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
                jedis.lpush(QUEUE_KEY, message);
                System.out.println("消息超时重试:" + message);
            }
      }
    }
}</pre></div>
<h5>3.3 性能优化策略</h5>
<ul><li><strong>横向扩展</strong>:
<ul><li>增加消费者实例数量,利用 List 的 BRPOP 命令天然支持多消费者竞争</li><li>可采用消费者组模式,每个组独立消费</li></ul></li><li><strong>批量处理</strong>:</li></ul>
<div class="jb51code"><pre class="brush:java;">// 生产者批量发送
jedis.lpush(QUEUE_KEY, "msg1", "msg2", "msg3");
// 消费者批量获取(非阻塞)
List&lt;String&gt; batch = jedis.rpop(QUEUE_KEY, 10); // 获取最多10条</pre></div>
<p><strong>管道(Pipeline)优化</strong>:</p>
<div class="jb51code"><pre class="brush:java;">try (Pipeline p = jedis.pipelined()) {
    p.lpush(QUEUE_KEY, "msg1");
    p.lpush(QUEUE_KEY, "msg2");
    p.sync(); // 批量提交
}</pre></div>
<p><strong>监控指标</strong>:</p>
<p>队列长度监控:<code>LLEN key</code></p>
<p>消费者积压:比较生产和消费速率</p>
<p>异常告警:死信队列增长监控</p>
<p class="maodian"><a name="_label3_1_2_5"></a></p><h4>4. 适用场景分析</h4>
<h5>4.1 推荐使用场景</h5>
<ul><li><strong>异步任务处理</strong>:
<ul><li>订单创建后的后续处理(如发送通知、更新库存)</li><li>日志收集和分析</li></ul></li><li><strong>削峰填谷</strong>:<ul><li>秒杀系统请求缓冲</li><li>突发流量处理</li></ul></li><li><strong>系统解耦</strong>:<ul><li>微服务间通信</li><li>事件驱动架构</li></ul></li></ul>
<h5>4.2 不适用场景</h5>
<ol><li><strong>严格顺序要求</strong>:List虽然有序,但在多消费者场景下不能保证全局顺序</li><li><strong>广播模式需求</strong>:需要所有消费者收到相同消息</li><li><strong>持久化要求高</strong>:Redis是内存数据库,虽然支持持久化但不保证100%可靠</li><li><strong>复杂路由需求</strong>:需要根据消息内容路由到不同队列</li></ol>
<p class="maodian"><a name="_label3_1_2_6"></a></p><h4>5. 生产环境建议</h4>
<ul><li><strong>Redis配置</strong>:
<ul><li>启用AOF持久化:<code>appendonly yes</code></li><li>合理设置内存淘汰策略:<code>maxmemory-policy volatile-lru</code></li><li>设置合理超时:<code>timeout 300</code>(秒)</li></ul></li><li><strong>高可用</strong>:<ul><li>使用Redis Sentinel或Cluster</li><li>客户端实现故障转移</li></ul></li><li><strong>监控指标</strong>:</li></ul>
<div class="jb51code"><pre class="brush:sql;"># 监控队列长度
redis-cli llen redis:mq:list:order

# 监控Redis内存
redis-cli info memory
</pre></div>
<ul><li><strong>命名规范</strong>:
<ul><li>业务域:组件类型:数据结构:具体业务</li></ul></li></ul>
<p>示例:<code>payment:mq:list:refund</code></p>
<p class="maodian"><a name="_lab2_1_3"></a></p><h3>方案二、基于 Pub/Sub 的广播式消息队列方案详解</h3>
<p class="maodian"><a name="_label3_1_3_7"></a></p><h4>Redis Pub/Sub 模型介绍</h4>
<p>Redis 的 Pub/Sub(发布 - 订阅)模型是一种高效的&quot;一对多&quot;消息通信机制,它允许生产者将消息发布到特定的频道(Channel),而所有订阅该频道的消费者都能即时接收到这些消息。这种模式特别适合需要实时广播的场景,如新闻推送、实时聊天系统等。</p>
<p class="maodian"><a name="_label3_1_3_8"></a></p><h4>核心命令及功能详解</h4>
<table><thead><tr><th>角色</th><th>核心命令</th><th>作用说明</th></tr></thead><tbody><tr><td>生产者</td><td><code>PUBLISH channel message</code></td><td>向指定频道发布消息,返回接收消息的消费者数量</td></tr><tr><td>消费者</td><td><code>SUBSCRIBE channel1 channel2</code></td><td>订阅一个或多个频道,阻塞等待消息(订阅状态下只能接收消息,无法执行其他命令)</td></tr><tr><td>消费者</td><td><code>PSUBSCRIBE pattern</code></td><td>使用模式匹配订阅频道(如<code>PSUBSCRIBE redis:mq:pubsub:*</code>订阅所有匹配前缀的频道)</td></tr></tbody></table>
<p class="maodian"><a name="_label3_1_3_9"></a></p><h4>2.1 代码实战(Java + Jedis)</h4>
<h5>生产者实现(发布消息)</h5>
<div class="jb51code"><pre class="brush:sql;">import redis.clients.jedis.Jedis;

public class PubSubProducer {
    // 定义频道名称,采用命名空间方式避免冲突
    private static final String CHANNEL_KEY = "redis:mq:pubsub:news";
    // 创建Redis连接实例
    private static final Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws InterruptedException {
      // 模拟发布3条新闻消息,实际应用中可接入实时数据源
      String[] news = {
            "Redis 7.2版本发布,新增Stream增强功能",
            "基于Redis的消息队列在电商场景的实践",
            "Redis Cluster集群部署最佳实践"
      };

      // 循环发布消息
      for (String msg : news) {
            // 发布消息并获取接收者数量
            long receiverCount = jedis.publish(CHANNEL_KEY, msg);
            System.out.println(String.format(
                "【生产者】发布消息:%s,当前订阅者数量:%d",
                msg, receiverCount));
            // 模拟消息间隔
            Thread.sleep(1000);
      }
      
      // 关闭连接
      jedis.close();
    }
}
</pre></div>
<h5>消费者实现(订阅消息)</h5>
<div class="jb51code"><pre class="brush:sql;">import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PubSubConsumer {
    private static final String CHANNEL_KEY = "redis:mq:pubsub:news";
    private static final Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) {
      // 创建自定义的消息处理器
      JedisPubSub pubSub = new JedisPubSub() {
            // 接收到消息时的回调方法
            @Override
            public void onMessage(String channel, String message) {
                System.out.println(String.format(
                  "【消费者1】接收到新消息(频道:%s):%s",
                  channel, message));
                // 此处可添加业务处理逻辑
                // 例如:解析消息内容、写入数据库、触发其他操作等
            }

            // 成功订阅频道时的回调
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println(String.format(
                  "【消费者1】成功订阅频道:%s,当前订阅总数:%d",
                  channel, subscribedChannels));
            }
            
            // 可添加其他回调方法如onUnsubscribe、onPSubscribe等
      };

      System.out.println("【消费者1】启动并开始监听...");
      // 开始订阅(该方法会阻塞当前线程)
      jedis.subscribe(pubSub, CHANNEL_KEY);
      
      // 注意:在实际应用中,通常会将订阅逻辑放在独立线程中
      // 以避免阻塞主线程
    }
}
</pre></div>
<p class="maodian"><a name="_label3_1_3_10"></a></p><h4>2.2 方案深度分析与应用场景</h4>
<h5>优点详解</h5>
<ol><li><strong>实时广播能力</strong>:天然支持一对多的消息分发,一条消息可以同时被多个消费者接收</li><li><strong>实现简单</strong>:无需额外中间件,使用Redis原生命令即可实现</li><li><strong>低延迟</strong>:消息发布后立即推送给所有订阅者,延迟通常在毫秒级</li><li><strong>动态扩展</strong>:消费者可以随时加入或退出订阅,系统自动处理连接管理</li></ol>
<h5>缺点与限制</h5>
<ul><li><strong>消息持久化问题</strong>:
<ul><li>Redis重启后所有未消费的消息都会丢失</li><li>消费者离线期间的消息无法恢复</li></ul></li><li><strong>可靠性限制</strong>:<ul><li>缺乏消息确认机制,无法保证消息必达</li><li>网络中断可能导致消息丢失</li></ul></li><li><strong>流量控制缺失</strong>:<ul><li>没有背压机制,生产者可能压垮消费者</li><li>无法限制消息堆积(因为根本不堆积)</li></ul></li></ul>
<h5>适用场景分析</h5>
<ul><li><strong>实时通知系统</strong>:
<ul><li>网站全局公告推送</li><li>在线聊天室消息分发</li><li>游戏服务器中的全服通知</li></ul></li><li><strong>日志收集与监控</strong>:<ul><li>多个监控系统同时接收相同的日志流</li><li>实时统计系统指标</li><li>分布式系统的调试信息广播</li></ul></li><li><strong>临时性事件广播</strong>:<ul><li>系统配置变更通知</li><li>缓存失效广播</li><li>服务注册中心的服务变更通知</li></ul></li><li><strong>不要求可靠性的场景</strong>:<ul><li>实时数据统计(允许少量数据丢失)</li><li>非关键业务的实时通知</li><li>辅助性的系统状态更新</li></ul></li></ul>
<h5>不适用场景</h5>
<ol><li>金融交易等要求消息100%可靠的系统</li><li>需要保证消息顺序的场景</li><li>需要消息重放或回溯的业务</li><li>消费者处理能力远低于生产者速率的场景</li></ol>
<p class="maodian"><a name="_label3_1_3_11"></a></p><h4>使用建议</h4>
<ul><li><strong>频道命名规范</strong>:建议采用<code>业务域:子系统:消息类型</code>的层次结构,如<code>trade:order:create</code></li><li><strong>消费者实现</strong>:<ul><li>为每个订阅者创建独立连接</li><li>将订阅逻辑放在独立线程中</li><li>实现重连机制处理网络中断</li></ul></li><li><strong>监控指标</strong>:<ul><li>跟踪每个频道的订阅者数量</li><li>监控消息发布速率</li><li>记录消息丢失情况(需应用层实现)</li></ul></li><li><strong>性能优化</strong>:<ul><li>对于高频消息,考虑消息聚合</li><li>大消息可考虑只发送引用ID</li><li>合理设置Redis的TCP-Keepalive参数</li></ul></li></ul>
<p class="maodian"><a name="_lab2_1_4"></a></p><h3>方案 3:基于 Stream 的可靠消息队列(Redis 5.0+)</h3>
<p>Redis 5.0 推出的 Stream 数据结构是专门为消息队列场景设计的,它完美解决了传统 List 和 Pub/Sub 模式的诸多缺陷。Stream 支持消息持久化存储、消息确认机制、消费者组管理、死信队列等企业级特性,是目前 Redis 实现可靠消息队列的最佳方案。在实际应用中,如电商订单处理、支付流水记录、日志收集等场景都能发挥重要作用。</p>
<p class="maodian"><a name="_label3_1_4_12"></a></p><h4>3.1 Stream 核心概念</h4>
<p><strong>Stream</strong>:消息队列的主体,每个 Stream 有唯一的 key(如&quot;order:stream&quot;)。消息以&quot;条目(Entry)&quot;形式存储,每个条目包含:</p>
<ul><li>唯一 ID:自动生成的格式为&quot;时间戳-序列号&quot;(如1680000000000-0)</li><li>多个字段值对:如{&quot;order_id&quot;:&quot;1001&quot;,&quot;amount&quot;:&quot;199.00&quot;}</li></ul>
<p><strong>消费者组(Consumer Group)</strong>:通过将多个消费者归为一组,实现:</p>
<ul><li>组内消费者共享消息,避免重复消费</li><li>自动负载均衡,消息均匀分配给各消费者</li><li>支持水平扩展,可随时增加消费者</li></ul>
<p><strong>消息确认(ACK)机制</strong>:</p>
<ol><li>消费者获取消息后,消息进入&quot;Pending&quot;状态</li><li>处理完成后需显式发送ACK命令</li><li>未确认的消息会在消费者断开后重新分配</li></ol>
<p><strong>Pending 列表</strong>:</p>
<ul><li>存储所有已获取但未确认的消息</li><li>记录每个消息的消费者名称、获取时间、重试次数</li><li>支持通过XPENDING命令查看待处理消息</li></ul>
<p><strong>死信队列</strong>:</p>
<ul><li>当消息超过最大重试次数(如3次)仍未处理成功</li><li>可自动/手动转移到专门设计的死信Stream</li><li>便于后续人工干预或特殊处理</li></ul>
<p class="maodian"><a name="_label3_1_4_13"></a></p><h4>3.2 核心命令详解</h4>
<h5>基本操作命令</h5>
<table><thead><tr><th>操作类型</th><th>命令格式</th><th>说明</th></tr></thead><tbody><tr><td>添加消息</td><td><code>XADD key * field1 value1 </code></td><td>*表示自动生成ID,可指定ID保证顺序</td></tr><tr><td>创建消费者组</td><td><code>XGROUP CREATE key groupname id </code></td><td>MKSTREAM选项在Stream不存在时自动创建</td></tr><tr><td>消费消息</td><td><code>XREADGROUP GROUP group consumer STREAMS key </code></td><td>id通常为&gt;表示新消息,0表示Pending消息</td></tr><tr><td>消息确认</td><td><code>XACK key groupname id </code></td><td>支持批量确认多个消息</td></tr><tr><td>查看Pending消息</td><td><code>XPENDING key groupname </code></td><td>可查看指定消费者的未确认消息</td></tr><tr><td>消息所有权转移</td><td><code>XCLAIM key groupname consumer min-idle-time id </code></td><td>将空闲超时的消息转给其他消费者处理</td></tr></tbody></table>
<h5>高级管理命令</h5>
<ol><li><strong>消息回溯</strong>:<code>XREAD STREAMS key 0-0</code>从最早消息开始读取</li><li><strong>范围查询</strong>:<code>XRANGE key start end </code>按ID范围查询</li><li><strong>监控命令</strong>:<code>XINFO GROUPS key</code>查看消费者组信息</li></ol>
<p class="maodian"><a name="_label3_1_4_14"></a></p><h4>3.3 代码实战(Java + Jedis)</h4>
<h5>1. 环境准备</h5>
<div class="jb51code"><pre class="brush:plain;">// Maven依赖
&lt;dependency&gt;
    &lt;groupId&gt;redis.clients&lt;/groupId&gt;
    &lt;artifactId&gt;jedis&lt;/artifactId&gt;
    &lt;version&gt;4.3.1&lt;/version&gt;
&lt;/dependency&gt;
// 连接配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(10);
try (JedisPool pool = new JedisPool(config, "localhost", 6379)) {
    Jedis jedis = pool.getResource();
    // 业务代码...
}</pre></div>
<h5>2. 生产消费完整流程</h5>
<p><strong>生产者增强版</strong>:</p>
<div class="jb51code"><pre class="brush:java;">public class EnhancedProducer {
    private static final String[] ORDER_STATUS = {"PENDING", "PAID", "SHIPPED", "COMPLETED"};
    public void sendOrderEvent(Order order) {
      try (Jedis jedis = pool.getResource()) {
            Map&lt;String, String&gt; fields = new HashMap&lt;&gt;();
            fields.put("order_id", order.getId());
            fields.put("user_id", order.getUserId());
            fields.put("amount", order.getAmount().toString());
            fields.put("status", ORDER_STATUS);
            fields.put("create_time", Instant.now().toString());
            // 使用事务保证原子性
            Transaction t = jedis.multi();
            t.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, fields);
            t.sadd("order:ids", order.getId()); // 记录订单ID集合
            t.exec();
            // 添加监控埋点
            Metrics.counter("mq.produce.count").increment();
      }
    }
}</pre></div>
<p><strong>消费者增强版</strong>:</p>
<div class="jb51code"><pre class="brush:java;">public class ReliableConsumer implements Runnable {
    private static final int MAX_RETRY = 3;
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
            try {
                Map&lt;String, List&lt;StreamEntry&gt;&gt; messages = jedis.xreadGroup(
                  GROUP_NAME, consumerName,
                  new StreamParams().count(1).block(2000),
                  new StreamOffset(STREAM_KEY, "&gt;")
                );
                if (messages != null) {
                  messages.forEach((stream, entries) -&gt; {
                        entries.forEach(entry -&gt; {
                            processWithRetry(entry);
                        });
                  });
                }
            } catch (Exception e) {
                logger.error("消费异常", e);
                sleep(1000);
            }
      }
    }
    private void processWithRetry(StreamEntry entry) {
      int retryCount = getRetryCount(entry.getID());
      if (retryCount &gt;= MAX_RETRY) {
            moveToDeadLetter(entry);
            return;
      }
      try {
            Order order = parseOrder(entry.getFields());
            orderService.process(order);
            jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
      } catch (Exception e) {
            logger.warn("处理失败准备重试", e);
            sleep(1000 * (retryCount + 1));
      }
    }
}</pre></div>
<h5>3. 死信队列管理</h5>
<div class="jb51code"><pre class="brush:java;">public class DeadLetterMonitor {
    public void checkPendingMessages() {
      // 获取所有超时未确认的消息
      List&lt;StreamEntry&gt; pending = getPendingMessages(TIMEOUT_MS);
      pending.forEach(entry -&gt; {
            // 检查重试次数
            if (getRetryCount(entry.getID()) &gt; MAX_RETRY) {
                // 转移到死信队列
                jedis.xadd(DEAD_STREAM_KEY, StreamEntryID.NEW_ENTRY, entry.getFields());
                jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
                logger.warn("消息转入死信队列: {}", entry.getID());
                // 发送告警通知
                alertService.notifyAdmin(entry);
            }
      });
    }
    public void reprocessDeadLetters() {
      // 从死信队列重新处理
      List&lt;StreamEntry&gt; deadMessages = jedis.xrange(DEAD_STREAM_KEY, "-", "+");
      deadMessages.forEach(entry -&gt; {
            try {
                manualProcess(entry.getFields());
                jedis.xdel(DEAD_STREAM_KEY, entry.getID());
            } catch (Exception e) {
                logger.error("死信处理失败", e);
            }
      });
    }
}</pre></div>
<p class="maodian"><a name="_label3_1_4_15"></a></p><h4>3.4 最佳实践建议</h4>
<ul><li><strong>消费者设计原则</strong>:
<ul><li>每个消费者设置唯一标识</li><li>实现幂等性处理逻辑</li><li>添加合理的阻塞超时时间(通常1-5秒)</li></ul></li><li><strong>性能优化</strong>:</li></ul>
<div class="jb51code"><pre class="brush:sql;">// 批量消费提高吞吐量
jedis.xreadGroup(GROUP_NAME, consumerName,
    new StreamParams().count(100).block(1000),
    new StreamOffset(STREAM_KEY, "&gt;"));

// 批量确认减少网络开销
jedis.xack(STREAM_KEY, GROUP_NAME, id1, id2, id3);
</pre></div>
<ul><li><strong>监控指标</strong>:
<ul><li>待处理消息数(XPENDING)</li><li>消费者延迟(当前时间 - 消息创建时间)</li><li>死信队列大小</li><li>消费成功率</li></ul></li><li><strong>异常处理</strong>:</li></ul>
<div class="jb51code"><pre class="brush:java;">// 消费者崩溃后的恢复处理
public void recoverConsumer(String failedConsumer) {
    List&lt;PendingEntry&gt; pendings = jedis.xpending(
      STREAM_KEY, GROUP_NAME, "-", "+", 100, failedConsumer);
    pendings.forEach(pending -&gt; {
      jedis.xclaim(STREAM_KEY, GROUP_NAME, currentConsumer,
            TIMEOUT_MS, pending.getIdAsString());
    });
}</pre></div>
<p>通过以上实现,基于Redis Stream的消息队列可以达到:</p>
<ul><li>99.9%的消息可靠性</li><li>每秒万级的吞吐量</li><li>秒级的端到端延迟</li><li>完善的故障恢复机制</li></ul>
<p class="maodian"><a name="_label2"></a></p><h2>三、三种方案的选型对比与最佳实践</h2>
<p class="maodian"><a name="_lab2_2_5"></a></p><h3>3.1 方案选型对比表:</h3>
<table><thead><tr><th>对比维度</th><th>List 方案</th><th>Pub/Sub 方案</th><th>Stream 方案(推荐)</th></tr></thead><tbody><tr><td>消息持久化</td><td>支持(需手动处理)</td><td>不支持</td><td>原生支持</td></tr><tr><td>消息确认</td><td>需自定义(如RPOPLPUSH)</td><td>不支持</td><td>原生支持(ACK机制)</td></tr><tr><td>广播能力</td><td>不支持</td><td>原生支持(全量广播)</td><td>支持(通过多消费者组实现)</td></tr><tr><td>消费者负载均衡</td><td>支持(竞争消费模式)</td><td>不支持(全量推送)</td><td>支持(消费者组内自动均衡)</td></tr><tr><td>死信队列</td><td>需自定义(备份List)</td><td>不支持</td><td>支持(通过XCLAIM命令)</td></tr><tr><td>实现复杂度</td><td>低(基础命令即可)</td><td>低(订阅/发布模式)</td><td>中(需理解消费者组概念)</td></tr><tr><td>内存占用</td><td>线性增长</td><td>瞬时内存</td><td>可控制(支持消息修剪)</td></tr><tr><td>历史消息回溯</td><td>有限支持(需保存完整List)</td><td>不支持</td><td>完整支持(消息ID时间序列)</td></tr><tr><td>适用场景</td><td>简单异步通信</td><td>实时广播通知</td><td>可靠消息、企业级场景</td></tr></tbody></table>
<p class="maodian"><a name="_lab2_2_6"></a></p><h3>3.2 最佳实践建议</h3>
<ol><li><p>选型决策树:</p>
<ul><li>首要判断消息可靠性需求:<ul><li>必须保证不丢失 &rarr; 直接选择Stream</li><li>可接受偶尔丢失 &rarr; 进入下一判断</li></ul></li><li>次要判断消息分发模式:<ul><li>需要广播 &rarr; 选择Pub/Sub</li><li>点对点消费 &rarr; 选择List或Stream</li></ul></li><li>最后评估开发成本:<ul><li>快速实现 &rarr; 选择List</li><li>长期维护 &rarr; 选择Stream</li></ul></li></ul></li><li><p>Stream方案实施细节:</p>
<ul><li>消费者组创建示例:<div class="jb51code"><pre class="brush:sql;">XGROUP CREATE mystream mygroup $ MKSTREAM
</pre></div></li><li>典型消费代码逻辑:<ol><li>使用XREADGROUP阻塞读取</li><li>业务处理成功后发送XACK</li><li>处理失败时使用XCLAIM转移消息</li><li>设置合理的PEL(Pending Entries List)超时</li></ol></li></ul></li><li><p>List方案优化建议:</p>
<ul><li>可靠消费模式实现:<div class="jb51code"><pre class="brush:java;">RPOPLPUSH source_list backup_list# 原子操作
# 处理成功后再LREM备份列表</pre></div></li><li>性能提升技巧:<ul><li>批量生产:使用Pipeline打包多个LPUSH</li><li>批量消费:LUA脚本实现多消息批量获取</li></ul></li></ul></li><li><p>集群环境特别注意事项:</p>
<ul><li>跨slot访问问题:<ul><li>所有相关key必须使用相同hash tag(如{msg})</li><li>或者采用客户端分片路由</li></ul></li><li>监控重点指标:<ul><li>Stream方案的PEL积压长度</li><li>List方案的内存增长曲线</li><li>Pub/Sub的客户端连接数</li></ul></li></ul></li><li><p>运维管理建议:</p>
<ul><li>容量规划:<ul><li>按业务峰值QPS的1.5倍预留资源</li><li>Stream建议单分片不超过10MB/s写入</li></ul></li><li>监控告警:<ul><li>设置消息积压阈值(如Stream的PEL&gt;1000)</li><li>监控消费者延迟(XINFO GROUPS)</li></ul></li><li>灾备方案:<ul><li>定期备份Stream的RDB快照</li><li>对于关键业务实现双写机制</li></ul></li></ul></li></ol>
<p class="maodian"><a name="_label3"></a></p><h2>四、实际应用案例:电商订单异步处理</h2>
<p class="maodian"><a name="_lab2_3_7"></a></p><h3>4.1 业务流程详解</h3>
<p>电商平台的订单处理采用异步消息队列模式,通过Redis Stream实现可靠的消息传递和消费。整个流程包含以下关键环节:</p>
<ol><li><p><strong>订单创建阶段</strong></p>
<ul><li>用户下单后,订单服务作为生产者将订单数据持久化到MySQL数据库</li><li>同时将订单关键信息(订单ID、用户ID、商品ID、数量等)封装为消息,发送到名为&quot;order_create&quot;的Stream中</li><li>消息格式示例:<div class="jb51code"><pre class="brush:json;">{
"order_id": "ORD20231125001",
"user_id": "U10086",
"product_id": "P8808",
"quantity": "2"
}</pre></div></li></ul></li><li><p><strong>并行消费阶段</strong></p>
<ul><li><p><strong>通知服务(消费者1)</strong>:专门处理用户通知</p>
<ul><li>消费消息后调用短信平台API或极光推送服务</li><li>通知内容示例:&quot;尊敬的会员,您的订单ORD20231125001已创建成功,我们将尽快为您处理&quot;</li><li>支持重试机制:若首次发送失败,会按照指数退避策略重试3次</li></ul></li><li><p><strong>库存服务(消费者2)</strong>:负责库存扣减</p>
<ul><li>采用乐观锁机制更新库存:<code>UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock &gt;= ?</code></li><li>实现分布式事务:若扣减失败会记录操作日志,便于后续人工核对</li></ul></li></ul></li><li><p><strong>异常处理机制</strong></p>
<ul><li>当库存扣减失败时,消息会进入Pending列表并设置5分钟超时</li><li>超时后自动转移到死信队列&quot;DLQ:order_create&quot;</li><li>运维人员通过管理后台查看死信队列,可:<ul><li>人工补扣库存</li><li>触发订单取消流程</li><li>联系用户协商处理</li></ul></li></ul></li></ol>
<p class="maodian"><a name="_lab2_3_8"></a></p><h3>4.2 核心代码实现(生产级优化版)</h3>
<p class="maodian"><a name="_label3_3_8_16"></a></p><h4>订单服务(生产者)增强实现</h4>
<div class="jb51code"><pre class="brush:java;">// 订单服务(生产者)发送消息 - 增强版
public void createOrder(Order order) {
    // 1. 数据库事务确保数据一致性
    TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
    try {
      // 1.1 保存主订单
      orderMapper.insert(order);
      // 1.2 保存订单明细
      order.getItems().forEach(item -&gt; {
            item.setOrderId(order.getId());
            orderItemMapper.insert(item);
      });
      // 2. 构建消息体(添加时间戳和业务标识)
      Map&lt;String, String&gt; message = new HashMap&lt;&gt;();
      message.put("order_id", order.getId());
      message.put("user_id", order.getUserId());
      message.put("product_id", order.getProductId());
      message.put("quantity", order.getQuantity() + "");
      message.put("create_time", System.currentTimeMillis() + "");
      message.put("biz_type", "NORMAL_ORDER");
      // 3. 发送消息(添加重试机制)
      int retryTimes = 0;
      while (retryTimes &lt; 3) {
            try {
                jedis.xadd("redis:mq:stream:order_create", null, message);
                break;
            } catch (Exception e) {
                retryTimes++;
                if (retryTimes == 3) {
                  throw new RuntimeException("消息发送失败", e);
                }
                Thread.sleep(1000 * retryTimes);
            }
      }
      transactionManager.commit(status);
    } catch (Exception e) {
      transactionManager.rollback(status);
      throw new BusinessException("订单创建失败", e);
    }
}</pre></div>
<p class="maodian"><a name="_label3_3_8_17"></a></p><h4>通知服务(消费者)完整实现</h4>
<div class="jb51code"><pre class="brush:java;">// 通知服务(消费者)完整实现
public void handleNotification() {
    // 初始化消费者组(幂等操作)
    initConsumerGroup("redis:mq:stream:order_create", "order_group");
    while (!Thread.currentThread().isInterrupted()) {
      try {
            Map&lt;String, List&lt;StreamEntry&gt;&gt; messages = jedis.xreadGroup(
                "order_group",
                "notify_consumer_" + instanceId, // 使用实例ID区分消费者
                1,
                5000,
                false,
                Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
            );
            if (messages != null &amp;&amp; !messages.isEmpty()) {
                for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
                  Map&lt;String, String&gt; content = entry.getFields();
                  String userId = content.get("user_id");
                  String orderId = content.get("order_id");
                  // 1. 发送短信(带熔断机制)
                  boolean smsSent = circuitBreaker.execute(() -&gt;
                        smsService.send(userId, "订单通知", "您的订单" + orderId + "已创建成功"));
                  // 2. 发送APP推送
                  boolean pushSent = pushService.send(userId, "订单创建通知",
                        Map.of("orderId", orderId, "type", "order_created"));
                  if (smsSent || pushSent) {
                        // 至少一个通知发送成功才确认消息
                        jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
                        monitorService.recordSuccess("order_notify");
                  } else {
                        monitorService.recordFailure("order_notify");
                  }
                }
            }
      } catch (Exception e) {
            log.error("通知处理异常", e);
            monitorService.recordError("order_notify", e);
            Thread.sleep(5000); // 异常休眠避免循环异常
      }
    }
}
private void initConsumerGroup(String streamKey, String groupName) {
    try {
      jedis.xgroupCreate(streamKey, groupName, StreamEntryID.LAST_ENTRY, true);
    } catch (RedisBusyException e) {
      log.info("消费者组已存在: {}", groupName);
    }
}</pre></div>
<p class="maodian"><a name="_label3_3_8_18"></a></p><h4>库存服务(消费者)完整实现</h4>
<div class="jb51code"><pre class="brush:java;">// 库存服务(消费者)完整实现
public void handleInventory() {
    // 初始化消费者组
    initConsumerGroup("redis:mq:stream:order_create", "order_group");
    while (!Thread.currentThread().isInterrupted()) {
      try {
            Map&lt;String, List&lt;StreamEntry&gt;&gt; messages = jedis.xreadGroup(
                "order_group",
                "inventory_consumer_" + instanceId,
                1,
                5000,
                false,
                Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
            );
            if (messages != null &amp;&amp; !messages.isEmpty()) {
                for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
                  Map&lt;String, String&gt; content = entry.getFields();
                  String productId = content.get("product_id");
                  int quantity = Integer.parseInt(content.get("quantity"));
                  String orderId = content.get("order_id");
                  // 1. 扣减库存(带事务)
                  boolean success = inventoryService.deductWithLog(
                        productId,
                        quantity,
                        orderId,
                        "ORDER_DEDUCTION"
                  );
                  if (success) {
                        // 2. 确认消息
                        jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
                        monitorService.recordSuccess("inventory_deduct");
                  } else {
                        // 3. 记录失败日志
                        log.warn("库存扣减失败 orderId={}, productId={}", orderId, productId);
                        monitorService.recordFailure("inventory_deduct");
                        // 不确认消息,让其进入Pending状态
                  }
                }
            }
      } catch (Exception e) {
            log.error("库存处理异常", e);
            monitorService.recordError("inventory_deduct", e);
            Thread.sleep(5000);
      }
    }
}
// 库存扣减服务方法
@Transactional
public boolean deductWithLog(String productId, int quantity, String bizId, String bizType) {
    // 1. 扣减库存
    int affected = inventoryMapper.deductWithVersion(
      productId,
      quantity,
      getCurrentVersion(productId)
    );
    if (affected == 0) {
      return false;
    }
    // 2. 记录操作流水
    InventoryLog log = new InventoryLog();
    log.setLogId(UUID.randomUUID().toString());
    log.setProductId(productId);
    log.setChangedAmount(-quantity);
    log.setBizId(bizId);
    log.setBizType(bizType);
    log.setRemarks("订单扣减");
    inventoryLogMapper.insert(log);
    return true;
}</pre></div>
<p class="maodian"><a name="_lab2_3_9"></a></p><h3>4.3 监控与运维设计</h3>
<ol><li><p><strong>监控指标</strong></p>
<ul><li>消息堆积量:<code>XLEN redis:mq:stream:order_create</code></li><li>Pending列表数量:<code>XPENDING redis:mq:stream:order_create order_group</code></li><li>消费者延迟:通过消息时间戳与当前时间差值计算</li></ul></li><li><p><strong>运维命令示例</strong></p>
<div class="jb51code"><pre class="brush:java;"># 查看消费者组信息
XINFO GROUPS redis:mq:stream:order_create
# 处理死信消息
XRANGE DLQ:order_create - + COUNT 10
XACK DLQ:order_create manual_group &lt;entry_id&gt;</pre></div></li><li><p><strong>自动恢复方案</strong></p>
<ul><li>定时任务每小时检查Pending列表</li><li>对于超时1小时未处理的消息:<ul><li>尝试重新投递到原Stream</li><li>超过3次重试则转入死信队列</li><li>触发企业微信告警通知运维人员</li></ul></li></ul></li></ol>
頁: [1]
查看完整版本: Redis 实现消息队列实际案例