RocketMQ-消息机制
RocketMQ-消息机制
- 导入MQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
- 消息发送者步骤分析
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
- 消息消费者步骤分析
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
1.基本样例
1. 消息发送
1)发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args)throws Exception{
//1.创建消费者Consumer,指定消费者组名group1
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址,集群多台用分号隔开
producer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 100; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题
* 参数2 tag
* 参数3:消息内容
*/
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送消息,通过sendResult返回消息是否成功送达
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:"+sendResult);
}
//6.关闭生产者producer, 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
发送结果示例:由于是双主双从的结果,broker主节点负载接收消息,从节点负责消费消息
//sendStatus发送状态、msgId:消息id、offsetMsgId:偏移量id、topic:主题、brokerName:broker节点名称,名字在配置文件中指定、queueId:队列id
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102253018B4AAC2350CD7470000, offsetMsgId=C0A84FCC00002A9F000000000001F521, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=3], queueOffset=176]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102253018B4AAC2350CD7D30001, offsetMsgId=C0A84FCB00002A9F000000000001F0F5, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=175]
- Message ID
消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列 MQ 系统自动生成,唯一标识某条消息。
- SendStatus
发送的标识。成功,失败等
- Queue
相当于是Topic的分区;用于并行发送和接收消息
2)发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
//由于这里集群是双主双从且,master和salve broker都在同一个机器上,性能有限异步发送可能会出现失败的情况
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102539018B4AAC2351A1CFD0005, offsetMsgId=C0A84FCC00002A9F0000000000028CD4, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=235]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102539018B4AAC2351A1CFD0003, offsetMsgId=C0A84FCC00002A9F0000000000028D82, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=232]
3)单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。cn.enjoyedu.normal.OnewayProducer
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 1.实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
// 3.启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 4.创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 5.发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 6.如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
4)消息发送的权衡
2. 消费消息
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度(ConsumerOffset)的存储会持久化到Broker。
单消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("TopicTest", "TagA");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently(){
/**
* 接收消息内容的方法
* @param msgs
* @param context 消费者并发消费环境
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("msg:" +msg);
byte[] body = msg.getBody();
System.out.println("消息内容:"+new String(body));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
}
}
msg:MessageExt [queueId=2, storeSize=178, queueOffset=245, sysFlag=0, bornTimestamp=1662853378585, bornHost=/192.168.79.1:12683, storeTimestamp=1662853379519, storeHost=/192.168.79.203:10911, msgId=C0A84FCB00002A9F000000000002BC9D, commitLogOffset=179357, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=246, CONSUME_START_TIME=1662853378667, UNIQ_KEY=C0A801024AFC18B4AAC2352776180000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]
msg:MessageExt [queueId=0, storeSize=178, queueOffset=244, sysFlag=0, bornTimestamp=1662853378589, bornHost=/192.168.79.1:12684, storeTimestamp=1662853379524, storeHost=/192.168.79.204:10911, msgId=C0A84FCC00002A9F000000000002C584, commitLogOffset=181636, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=245, CONSUME_START_TIME=1662853378667, UNIQ_KEY=C0A801024AFC18B4AAC23527761D0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]
消息内容:Hello RocketMQ 2
消息内容:Hello RocketMQ 0
1)负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
生产者
public class SyncProducer {
public static void main(String[] args) throws Exception{
//实例化消息生产者,指定消费者组名group1
DefaultMQProducer producer = new DefaultMQProducer("group_test");
//设置nameserver的地址,集群多台用分号隔开
producer.setNamesrvAddr("127.0.0.1:9876");
// producer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
// producer.setSendLatencyFaultEnable(true);
//启动消息生产者
producer.start();
//创建消息并发送
for (int i = 1; i <= 10; i++){
Message msg = new Message("TopicTest", "TagA", ("这是第" + i + "条消息").getBytes());
SendResult result = producer.send(msg);
System.out.println(result);
}
producer.shutdown();
}
}
消费者1
public class BalanceComuser1 {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*");
// 负载均衡模式消费,(默认的消费模式,可以不指定)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者2
public class BalanceComuser2 {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*");
// 负载均衡模式消费,(默认的消费模式,可以不指定)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
执行结果
消费者1:
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第1条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第2条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第5条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第6条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第9条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第10条消息
消费者2:
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第4条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第3条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第8条消息
收到消息: topic :TopicTest ,tags : TagA ,msg : 这是第7条消息
2)广播模式
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
消费者1
public class BroadcastComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("C-king");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");// 106.55.246.66
// 订阅Topic
consumer.subscribe("TopicTest", "*");
// 广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者2
public class BroadcastComuser2 {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("B-test");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");//106.55.246.66
// 订阅Topic
consumer.subscribe("TopicTest", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
测试结果:两个不同消费者组的消费者都会消费生产者推送的每一条消息,即使两个消费者属于同一个消费者组也是如此
消费者1
消费者2
3)消息消费时的权衡
集群模式:适用场景&注意事项
消费端集群化部署,每条消息只需要被处理一次。
由于消费进度在服务端维护,可靠性更高。
集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:适用场景&注意事项
广播消费模式下不支持顺序消息。
广播消费模式下不支持重置消费位点。
每条消息都需要被相同逻辑的多台机器处理。
消费进度在客户端维护,出现重复的概率稍大于集群模式。
广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
目前仅 Java 客户端支持广播模式。
广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
2. 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
- 全局顺序消息
- 部分顺序消息
1.顺序消息生产
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。
使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,consume消费消息失败时,不能返回reconsume——later,这样会导致乱序
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
Producer
/**
* 部分顺序消息生产
*/
public class ProducerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 订单列表
List<Order> orderList = new ProducerInOrder().buildOrders();
for (int i = 0; i < orderList.size(); i++) {
String body = orderList.get(i).toString();
Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
//根据订单id选择发送queue,让订单id相同的消息分配到同一个消息队列(queue)
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单
*/
private static class Order {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据 3个订单 每个订单4个状态
* 每个订单 创建->付款->推送->完成
*/
private List<Order> buildOrders() {
List<Order> orderList = new ArrayList<Order>();
Order orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
2.顺序消费消息
消费时,同一个OrderId获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("PartOrder", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
测试结果,在消费者端每个线程负责一个订单流程
consumeThread=ConsumeMessageThread_3,queueId=2, content:Order{orderId=2, desc='创建'}
consumeThread=ConsumeMessageThread_1,queueId=3, content:Order{orderId=3, desc='创建'}
consumeThread=ConsumeMessageThread_2,queueId=1, content:Order{orderId=1, desc='创建'}
consumeThread=ConsumeMessageThread_1,queueId=3, content:Order{orderId=3, desc='付款'}
consumeThread=ConsumeMessageThread_1,queueId=3, content:Order{orderId=3, desc='推送'}
consumeThread=ConsumeMessageThread_2,queueId=1, content:Order{orderId=1, desc='付款'}
consumeThread=ConsumeMessageThread_1,queueId=3, content:Order{orderId=3, desc='完成'}
consumeThread=ConsumeMessageThread_3,queueId=2, content:Order{orderId=2, desc='付款'}
consumeThread=ConsumeMessageThread_3,queueId=2, content:Order{orderId=2, desc='推送'}
consumeThread=ConsumeMessageThread_2,queueId=1, content:Order{orderId=1, desc='推送'}
consumeThread=ConsumeMessageThread_3,queueId=2, content:Order{orderId=2, desc='完成'}
consumeThread=ConsumeMessageThread_2,queueId=1, content:Order{orderId=1, desc='完成'}
3.延时消息
- 概念
延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
- 应用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
- 使用方式
Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。(阿里云RocketMQ提供了任意时刻的定时消息功能,Apache的RocketMQ并没有,阿里并没有开源)
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的level来的,延迟队列默认是msg.setDelayTimeLevel(3)代表延迟10秒
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java
是这18个等级(秒(s)、分(m)、小时(h)),level为1,表示延迟1秒后消费,level为5表示延迟1分钟后消费,level为18表示延迟2个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。
1.启动消息消费者
public class DelayConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2.指定Nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3.订阅主题Topic和Tag
consumer.subscribe("TopicTest", "*");
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently(){
/**
* 接收消息内容的方法
* @param msgs
* @param context 消费者并发消费环境
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 消息存储时间( msg.getStoreTimestamp() ) - 消息发送时间( msg.getBornTimestamp() ) ==》 延迟时间(单位毫秒)
System.out.println("消息id:"+ msg.getMsgId()+",延迟时间:"+(msg.getStoreTimestamp()-msg.getBornTimestamp())/1000+"s");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
}
}
2.发送延时消息
public class DelayProducer {
public static void main(String[] args) throws Exception{
// 1.实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3.启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 4.创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 延时等级参考 org/apache/rocketmq/store/config/MessageStoreConfig.java
// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setDelayTimeLevel(3);
// 5.发送单向消息,没有任何返回结果
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:"+sendResult);
}
// 6.如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
3.验证
消费者日志,将会看到消息存储时间比消息发送时间晚10秒
消息id:7F0000017D4018B4AAC27E3D87B80000,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C80009,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C70007,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C80008,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C00001,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C10002,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C50004,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C60005,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C40003,延迟时间:10s
消息id:7F0000017D4018B4AAC27E3D87C70006,延迟时间:10s
4.批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
- 批量切片
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//发送示例: 把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
1.发送批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
//批量发送消息
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
5.过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
1.SQL基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
2.Tag过滤
1.消息生产者
public class TagProducer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,指定消费者组名group1
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址,集群多台用分号隔开
producer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
//3.启动producer
producer.start();
String tags[] = {"TagA","TagB","TagC"};
for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题
* 参数2 tag
* 参数3:消息内存
*/
Message msg = new Message("TagTopic" , tags[i], ("Hello RocketMQ " + i).getBytes());
//5.发送消息,通过sendResult返回消息是否成功送达
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);
}
//6.关闭生产者producer, 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
2.消息消费者
public class TagConsumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
//3.订阅主题Topic和Tag * 表示接收该topic所有tag消息; TagA || TagB 表示接收TagA和TagB的消息
consumer.subscribe("TagTopic", "TagA || TagB");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently(){
/**
* 接收消息内容的方法
* @param msgs
* @param context 消费者并发消费环境
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String tags = msg.getTags();
System.out.println("消息tag:"+tags);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动成功");
}
}
3.SQL过滤
ps:开启SQL过滤功能支持,需要修改broker.conf配置文件,加入enablePropertyFilter=true 然后重启Broker服务
1.消息生产者
发送消息时,你能通过putUserProperty
来设置消息的属性
public class SqlProducer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,指定消费者组名group1
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址,集群多台用分号隔开
producer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
//3.启动producer
producer.start();
String tags[] = {"TagA","TagB","TagC"};
for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题
* 参数2 tag
* 参数3:消息内存
*/
Message msg = new Message("SqlFilterTopic" , tags[i], ("Hello RocketMQ " + i).getBytes());
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
//5.发送消息,通过sendResult返回消息是否成功送达
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);
}
//6.关闭生产者producer, 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
2.消息消费者
用MessageSelector.bySql来使用sql筛选消息
public class SqlConsumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.79.203:9876;192.168.79.204:9876");
//3.订阅主题Topic,只有订阅的消息有这个属性a, a >=0 and a <= 3
//这里根据sql进行过滤和tag没有关系
consumer.subscribe("SqlFilterTopic", MessageSelector.bySql("a between 0 and 3"));
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently(){
/**
* 接收消息内容的方法
* @param msgs
* @param context 消费者并发消费环境
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String tags = msg.getTags();
System.out.println("消息tag:"+tags);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动成功");
}
}
测试结果
消息tag:TagA
消息tag:TagB
消息tag:TagC
注意,如果要支持sql过滤需要在broker配置文件中开启sql过滤支持 #开启sql过滤支持 enablePropertyFilter=true
6.事务消息
1.流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
- 正常事务流程
(1) 发送消息(half消息):图中步骤1。
(2) 服务端响应消息写入结果:图中步骤2。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行):图中步骤3。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见):图中步骤4
- 事务补偿流程
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”:图中步骤5。
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态:图中步骤6。
(3) 根据本地事务状态,重新Commit或者Rollback::图中步骤6。
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
- 事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction
: 提交状态,它允许消费者消费此消息(完成图中了1,2,3,4步,第4步是Commit)。
TransactionStatus.RollbackTransaction
: 回滚状态,它代表该消息将被删除,不允许被消费(完成图中了1,2,3,4步, 第4步是Rollback)。
TransactionStatus.Unknown
: 中间状态,它代表需要检查消息队列来确定状态(完成图中了1,2,3步, 但是没有4或者没有7,无法Commit或Rollback)。
2.使用场景
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?
可以使用RocketMQ的分布式事务保证在下单失败后系统数据的完整性
3.使用限制
1.事务消息不支持延时消息和批量消息。
2.事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过Broker的配置文件设置好。
3.为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
4.事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
5.事务性消息可能不止一次被检查或消费。
6.事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
7.提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
8.事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
补充:RocketMQ中的消息回查设置
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默
认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔
4.发送事务消息
1) 创建事务性生产者
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
/**
* A系统
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
// 创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 设置生产者回查线程池
producer.setExecutorService(executorService);
// 生产者设置监听器
producer.setTransactionListener(transactionListener);
// 启动消息生产者
producer.start();
// 1、半事务的发送
try {
Message msg =
new Message("TransactionTopic", null, ("A向B系统转100块钱 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
System.out.println(sendResult.getSendStatus() + "-" + df.format(new Date()));// 半事务消息是否成功
} catch (MQClientException | UnsupportedEncodingException e) {
// todo 回滚rollback
e.printStackTrace();
}
// 2、半事务的发送成功
// 一些长时间等待的业务(比如输入密码,确认等操作):需要通过事务回查来处理
for (int i = 0; i < 1000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
2)实现事务的监听接口
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务(步骤3)。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态(步骤5),并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
public class TransactionListenerImpl implements TransactionListener {
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
//todo 执行本地事务 update A...
System.out.println("update A ... where transactionId:"+msg.getTransactionId() +":"+df.format(new Date()));
//System.out.println("commit");
//todo 情况1:本地事务成功
//return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况2:本地事务失败
//System.out.println("rollback");
//return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3:业务复杂,还处于中间过程或者依赖其他操作的返回结果,就是unknow
System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
return LocalTransactionState.UNKNOW;
}
//事务回查 RokectMQ默认是60s回查一次本地事务,即一分钟检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println("checkLocalTransaction:"+df.format(new Date()));// new Date()为获取当前系统时间
//todo 情况3.1:业务回查成功!
System.out.println("业务回查:执行本地事务成功,确认消息");
return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况3.2:业务回查回滚!
// System.out.println("业务回查:执行本地事务失败,删除消息");
// return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3.3:业务回查还是UNKNOW!
//System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
//return LocalTransactionState.UNKNOW;
}
}
3)创建事务性消费者
/**
* 事务消息-消费者 B
*/
public class TranscationComuser {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TranscationComsuer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TransactionTopic", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// todo 开启事务
for (MessageExt msg : msgs) {
// todo 执行本地事务 update B...(幂等性)
System.out.println("update B ... where transactionId:" + msg.getTransactionId());
// todo 本地事务成功
System.out.println("commit:" + msg.getTransactionId());
System.out.println("执行本地事务成功,确认消息");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("执行本地事务失败,重试消费,尽量确保B处理成功");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
4)测试验证
生产者日志:
//本地事务提交,消息发送到broker进行存储,此时消息是没有提交的,消费者并不能消费消息
update A ... where transactionId:7F000001735818B4AAC27EC158D70000:2024-01-25 14:43:20
业务比较长,还没有处理完,不知道是成功还是失败!
SEND_OK-2024-01-25 14:43:20
//默认60s进行一次回查,如果成功则提交本地事务,确认消息commit,之后消费者才能消费消息
checkLocalTransaction:2024-01-25 14:44:13
业务回查:执行本地事务成功,确认消息
消费者日志:
//生产者确认消息,之后消费者拉取消息进行消费
update B ... where transactionId:7F000001735818B4AAC27EC158D70000
commit:7F000001735818B4AAC27EC158D70000
执行本地事务成功,确认消息
RocketMQ事务消息也可以部署多个生产者,当一个生产推送了事务消息挂掉之后,另一个生产者还可以对事务进行回查
生产者2部署在别的服务器
/**
* A系统
*/
public class TransactionProducer2 {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl2();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
// 创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 设置生产者回查线程池
producer.setExecutorService(executorService);
// 生产者设置监听器
producer.setTransactionListener(transactionListener);
// 启动消息生产者
producer.start();
// 1、半事务的发送
try {
Message msg =
new Message("TransactionTopic", null, ("A向B系统转100块钱 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println(sendResult.getSendStatus());// 半事务消息是否成功
} catch (MQClientException | UnsupportedEncodingException e) {
// todo 回滚rollback
e.printStackTrace();
}
// 2、半事务的发送成功
// 一些长时间等待的业务(比如输入密码,确认等操作):需要通过事务回查来处理
for (int i = 0; i < 1000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
生产者2的本地事务监听
public class TransactionListenerImpl2 implements TransactionListener {
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
//todo 执行本地事务 update A...
System.out.println("update A ... where transactionId:"+msg.getTransactionId() +":"+df.format(new Date()));
System.out.println("commit");
//todo 情况1:本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况2:本地事务失败
//return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3:业务复杂,还处于中间过程或者依赖其他操作的返回结果,就是unknow
//System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
//return LocalTransactionState.UNKNOW;
}
//事务回查 默认是60s,一分钟检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println("checkLocalTransaction:"+df.format(new Date()));// new Date()为获取当前系统时间
//todo 情况3.1:业务回查成功!
System.out.println("业务回查:执行本地事务成功,确认消息");
return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况3.2:业务回查回滚!
//System.out.println("业务回查:执行本地事务失败,删除消息");
//return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3.3:业务回查还是UNKNOW!
// System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
// return LocalTransactionState.UNKNOW;
}
}
RocketMQ的事务消息主要集中在生产者,提供了回查机制可以有效的解决生产者端堵塞等待的问题,相较于kafka的事务消息要灵活许多,在回查时如果本地判断本地事务是否执行成功如果执行成功则将消息推送给消费者进行消费,消费者端执行自己的业务并开启本地事务来进行回滚或者提交,此外消费者端还需要确保消息幂等性以及消费失败重试等问题
7.生产者|消费者重要属性、方法
1.生产者
/**
* 消息发送时的细节
*/
public class ProducerDetails {
public static void main(String[] args) throws Exception{
//todo producerGroup:生产者所属组(针对 事务消息 高可用)
DefaultMQProducer producer = new DefaultMQProducer("produce_details");
//todo 默认主题在每一个Broker队列数量(对于新创建主题有效)
producer.setDefaultTopicQueueNums(8);
//todo 发送消息默认超时时间,默认3s (3000ms)
producer.setSendMsgTimeout(1000*3);
//todo 消息体超过该值则启用压缩,默认4k
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
//todo 同步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendFailed(2);
//todo 异步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendAsyncFailed(2);
//todo 消息重试时选择另外一个Broker时(消息没有存储成功是否发送到另外一个broker),默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
//todo 允许发送的最大消息长度,默认为4M
producer.setMaxMessageSize(1024 * 1024 * 4);
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");//106.55.246.66
// 启动Producer实例
producer.start();
//todo 0 查找该主题下所有消息队列
List<MessageQueue> MessageQueue = producer.fetchPublishMessageQueues("TopicTest");
for (int i = 0; i < MessageQueue.size(); i++) {
System.out.println(MessageQueue.get(i).getQueueId());
}
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//todo 单向发送
//todo 1.1发送单向消息
producer.sendOneway(msg);
//todo 1.2指定队列单向发送消息(使用select方法)
producer.sendOneway(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null);
//todo 1.3指定队列单向发送消息(根据之前查找出来的主题)
producer.sendOneway(msg,MessageQueue.get(0));
//todo 同步发送
//todo 2.1同步发送消息
SendResult sendResult0 = producer.send(msg);
//todo 2.1同步超时发送消息(属性设置:sendMsgTimeout 发送消息默认超时时间,默认3s (3000ms) )
SendResult sendResult1 = producer.send(msg,1000*3);
//todo 2.2指定队列同步发送消息(使用select方法)
SendResult sendResult2 = producer.send(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null);
//todo 2.3指定队列同步发送消息(根据之前查找出来的主题队列信息)
SendResult sendResult3 = producer.send(msg,MessageQueue.get(0));
//todo 异步发送
//todo 3.1异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
//todo 3.1异步超时发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
},1000*3);
//todo 3.2选择指定队列异步发送消息(根据之前查找出来的主题队列信息)
producer.send(msg,MessageQueue.get(0),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
//todo 3.3选择指定队列异步发送消息(使用select方法)
producer.send(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
- 属性
producerGroup:生产者所属组
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M
- 方法
//启动
void start() throws MQClientException;
//关闭
void shutdown();
//查找该主题下所有消息队列
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
//发送单向消息
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
//选择指定队列单向发送消息
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;
//同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//同步超时发送消息
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//选择指定队列同步发送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//异步发送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
//异步超时发送消息
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
//选择指定队列异步发送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
2.消费者
/**
* 消息消费时的细节
*/
public class ComuserDetails {
public static void main(String[] args) throws Exception {
//todo 属性
//todo consumerGroup:消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("king");
//todo 指定Namesrv地址信息.
consumer.setNamesrvAddr("106.55.246.66:9876");
//todo 消息消费模式(默认集群消费)
consumer.setMessageModel(MessageModel.CLUSTERING);
//todo 指定消费开始偏移量(上次消费偏移量、最大偏移量、最小偏移量、启动时间戳)开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//todo 消费者最小线程数量(默认20)
consumer.setConsumeThreadMin(20);
//todo 消费者最大线程数量(默认20)
consumer.setConsumeThreadMax(20);
//todo 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装)
consumer.setPullInterval(0);
//todo 推模式下任务拉取的条数,默认32条(一批批拉)
consumer.setPullBatchSize(32);
//todo 消息重试次数,-1代表16次 (超过 次数成为死信消息)
consumer.setMaxReconsumeTimes(-1);
//todo 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)
consumer.setConsumeTimeout(15);
//todo 获取消费者对主题分配了那些消息队列
Set<MessageQueue> MessageQueueSet = consumer.fetchSubscribeMessageQueues("TopicTest");
Iterator iterator = MessageQueueSet.iterator();
while(iterator.hasNext()){
MessageQueue MessageQueue =(MessageQueue)iterator.next();
System.out.println(MessageQueue.getQueueId());
}
//todo 方法-订阅
//todo 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//todo 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest",MessageSelector.bySql("a between 0 and 3"));
//todo 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest",MessageSelector.byTag("tagA|TagB"));
//todo 取消消息订阅
consumer.unsubscribe("TopicTest");
//todo 注册监听器
//todo 注册并发事件监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
//没有成功 -- 到重试队列中来
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//todo
}
});
//todo 注册顺序消息事件监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//todo 这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- 属性
//消费者组
private String consumerGroup;
//消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
ConsumeFromTimestamp模式下只会在订阅组(消费者群组)第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。后续行为和ConsumeFromLastOffset类似
//消费者最小线程数量
private int consumeThreadMin = 20;
//消费者最大线程数量
private int consumeThreadMax = 20;
//推模式下任务间隔时间
private long pullInterval = 0;
//推模式下任务拉取的条数,默认32条
private int pullBatchSize = 32;
//消息重试次数,-1代表16次
private int maxReconsumeTimes = -1;
//消息消费超时时间
private long consumeTimeout = 15;
- 方法
void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器
void unsubscribe(final String topic):取消消息订阅
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器
3.消息确认(ACK)
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。
返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。
如果业务的回调没有处理好而抛出异常,会认为是消费失败ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
另外如果使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费
- 死信队列
/**
* 当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
* 死信队列是死信Topic下分区数唯一的单独队列
* 死信Topic名称为%DLQ%原消费者组名,死信队列的消息将不会再被消费
*
* 消息处理失败处理方式:
* 监听死信队列处理消息
* 1.直接记录到数据库中
* 2.重试几次在记录到数据库中
*/
public class DeadLeterComumer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic
consumer.subscribe("%DLQ%TopicTest", "*");
// 负载均衡模式消费,(默认的消费模式,可以不指定)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}