RabbitMQ保证消息可靠性

  |   0 评论   |   0 浏览

RabbitMQ保证消息可靠性

生产者完整代码,消费者没有什么特殊处理

package com.llp.rabbitmq.java_api.confirms;

import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * RabbitMQ保证消息可靠性
 */
public class Publisher {

    public static final String QUEUE_NAME = "confirms";
    @Test
    public void publish()throws Exception{
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);


        String message = "Hello World!";

        //4. 开启confirms
        channel.confirmSelect();

        //5. 设置confirms的异步回调
        channel.addConfirmListener(new ConfirmListener() {
            //消息成功的发送到Exchange执行handleAck方法
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功的发送到Exchange!");
            }

            //消息没有发送到Exchange执行handleNack方法
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
            }
        });

        //6. 设置Return回调,确认消息是否路由到了Queue
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
            }
        });

        //7. 设置消息持久化
        //注意在声明queue是指定durable=true是让队列持久化,并没有对消息进行持久化
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .deliveryMode(2)
                .build();

        //8. 发布消息
        channel.basicPublish("",QUEUE_NAME,true,props,message.getBytes());
        System.out.println("消息发送成功!");


        System.in.read();
    }

}

1.保证消息一定送达到Exchange

Confirm机制

可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果

//4. 开启confirms
channel.confirmSelect();

//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息成功的发送到Exchange!");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
    }
});

2.保证消息可以路由到Queue

Return机制

为了保证Exchange上的消息一定可以送达到Queue

//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
    }
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调

3.保证Queue可以持久化消息

DeliveryMode设置消息持久化

DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。

//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
    .builder()
    .deliveryMode(2)
    .build();

//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());

4.保证消费者可以正常消费消息

详情看WorkQueue模式

5 SpringBoot实现上述操作

5.1 Confirm
  • 编写配置文件开启Confirm机制

    spring:
      rabbitmq:
        publisher-confirm-type: correlated  # 新版本
        publisher-confirms: true  # 老版本
    
  • 在发送消息时,配置RabbitTemplate

    @Test
    public void publishWithConfirms() throws IOException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    System.out.println("消息已经送达到交换机!!");
                }else{
                    System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");
                }
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
    
        System.in.read();
    }
    
5.2 Return
  • 编写配置文件开启Return机制

    spring:
      rabbitmq:
        publisher-returns: true # 开启Return机制
    
  • 在发送消息时,配置RabbitTemplate

    @Test
    public void publishWithReturn() throws IOException {
        // 新版本用 setReturnsCallback ,老版本用setReturnCallback
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                String msg = new String(returned.getMessage().getBody());
                System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
    
        System.in.read();
    }
    
5.3 消息持久化
@Test
public void publishWithBasicProperties() throws IOException {
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 设置消息的持久化!
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }
    });
    System.out.println("消息发送成功");
}

标题:RabbitMQ保证消息可靠性
作者:llp
地址:https://llinp.cn/articles/2023/05/01/1682924850486.html