七叶笔记 » 数据库 » Redis Stream类型的使用详解

Redis Stream类型的使用详解

一、背景

最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。

二、redis中Stream类型的特点 是可持久化的,可以保证数据不丢失。 支持消息的多播、分组消费。 支持消息的有序性。

三、Stream的结构

解释:

消费者组: Consumer Group,即使用 XGROUP CREATE 命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。

同一条消息,只能被这个消费者组中的某个消费者获取。 多个消费者之间是相互独立的,互不干扰。

消费者: Consumer 消费消息。

last_delivered_id: 这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动一位,保证消费者不会读取到重复的消息。

pending_ids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。

消息内容:是一个键值对的格式。

Stream 中 消息的 ID: 默认情况下,ID使用 * ,redis可以自动生成一个,格式为 时间戳-序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。

四、Stream的命令

1、XADD 往Stream末尾添加消息

1、命令格式

2、举例

xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)

1、向流中增加一条数据,

2、向流中增加数据,不自动创建流

3、手动指定ID的值

4、设置一个固定大小的Stream1、精确指定Stream的大小

指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。

2、模糊指定Stream的大小

~ 模糊指定流的大小,可以看到指定的是1,实际上已经到了3.

2、XRANGE查看Stream中的消息 1、命令格式

2、准备数据

使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样

3、举例

1、获取所有的数据(-和+的使用)

-: 表示最小id的值

+:表示最大id的值

2、获取指定id范围内的数据,闭区间

3、获取指定id范围内的数据,开区间

(:表示开区间

4、获取某个毫秒后所有的数据

直接写毫秒不写后面的序列号即可。

5、获取单条数据

start和end的值写的一样即可获取单挑数据。

6、获取固定条数的数据

使用 count进行限制

3、XREVRANGE反向查看Stream中的消息

使用方式和XRANGE类似,略。

4、XDEL删除消息 1、命令格式 2、准备数据 3、举例

需求:往Stream中加入3条消息,然后删除第2条消息

注意:

需要注意的是,我们从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。

5、XLEN查看Stream中元素的长度 1、命令格式 2、举例

查看Stream中元素的长度

注意:

如果xlen后方的key不存在则返回0,否则返回元素的个数。

6、XTRIM对Stream中的元素进行修剪 1、命令格式 2、准备数据 3、举例

1、maxlen精确限制

上方的意思是,保留stream-key这个Stream中最后的2个消息。

2、minid模糊限制

minid 是删除比这个id小的数据,本地测试的时候没有测试出来,略。

7、XREAD独立消费消息

XREAD只是读取消息,读取完之后并不会删除消息。 使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。

1、命令格式

2、准备数据 3、举例

1、获取用户名是wangwu的数据

2、获取2条数据

count限制单次读取最后的消息,因为当前读取可能没有这么多。

3、非阻塞读取Stream对尾的数据

即读取队列尾的下一个消息,在非阻塞模式下始终是nil

4、阻塞读取Stream对尾的数据

注意:

$表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。 block 0表示永久阻塞,当消息到来时,才接触阻塞。block 1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nil xread进行顺序消费 当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。 xread读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。

8、消费者组相关操作

1、消费者组命令

2、准备数据

1、创建Stream的名称是 stream-key

2、创建2个消息,aa和bb

3、创建消费者组

1、创建一个从头开始消费的消费者组

2、创建一个从Stream最新的一个消息消费的消费者组

$表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。

4、创建一个从某个消息之后消费的消费者组

1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。

3、从消费者中读取消息

4、读取消费者的pending消息

5、转移消费者的消息

也可以通过xautoclaim来实现。

6、一些监控命令

1、查看消费组中消费者的pending消息

2、查看消费组中的消费者信息

3、查看消费组信息

4、查看Stream信息

五、参考文档

1、https://redis.io/topics/streams-intro

2、https://www.runoob.com/redis/redis-stream.html

到此这篇关于Redis Stream类型的使用详解的文章就介绍到这了,更多相关Redis Stream类型内容请搜索七叶笔记以前的文章或继续浏览下面的相关文章希望大家以后多多支持七叶笔记!

相关文章