七叶笔记 » golang编程 » RabbitMQ Golang教程(一)

RabbitMQ Golang教程(一)

RabbitMQ Golang教程(一)


介绍


RabbitMQ是一个消息代理:它接受并转发消息。你可以把它想象成一个邮局:当你把你想要邮寄的邮件放进一个邮箱时,你可以确定邮差先生或女士最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。

RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息。

RabbitMQ和一般的消息传递都使用一些术语。

生产仅意味着发送。发送消息的程序是生产者:

队列是位于RabbitMQ内部的邮箱的名称。尽管消息通过RabbitMQ和你的应用程序流动,但它们只能存储在队列中。队列只受主机内存和磁盘限制的限制,实际上它是一个大的消息缓冲区。许多生产者可以向一个队列发送消息,而许多消费者可以尝试从一个队列接收数据。以下是我们表示队列的方式:

消费与接收具有相似的含义。消费者是一个主要等待接收消息的程序:

请注意,生产者,消费者和代理(broker)不必位于同一主机上。一个应用程序既可以是生产者,也可以是消费者。


  • 首先,使用go get安装amqp
 go get github.com/streadway/amqp  

发送


我们将消息发布者(发送者)称为 send.go,将消息消费者(接收者)称为receive.go。发布者将连接到RabbitMQ,发送一条消息,然后退出。

 package main

import (
"github.com/streadway/amqp"
"log"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main()  {
conn,err := amqp.Dial("amqp://admin:admin@xx.xxx.xxx.xxx:5672/")

failOnError(err,"Failed to connect to RabbitMQ")
defer conn.Close()

ch,err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false,   // durable
false,   // delete when unused
false,   // exclusive
false,   // no-wait
nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

body := "Hello World!"

err = ch.Publish(
"",     // exchange
q.Name, // routing key
false,  // mandatory
false,  // immediate
amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
  

接收


上面是我们的发布者。我们的消费者监听来自RabbitMQ的消息,因此与发布单个消息的发布者不同,我们将使消费者保持运行状态以监听消息并打印出来。

该代码(在receive.go中)具有与send相同的导入和帮助功能:

 package main

import (
"github.com/streadway/amqp"
"log"
)

func failOnErrors(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main()  {
conn,err := amqp.Dial("amqp://admin:admin@xx.xxx.xxx.xxx:5672/")
failOnErrors(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnErrors(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false,   // durable
false,   // delete when unused
false,   // exclusive
false,   // no-wait
nil,     // arguments
)
failOnErrors(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"",     // consumer
true,   // auto-ack
false,  // exclusive
false,  // no-local
false,  // no-wait
nil,    // args
)
failOnErrors(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

}
  

现在我们可以运行两个脚本。在一个终端窗口,运行发布者:

 go run send.go
  

然后,运行使用者:

 go run receive.go
  

再打开rabbitMQ发现已经上线了

消费者将打印通过RabbitMQ从发布者那里得到的消息。使用者将持续运行,等待消息(使用Ctrl-C停止它),因此请尝试从另一个终端运行发布者。

打开浏览器可以看到队列已存在

如果要检查队列,请尝试使用rabbitmqctl list_queues命令。

 rabbitmqctl list_queues
  

相关文章