Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
基于redis实现消息队列的方式有很多:
PUB/SUB,订阅/发布模式基于List的 LPUSH+BRPOP 的实现 redis 实现消息对列4中方法 发布订阅发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。
发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。
使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。
list 队列生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。
**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。
**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。
zset 队列生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。
zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。
zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。
zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue
Stream 队列Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。
提供消息ack机制。
基本命令 xadd 生产消息往 stream 内创建消息 语法为:
XADD key ID field string [field string …]
读取消息读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
XRANGE key startID endID count
xgroup 消费者组redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。
每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9
创建消费者组:
xreadgroup 消费消息通过xreadgroup可以在消费者组内创建消费者消费消息
XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
Pending 等待列表通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id
每个Pending的消息有4个属性:
消息ID所属消费者IDLE,已读取时长delivery counter,消息被读取次数XPENDING key group [start end count] [consumer]
消息确认当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除
XACK key gruopName ID
消息转移当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。
通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。
信息监控redis提供了xinfo来查看stream的信息
SpringBoot 整合1 引入依赖
2 编写消费者
3 配置redis
序列化配置
消费者组和消费者配置
4.生产者生产消息
参考文档:redis Stream 消息队列
SpringBoot整合redis stream 实现消息队列
到此这篇关于redis stream 实现消息队列的实践的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索七叶笔记以前的文章或继续浏览下面的相关文章希望大家以后多多支持七叶笔记!