有经验的永州网站建设做网站怎么优化
下面这段教程针对是你已经有一些基本的MQ的知识,比如说能够很清楚的理解queue、exchange等概念,如果你还不是很理解,我建议你先访问官网查看基本的教程。
文章目录
- 1、造成死信队列的主要原因
- 2、操作逻辑图
- 3、代码实战
- 3.1 针对原因1:消费者超出时间未应答
- 3.3 针对原因2:限制一定的长度
- 3.3 针对原因3:消费者拒绝的消息回到死信队列中
1、造成死信队列的主要原因
- 消费者超时未应答
- 队列的容量有限
- 消费者拒绝了的消息
2、操作逻辑图
3、代码实战
其实整体的思路就是分别创建一个normal_exchange、dead_exchange、normal_queue、dead_queue,然后将normal_exchange与normal_queue进行绑定,将dead_exchange与dead_queue进行绑定,这里比较关键的一个点在于说如何将normal_queue与dead_exchange进行绑定,这样才能将错误的消息传递过来。下面就是这段代码的关键。
// 声明一个normal队列_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{//"x-message-ttl": 5000, // 指定过期时间//"x-max-length": 6, // 指定长度。超过这个长度的消息会发送到dead_exchange中"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})
3.1 针对原因1:消费者超出时间未应答
consumer1.go
package day07import (amqp "github.com/rabbitmq/amqp091-go""log""v1/utils"
)type Constant struct {NormalExchange stringDeadExchange stringNormalQueue stringDeadQueue stringNormalRoutingKey stringDeadRoutingKey string
}func Consumer1() {// 获取连接ch := utils.GetChannel()// 创建一个变量常量constant := Constant{NormalExchange: "normal_exchange",DeadExchange: "dead_exchange",NormalQueue: "normal_queue",DeadQueue: "dead_queue",NormalRoutingKey: "normal_key",DeadRoutingKey: "dead_key",}// 声明normal交换机err := ch.ExchangeDeclare(constant.NormalExchange,amqp.ExchangeDirect,true,false,false,false,nil,)utils.FailOnError(err, "Failed to declare a normal exchange")// 声明一个dead交换机err = ch.ExchangeDeclare(constant.DeadExchange,amqp.ExchangeDirect,true,false,false,false,nil,)utils.FailOnError(err, "Failed to declare a dead exchange")// 声明一个normal队列_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{"x-message-ttl": 5000, // 指定过期时间//"x-max-length": 6,"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})utils.FailOnError(err, "Failed to declare a normal queue")// 声明一个dead队列:注意不要给死信队列设置消息时间,否者死信队列里面的信息会再次过期_, err = ch.QueueDeclare(constant.DeadQueue,true,false,false,false,nil)utils.FailOnError(err, "Failed to declare a dead queue")// 将normal_exchange与normal_queue进行绑定err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")// 将dead_exchange与dead_queue进行绑定err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")// 消费消息msgs, err := ch.Consume(constant.NormalQueue,"",false, // 这个地方一定要关闭自动应答false,false,false,nil)utils.FailOnError(err, "Failed to consume in Consumer1")var forever chan struct{}go func() {for d := range msgs {if err := d.Reject(false); err != nil {utils.FailOnError(err, "Failed to Reject a message")}}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
consumer2.go
package day07import (amqp "github.com/rabbitmq/amqp091-go""log""v1/utils"
)func Consumer2() {// 拿取信道ch := utils.GetChannel()// 声明一个交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil)utils.FailOnError(err, "Failed to Declare a exchange")// 接收消息的应答msgs, err := ch.Consume("dead_queue","",false,false,false,false,nil,)var forever chan struct{}go func() {for d := range msgs {log.Printf("[x] %s", d.Body)// 开启手动应答ßd.Ack(false)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever}
produce.go
package day07import ("context"amqp "github.com/rabbitmq/amqp091-go""strconv""time""v1/utils"
)func Produce() {// 获取信道ch := utils.GetChannel()// 声明一个交换机err := ch.ExchangeDeclare("normal_exchange",amqp.ExchangeDirect,true,false,false,false,nil)utils.FailOnError(err, "Failed to declare a exchange")ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)defer cancer()// 发送了10条消息for i := 0; i < 10; i++ {msg := "Info:" + strconv.Itoa(i)ch.PublishWithContext(ctx,"normal_exchange","normal_key",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(msg),})}
}
3.3 针对原因2:限制一定的长度
只需要改变consumer1.go中的对normal_queue的声明
// 声明一个normal队列_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{//"x-message-ttl": 5000, // 指定过期时间"x-max-length": 6,"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})
3.3 针对原因3:消费者拒绝的消息回到死信队列中
这里需要完成两点工作
工作1:需要在consumer1中作出拒绝的操作
go func() {for d := range msgs {if err := d.Reject(false); err != nil {utils.FailOnError(err, "Failed to Reject a message")}}}()
工作2:如果你consume的时候开启了自动应答一定要关闭
// 消费消息msgs, err := ch.Consume(constant.NormalQueue,"",false, // 这个地方一定要关闭自动应答false,false,false,nil)
其他的部分不需要改变,按照问题1中的设计即可。