笔者使用的是 Spring 相关 API 实现对 Redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。
代码实现队列消息消费者因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 Java8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。
有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 SpringBoot 一般会指定 timeout 的全局超时时间,即使 BRPOPLPUSH 设置了 0,即无限期,当超出了 timeout 设置的值时,就会抛出 QueryTimeoutException 异常导致线程退出,因此添加了 try/catch 对异常进行捕获并忽略,同时使用 while(true) 保证线程可以继续执行。 代码中记录了当前消息处理结果,如果处理结果为成功,需要对备份队列的当前消息进行删除。
测试代码 项目使用方式为了方便使用,我将其抽取为了一个工具类,使用时通过 Spring 注入使用即可, 队列消费可以使用如下方式在项目启动的时候就进行阻塞监听队列,等待消费
本文完整代码下载github 地址
到此这篇关于基于Redis实现阻塞队列的文章就介绍到这了,更多相关Redis阻塞队列内容请搜索七叶笔记以前的文章或继续浏览下面的相关文章希望大家以后多多支持七叶笔记!