RabbitMQ保证消息可靠性
RabbitMQ保证消息可靠性
生产者完整代码,消费者没有什么特殊处理
package com.llp.rabbitmq.java_api.confirms;
import com.llp.rabbitmq.java_api.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* RabbitMQ保证消息可靠性
*/
public class Publisher {
public static final String QUEUE_NAME = "confirms";
@Test
public void publish()throws Exception{
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2. 构建Channel
Channel channel = connection.createChannel();
//3. 构建队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String message = "Hello World!";
//4. 开启confirms
channel.confirmSelect();
//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
//消息成功的发送到Exchange执行handleAck方法
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功的发送到Exchange!");
}
//消息没有发送到Exchange执行handleNack方法
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
}
});
//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
}
});
//7. 设置消息持久化
//注意在声明queue是指定durable=true是让队列持久化,并没有对消息进行持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.deliveryMode(2)
.build();
//8. 发布消息
channel.basicPublish("",QUEUE_NAME,true,props,message.getBytes());
System.out.println("消息发送成功!");
System.in.read();
}
}
1.保证消息一定送达到Exchange
Confirm机制
可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果
//4. 开启confirms
channel.confirmSelect();
//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功的发送到Exchange!");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
}
});
2.保证消息可以路由到Queue
Return机制
为了保证Exchange上的消息一定可以送达到Queue
//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
}
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
3.保证Queue可以持久化消息
DeliveryMode设置消息持久化
DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。
//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.deliveryMode(2)
.build();
//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());
4.保证消费者可以正常消费消息
详情看WorkQueue模式
5 SpringBoot实现上述操作
5.1 Confirm
-
编写配置文件开启Confirm机制
spring: rabbitmq: publisher-confirm-type: correlated # 新版本 publisher-confirms: true # 老版本
-
在发送消息时,配置RabbitTemplate
@Test public void publishWithConfirms() throws IOException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息已经送达到交换机!!"); }else{ System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!"); } } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message"); System.out.println("消息发送成功"); System.in.read(); }
5.2 Return
-
编写配置文件开启Return机制
spring: rabbitmq: publisher-returns: true # 开启Return机制
-
在发送消息时,配置RabbitTemplate
@Test public void publishWithReturn() throws IOException { // 新版本用 setReturnsCallback ,老版本用setReturnCallback rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { String msg = new String(returned.getMessage().getBody()); System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!"); } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message"); System.out.println("消息发送成功"); System.in.read(); }
5.3 消息持久化
@Test
public void publishWithBasicProperties() throws IOException {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息的持久化!
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
System.out.println("消息发送成功");
}