使用Redis实现轻量级消息队列
<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">方式一 Redis Pub/Sub(适用于广播通知)</a></li><li><a href="#_label1">方式二:Redis List(适用于任务队列)</a></li><li><a href="#_label2">方式三:Redis Stream(推荐,支持持久化 + 消费组)</a></li><li><a href="#_label3">总结</a></li></ul></div><p>使用消息中间件如RabbitMQ或kafka虽然好,但也给服务器带来很大的内存开销,当系统的业务量,并发量不高时,考虑到服务器和维护成本,可考虑使用Redis实现一个轻量级的消息队列,实现事件监听的效果。下面介绍下Redis实现消息队列的三种形式。</p><p class="maodian"><a name="_label0"></a></p><h2>方式一 Redis Pub/Sub(适用于广播通知)</h2>
<p>Redis <strong>Pub/Sub</strong> 适用于 <strong>实时消息推送</strong>,但<strong>不支持消息持久化</strong>,如果消费者掉线,消息会丢失。</p>
<p>(1) 发布消息(生产者)</p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
private final StringRedisTemplate redisTemplate;
public OrderService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void createOrder(Long orderId) {
System.out.println("订单创建成功: " + orderId);
// 发布消息
redisTemplate.convertAndSend("order.channel", orderId.toString());
}
}
</pre></div>
<p>(2) 订阅消息(消费者)</p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class NotificationService implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String orderId = message.toString();
System.out.println("【通知服务】收到订单创建消息:" + orderId);
}
}
</pre></div>
<p>(3) 注册 Redis 监听器</p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisPubSubConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("order.channel"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(NotificationService receiver) {
return new MessageListenerAdapter(receiver, "onMessage");
}
}
</pre></div>
<p><strong>缺点</strong></p>
<ul><li><strong>无持久化</strong>,消费者掉线后无法重新获取消息。</li><li><strong>不支持消费组</strong>,多个消费者同时订阅时,所有都会收到消息(无法负载均衡)。</li></ul>
<p class="maodian"><a name="_label1"></a></p><h2>方式二:Redis List(适用于任务队列)</h2>
<p>使用 Redis <strong>List</strong>(<code>LPUSH</code> + <code>RPOP</code>)可以实现简单的任务队列,适用于<strong>任务异步处理</strong>,但不支持回溯消费。</p>
<p><strong>(1) 生产者(推送任务)</strong></p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
private final StringRedisTemplate redisTemplate;
public OrderService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void createOrder(Long orderId) {
System.out.println("订单创建成功: " + orderId);
// 推送到队列
redisTemplate.opsForList().leftPush("order.queue", orderId.toString());
}
}
</pre></div>
<p>(2) 消费者(轮询获取任务)</p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class NotificationService {
private final StringRedisTemplate redisTemplate;
public NotificationService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Scheduled(fixedDelay = 5000) // 每5秒轮询一次
public void processOrderQueue() {
String orderId = redisTemplate.opsForList().rightPop("order.queue");
if (orderId != null) {
System.out.println("【通知服务】处理订单:" + orderId);
}
}
}
</pre></div>
<p>要想消费者能监听到消息并进行处理,需要在方法上添加@Scheduled注解,同时在服务启动类中添加@EnableScheduling注解,或者在配置类添加</p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableScheduling
public class SchedulingConfig {
}</pre></div>
<p><strong>缺点</strong></p>
<ul><li><strong>无消费组</strong>,多个消费者时需要自行分配任务,可能会造成任务重复消费或丢失。</li><li><strong>无持久化保障</strong>,如果任务未处理完,Redis 发生故障,任务可能会丢失。</li></ul>
<p class="maodian"><a name="_label2"></a></p><h2>方式三:Redis Stream(推荐,支持持久化 + 消费组)</h2>
<p>Redis <strong>Stream</strong> 是 Redis 6.0 之后的特性,类似于 Kafka,支持<strong>持久化、消费组、多消费者模式</strong>。</p>
<p><strong>(1) 生产者(推送事件)</strong></p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
private final StringRedisTemplate redisTemplate;
public OrderService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void createOrder(Long orderId) {
System.out.println("订单创建成功: " + orderId);
// 推送到 Redis Stream
ObjectRecord<String, String> record = StreamRecords.newRecord()
.ofObject(orderId.toString())
.withStreamKey("order.stream");
redisTemplate.opsForStream().add(record);
}
}
</pre></div>
<p>(2) 消费者(监听事件)</p>
<div class="jb51code"><pre class="brush:go;">import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
@Service
public class NotificationService implements StreamListener<String, MapRecord<String, String, String>> {
private final StringRedisTemplate redisTemplate;
private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer;
public NotificationService(StringRedisTemplate redisTemplate,
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer) {
this.redisTemplate = redisTemplate;
this.listenerContainer = listenerContainer;
}
@PostConstruct
public void startListening() {
listenerContainer.receive(StreamOffset.fromStart("order.stream"), this);
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
String orderId = message.getValue().values().iterator().next();
System.out.println("【通知服务】订单 " + orderId + " 创建成功!");
}
}
</pre></div>
<p><strong>优点</strong></p>
<ul><li><strong>持久化存储</strong>,即使 Redis 重启,消息不会丢失。</li><li><strong>支持消费组</strong>,多个消费者可以<strong>负载均衡</strong>地消费消息。</li><li><strong>支持回溯</strong>,可以读取历史消息。</li></ul>
<p class="maodian"><a name="_label3"></a></p><h2>总结</h2>
<table><thead><tr><th>方案</th><th>适用场景</th><th>优点</th><th>缺点</th></tr></thead><tbody><tr><td><strong>Pub/Sub</strong></td><td>即时消息通知</td><td>低延迟</td><td>无持久化,消费者掉线丢消息</td></tr><tr><td><strong>List</strong></td><td>简单任务队列</td><td>轻量级</td><td>无消费组,任务可能丢失</td></tr><tr><td><strong>Stream</strong></td><td>高级事件流处理</td><td>持久化、消费组</td><td>复杂度较高</td></tr></tbody></table>
<p>如果需求是<strong>轻量级队列</strong>,推荐 <strong>Redis Stream</strong>,它类似 Kafka,支持消费组和持久化,比 Redis List 更稳定。</p>
頁:
[1]