Springboot整合RokectMQ
Springboot整合RokectMQ
1.开发环境搭建
1.创建Springboot工程引入相关依赖
<!--引入RocketMQ依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
2.修改配置文件
rocketmq:
#指定nameserver地址,如果是集群多个服务器之间用分号隔开
name-server: 192.168.79.203:9876;192.168.79.204:9876
producer:
#指定消费者组名
group: llp
2.发送普通消息
1.生产者
ProducerController.java
@RestController
public class ProducerController {
//模板,帮助我们去获取连接;感情自然流露:redisTemplate kafkaTemplate rabbitTemplate jdbcTemplate
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/convertAndSend")
public String convertAndSend() {
User user = new User("llp", "110");
rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
//rocketmq-spring-boot-starter依赖中包含了fastJson
return JSON.toJSONString(user);
}
}
User.java
@AllArgsConstructor
@NoArgsConstructor
@Data
//在实际开发中生产者和消费者在不通的服务器上,数据的传输需要通过远程调用的方式,自然就涉及到对象的序列化问题了
public class User implements Serializable {
private static final long serialVersionUID = 4894770668175892723L;
String userName;
String userId;
}
2.消费者
/**
* 1.通过实现RocketMQListener接口标识这个类是一个消费者
* public interface RocketMQListener<T> {
* void onMessage(T message);
* }
* 2.RocketMQListener接口可以指定泛型,比如我们生产者发送的是user对象,则可以指定泛型为User
* 当然指定了泛型,onMessage方法参数类型也是明确的(User类型)
* 3.除了标识这是一个消费者外,我们还需补充一些参数,比如:
* consumerGroup 消费者组
* topic 主题
* messageModel 消息模式 BROADCASTING("BROADCASTING"),广播 CLUSTERING("CLUSTERING"); 均发
* consumeThreadMax 最大消费线程数
* selectorType 选择消息类型
* selectorExpression 选择消息表达式
* 4.如果RocketMQListener没有指定泛型则onmessage接收object对象,需要自行处理
* 当然如果指定了泛型也就限定了消费者所能够接收的消息类型
*
*/
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic")
public class ConsumerListener implements RocketMQListener<User> {
/**
* 接收消息
* @param message 消息对象
*/
@Override
public void onMessage(User message) {
System.out.println(message);
}
}
3.运行测试
3.发同步消息
1.生产者
@RequestMapping("/syncSend")
public String syncSend() {
User user = new User("llp", "110");
/**
* 从下面这段代码可以看到,底层会根据冒号进行拆分
* 第一个元素作为Topic
* 如果拆分后的数组长度大于1则第二个元素作为Tag
*
*String[] tempArr = destination.split(":", 2);
* String topic = tempArr[0];
* String tags = "";
* if (tempArr.length > 1) {
* tags = tempArr[1];
* }
*/
rocketMQTemplate.syncSend("syncSendTopic:syncSendTag", user);
return JSON.toJSONString(user);
}
2.消费者
/**
* SelectorType selectorType() default SelectorType.TAG;
* String selectorExpression()default "*";
*/
@Component
@RocketMQMessageListener(consumerGroup = "llp", topic = "syncSendTopic", selectorType = SelectorType.TAG, selectorExpression = "syncSendTag")
//@RocketMQMessageListener(consumerGroup = "llp", topic = "syncSendTopic")
public class SyncConsumerListener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println(message);
}
}
4.发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
1.生产者
/**
* 发送异步消息
* @return
*/
public String asyncSend() {
User user = new User("llp", "110");
rocketMQTemplate.asyncSend("asyncTopic", user, new SendCallback() {
//发送成功回调方法
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功,发送结果:" + sendResult);
}
//发送失败回调方法
@Override
public void onException(Throwable e) {
System.out.println("发送失败:" + e.getMessage());
}
});
return JSON.toJSONString(user);
}
2.消费者
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "asyncTopic")
public class AsyncConsumerListener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println("消费者接收消息:"+message);
}
}
3.运行测试
发送成功,发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801025F0818B4AAC2382C3BB10008, offsetMsgId=C0A84FCC00002A9F000000000003F5F8, messageQueue=MessageQueue [topic=asyncTopic, brokerName=broker-b, queueId=1], queueOffset=0]
消费者接收消息:User(userName=llp, userId=110)
5.发送单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
1.生产者
@RequestMapping("/sendOneWay")
public String sendOneWay() {
User user = new User("llp", "110");
rocketMQTemplate.sendOneWay("sendOneWayTopic",user);
return JSON.toJSONString(user);
}
2.消费者
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "sendOneWayTopic")
public class SendOneWayConsumerListener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println("消费者接收消息:"+message);
}
}
6.两种消费消息模式
1.负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。这也是rocketmq默认的消费模式
1.生产者
@RequestMapping("/convertAndSend")
public String convertAndSend() {
User user = new User("llp", "110");
rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
//rocketmq-spring-boot-starter依赖中包含了fastJson
return JSON.toJSONString(user);
}
2.消费者1
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic")
public class LoadBalanceConsumerListener implements RocketMQListener<User> {
/**
* 接收消息
* @param message 消息对象
*/
@Override
public void onMessage(User message) {
System.out.println("消费者接收消息:"+message);
}
}
3.消费者2
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.CLUSTERING)
public class LoadBalanceConsumerListener2 implements RocketMQListener<User> {
/**
* 接收消息
* @param message 消息对象
*/
@Override
public void onMessage(User message) {
System.out.println("消费者接收消息:"+message);
}
}
4.测试结果
2.广播模式
1.生产者
@RequestMapping("/convertAndSend")
public String convertAndSend() {
User user = new User("llp", "110");
rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
//rocketmq-spring-boot-starter依赖中包含了fastJson
return JSON.toJSONString(user);
}
2.消费者1
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.BROADCASTING)
public class SubscribeConsumerListener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println("消费者1接收消息:"+message);
}
}
3.消费者2
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.BROADCASTING)
public class SubscribeConsumerListener2 implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println("消费者2接收消息:"+message);
}
}
7.延时消息
1.生产者
@RequestMapping("/delaySend")
public String delaySend() {
User user = new User("llp", "110");
//延迟等级,这里2级对应延迟5秒
// org/apache/rocketmq/store/config/MessageStoreConfig.java
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
rocketMQTemplate.syncSend("delaySendTopic", MessageBuilder.withPayload(user).build(), 2000, 2);
return JSON.toJSONString(user);
}
2.消费者
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "delaySendTopic")
public class DelayConsumerListener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println("消费者接收消息:"+message);
}
}
8.顺序消息
1.先看一个问题:消息错乱
原因
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的
queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,
则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分
区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消
息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
1.生产者
@RequestMapping("/orderSend")
public void orderSend() {
List<OrderStep> orderSteps = OrderStep.buildOrders();
for (OrderStep orderStep : orderSteps) {
//SendResult sendResult = rocketMQTemplate.syncSendOrderly("orderSendTopic", orderStep, String.valueOf(2000));
Message<OrderStep> message = MessageBuilder.withPayload(orderStep).build();
SendResult sendResult = rocketMQTemplate.syncSendOrderly("orderSendTopic", message, String.valueOf(orderStep.getOrderId()));
System.out.println("发送结果:" + sendResult);
}
}
/**
* 订单的步骤
*/
@Data
public class OrderStep {
private long orderId;
private String desc;
/**
* 生成模拟订单数据
*/
public static List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
2.消费者
@Component
@RocketMQMessageListener(consumerGroup = "llp2",topic = "orderSendTopic")
public class OrderConsumerListener implements RocketMQListener {
@Override
public void onMessage(Object message) {
System.out.println("消费者接收到消息:"+message.toString());
}
}
3.测试结果
消费者接收到消息:{orderId=15103111039, desc=创建}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC238866135002A, offsetMsgId=C0A84FCB00002A9F000000000007BD60, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=147]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886613C002E, offsetMsgId=C0A84FCB00002A9F000000000007BE65, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=63]
消费者接收到消息:{orderId=15103111065, desc=创建}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886613E0030, offsetMsgId=C0A84FCB00002A9F000000000007BF6A, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=148]
消费者接收到消息:{orderId=15103111039, desc=付款}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661410035, offsetMsgId=C0A84FCB00002A9F000000000007C06F, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=149]
消费者接收到消息:{orderId=15103117235, desc=创建}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661430038, offsetMsgId=C0A84FCB00002A9F000000000007C174, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=64]
消费者接收到消息:{orderId=15103111065, desc=付款}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC238866145003C, offsetMsgId=C0A84FCB00002A9F000000000007C279, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=150]
消费者接收到消息:{orderId=15103117235, desc=付款}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661470040, offsetMsgId=C0A84FCB00002A9F000000000007C37E, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=65]
消费者接收到消息:{orderId=15103111065, desc=完成}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614A0044, offsetMsgId=C0A84FCB00002A9F000000000007C483, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=151]
消费者接收到消息:{orderId=15103111039, desc=推送}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614D0048, offsetMsgId=C0A84FCB00002A9F000000000007C588, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=152]
消费者接收到消息:{orderId=15103117235, desc=完成}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614F004C, offsetMsgId=C0A84FCB00002A9F000000000007C68D, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=153]
消费者接收到消息:{orderId=15103111039, desc=完成}
9.事务消息
- 正常事务过程
- 事务补偿过程
事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
- 注意:事务消息仅与生产者有关,与消费者无关
1.生产者
application.yml
transaction:
group: llp-transaction
生产者代码
@Value(value = "${transaction.group}")
private String transactionGroup;
@RequestMapping("/sendTransactionMsg")
public void sendTransactionMsg() {
User user = new User("llp", "110");
Message<User> message = MessageBuilder.withPayload(user).build();
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(transactionGroup, "transactionTopic", message, null);
System.out.println("发送事务消息,发送结果:"+sendResult);
}
事务消息监听
/**
* 事务消息Listener
*
*/
@RocketMQTransactionListener(txProducerGroup = "llp-transaction")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
* 如果本地事务返回UNKNOWN,会进行事务补偿,自动执行下面的checkLocalTransaction方法
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务=====");
System.out.println(msg.getPayload());
//模拟提交事务
//return RocketMQLocalTransactionState.COMMIT;
//模拟回滚事务
//return RocketMQLocalTransactionState.ROLLBACK;
//让去check本地事务状态 进行事务补偿
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 检测本地事务状态
* 事务补偿过程
* 当消息服务器没有收到消息生产者的事务提交或者回滚确认时,会主动要求消息生产者进行确认,
* 消息生产者便会去检测本地事务状态,该过程称为事务补偿过程
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("执行事务补偿======");
//事务补偿提交
return RocketMQLocalTransactionState.COMMIT;
//事务补偿回滚
//return RocketMQLocalTransactionState.ROLLBACK;
//如果事务补偿过程还是UNKNOWN 就会一直进行事务补偿,60s一次
//return RocketMQLocalTransactionState.UNKNOWN;
}
}
2.消费者
@Component
@RocketMQMessageListener(consumerGroup = "llp-transaction",topic = "transactionTopic")
public class TransactionConsumerListener implements RocketMQListener {
@Override
public void onMessage(Object message) {
System.out.println("消费者接收到消息:"+message);
}
}
3.测试结果
执行本地事务=====
[B@4174e858
发送事务消息,发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102506818B4AAC2389ADEBA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionTopic, brokerName=broker-b, queueId=2], queueOffset=2]
执行事务补偿======
消费者接收到消息:{userName=llp, userId=110}
10.批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
List<Message> msgList = new ArrayList<>();
msgList.add(new Message("topic6", "tag1", "msg1".getBytes()));
msgList.add(new Message("topic6", "tag1", "msg2".getBytes()));
msgList.add(new Message("topic6", "tag1", "msg3".getBytes()));
rocketMQTemplate.syncSend("topic8",msgList,1000);
# 发送时间超时时间
rocketmq.producer.send-message-timeout=300000
#异步消息发送失败重试次数
rocketmq.producer.retry-times-when-send-async-failed=0
#消息发送失败后的最大重试次数
rocketmq.producer.retry-times-when-send-failed=2
#消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
rocketmq.producer.compress-message-body-threshold=4096
#消息最大容量
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=true
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
rocketMQTemplate.syncSend("topic8",listItem,1000);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}