七叶笔记 » golang编程 » Rabbitmq实战golang实现

Rabbitmq实战golang实现

原文地址:

原文作者:Gopherzhang

1. Rabbitmq 架构及原理

消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信(维基百科)

MQ 的作用:

  1. 实现异步通信
  2. 系统解耦
  3. 流量削峰

MQ 带来的问题:

  1. 引入消息队列带来的延迟问题
  2. 增加了系统的复杂度
  3. 可能产生数据不一致的问题

消息丢失和消息重复消费的问题。一旦消息没有被正确地消费,就会带来数据不一致性的问题。

RabbitMQ 是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现。

关于AMQP 协议具体文档参考

由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优点。rabbitmq简单架构如下:

  • Broker(代理/中介): RabbitMQ 用于收发消息的服务,默认是 5672 的端口。
  • Virtual Host(vhost):虚拟主机;标识一批交换机、消息队列和相关对象。

虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。
  • Banding:绑定,用于消息队列和交换机之间的关联。

一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。

直接创建和释放 TCP 长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,因为 TCP 连接是非常宝贵的资源,创建和释放也要消耗时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接

  • Connection:无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个连接,这个连接是一个 TCP 的长连接。

2. RabbitMQ的六种工作模式:

官方网站有详细示意图

  1. simple简单模式

  1. work工作模式

  1. Publish/Subscribe 发布订阅模式(fanout)

  1. Routing 路由模式 (direct)

  1. Topics 主题模式(路由模式的一种)(topic)

  1. RPC

  1. Publisher Confirms 发布确认

具体demo参考如下地址:

go操作RabbitMQ

3. 延迟队列实现(基于 死信 队列转发)

3.1 消息过期时间:

有两种设置方式:

  1. 通过队列属性设置消息过期时间,所有队列中的消息超过时间未被消费时,都会过期。
 _, err := r.channel.QueueDeclare(
queueName,
true,
false,
false, // 队列解锁
false,
amqp.Table{
"x-message-ttl": 4000, // 在队列中声明ttl 超时时间 单位为毫秒级,类型为int
},
)  
  1. 设置单条消息的过期时间,在发送消息的时候指定消息属性(推荐使用消息超时)。
 expiration := "4000" // 4S 4000MS
err = r.channel.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(message),
Expiration:  expiration, // push 时 在消息本体上设置expiration超时时间,单位为毫秒级别 类型为 string
})  

3.2 死信 队列:

消息在某些情况下会变成死信(Dead Letter)

队列在创建的时候可以指定一个死信交换机 DLX(Dead Letter Exchange)。

死信交换机绑定的队列被称为死信队列 DLQ(Dead Letter Queue),DLX 实际上也是普通的交换机,DLQ 也是普通的队列。

三种情况会让消息变成死信:

  • 消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue== false
  • 消息过期
  • 队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。 > 关于这个队里的默认长度,官方没有给出,网上找了下有说是没有设置就动态增长不限。也就是根据你机器的配置情况了。

死信队列声明如下:

 _, err := r.channel.QueueDeclare(
queueName, // 这里就是将一个队列声明为如下死信交换机的死信队列
true,
false,
false, 
false,
amqp.Table{
"x-dead-letter-exchange":    dlxExchange, // 声明当前队列绑定的 死信交换机
},
)  

3.3 延迟队列demo:

pusher:

 func (r *RabbitMQ) PublishDelayQueue(queue, message, dlxExchange, routing, expiration string) error {
defer r.CloseMq()
queueName := queue + "_delay"
_, err := r.channel.QueueDeclare(
queueName,
true,
false,
false, // 队列解锁
false,
amqp.Table{
"x-dead-letter-exchange":    dlxExchange, // 声明当前队列绑定的 死信交换机
"x-dead-letter-routing-key": routing,     // routing 模式路由名
},
)
if err != nil {
return err
}

// 注入消息 注册路由 routingKey
err = r.channel.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(message),
Expiration:  expiration,
})
if err != nil {
return err
}

fmt.Printf("push messag %s\n", message)
return nil
}  

Consumer:

 func (r *RabbitMQ) ConsumeDelayQueue(queueName, dlxExchange, routing string, f func(interface{})) error {
defer r.CloseMq()
err := r.channel.ExchangeDeclare(
dlxExchange,
RoutingKind, // 交换机类型 路由模式接收
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}

// 声明 死信队列(用于与死信交换机绑定)
q, err := r.channel.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}

// 绑定队列到 exchange 中
err = r.channel.QueueBind(
q.Name,
routing,
dlxExchange,
false,
nil)
if err != nil {
return err
}

// 消费消息
data, err := r.channel.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
return err
}

forever := make(chan bool)
go func() {
for d := range data {
fmt.Printf("Received a message: %s\n", d.Body)
f(d.Body)
}
}()
<-forever
return nil
}  

参考:

RabbitMQ工作模型与基本原理

rabbitmq消息队列原理

go操作RabbitMQ

golang mq rabbitmq

原文地址:

原文作者:Gopherzhang

相关文章