RabbitMQ死信队列&延迟交换机

  |   0 评论   |   0 浏览

RabbitMQ死信队列&延迟交换机

1.什么是死信

死信&死信队列
1644476424544.png

死信队列的应用:

  • 基于死信队列在队列消息已满的情况下,消息也不会丢失
  • 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间

2. 实现死信队列

2.1 准备Exchange&Queue
package com.llp.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 死信队列配置
 */
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Queue normalQueue(){
        //普通队列,绑定死信队列
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
    }

    @Bean
    public Binding normalBinding(Queue normalQueue,Exchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}
2.2 实现效果
  • 基于消费者进行reject或者nack实现死信效果

    package com.llp.rabbitmq.topic;
    
    import com.llp.rabbitmq.config.DeadLetterConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class DeadListener {
    
        @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
        public void consume(String msg, Channel channel, Message message) throws IOException {
            System.out.println("接收到normal队列的消息:" + msg);
            //设置消息决绝消费,不需要重新放入到队列中
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            //或者
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
    
  • 消息的生存时间

    • 给消息设置生存时间

      @Test
      public void publishExpire(){
          String msg = "dead letter expire";
          rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  message.getMessageProperties().setExpiration("5000");
                  return message;
              }
          });
      }
      
    • 给队列设置消息的生存时间

      @Bean
      public Queue normalQueue(){
          return QueueBuilder.durable(NORMAL_QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)
                  .deadLetterRoutingKey("dead.abc")
                  .ttl(10000)
                  .build();
      }
      
  • 设置Queue中的消息最大长度

    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead.abc")
                .maxLength(1)
                .build();
    }
    

    只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!

3.延迟交换机

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

image-20230501154020297

将下载的文件上传到linux服务器并使用如下指令,将文件方到rabbitmq容器的plugins目录下

docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 9da57c5038ba:/opt/rabbitmq/plugins

image-20230501154420697

在rabbitmq容器的/opt/rabbitmq/sbin目录下执行

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启容器生效

docker restart 9da57c5038ba

可以看到添加插件后多了一个延迟交换机的选项

image-20230501154955764

  • 构建延迟交换机

    package com.llp.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 延迟队列
     */
    @Configuration
    public class DelayedConfig {
    
        public static final String DELAYED_EXCHANGE = "delayed-exchange";
        public static final String DELAYED_QUEUE = "delayed-queue";
        public static final String DELAYED_ROUTING_KEY = "delayed.#";
    
        @Bean
        public Exchange delayedExchange(){
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type","topic");
            Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
            return exchange;
        }
    
        @Bean
        public Queue delayedQueue(){
            return QueueBuilder.durable(DELAYED_QUEUE).build();
        }
    
        @Bean
        public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    
  • 发送消息

    package com.llp.rabbitmq;
    
    import com.llp.rabbitmq.config.DelayedConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class DelayedPublisherTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void publish(){
            rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //设置消息指定多少时间被消费,单位毫秒
                    message.getMessageProperties().setDelay(30000);
                    return message;
                }
            });
        }
    }
    

**延迟交换机存在的问题,**在延迟推送消息的过程中rabbitmq重启了、或者说服务器宕机了就会导致消息丢失


标题:RabbitMQ死信队列&延迟交换机
作者:llp
地址:https://llinp.cn/articles/2023/05/01/1682928476261.html