Springboot整合RokectMQ

  |   0 评论   |   0 浏览

Springboot整合RokectMQ

1.开发环境搭建

1.创建Springboot工程引入相关依赖

<!--引入RocketMQ依赖-->
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.0.3</version>
</dependency>

2.修改配置文件

rocketmq:
  #指定nameserver地址,如果是集群多个服务器之间用分号隔开
  name-server: 192.168.79.203:9876;192.168.79.204:9876
  producer:
    #指定消费者组名
    group: llp

2.发送普通消息

1.生产者

ProducerController.java

@RestController
public class ProducerController {
    //模板,帮助我们去获取连接;感情自然流露:redisTemplate kafkaTemplate rabbitTemplate  jdbcTemplate
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @RequestMapping("/convertAndSend")
    public String convertAndSend() {
        User user = new User("llp", "110");
        rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
        //rocketmq-spring-boot-starter依赖中包含了fastJson
        return JSON.toJSONString(user);
    }
}

User.java

@AllArgsConstructor
@NoArgsConstructor
@Data
//在实际开发中生产者和消费者在不通的服务器上,数据的传输需要通过远程调用的方式,自然就涉及到对象的序列化问题了
public class User implements Serializable {
    private static final long serialVersionUID = 4894770668175892723L;
    String userName;
    String userId;
}

2.消费者

/**
 * 1.通过实现RocketMQListener接口标识这个类是一个消费者
 * public interface RocketMQListener<T> {
 *     void onMessage(T message);
 * }
 * 2.RocketMQListener接口可以指定泛型,比如我们生产者发送的是user对象,则可以指定泛型为User
 * 当然指定了泛型,onMessage方法参数类型也是明确的(User类型)
 * 3.除了标识这是一个消费者外,我们还需补充一些参数,比如:
 * consumerGroup 消费者组
 * topic 主题
 * messageModel 消息模式 BROADCASTING("BROADCASTING"),广播 CLUSTERING("CLUSTERING"); 均发
 * consumeThreadMax 最大消费线程数
 * selectorType 选择消息类型
 * selectorExpression 选择消息表达式
 * 4.如果RocketMQListener没有指定泛型则onmessage接收object对象,需要自行处理
 * 当然如果指定了泛型也就限定了消费者所能够接收的消息类型
 *
 */
@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic")
public class ConsumerListener implements RocketMQListener<User> {

    /**
     * 接收消息
     * @param message 消息对象
     */
    @Override
    public void onMessage(User message) {
        System.out.println(message);
    }
}

3.运行测试

image-20220911192102107

3.发同步消息

1.生产者

@RequestMapping("/syncSend")
    public String syncSend() {
        User user = new User("llp", "110");
        /**
         * 从下面这段代码可以看到,底层会根据冒号进行拆分
         * 第一个元素作为Topic
         * 如果拆分后的数组长度大于1则第二个元素作为Tag
         *
         *String[] tempArr = destination.split(":", 2);
         *  String topic = tempArr[0];
         *  String tags = "";
         *  if (tempArr.length > 1) {
         *      tags = tempArr[1];
         *  }
         */
        rocketMQTemplate.syncSend("syncSendTopic:syncSendTag", user);
        return JSON.toJSONString(user);
    }

2.消费者

/**
 * SelectorType selectorType() default SelectorType.TAG;
 * String selectorExpression()default "*";
 */
@Component
@RocketMQMessageListener(consumerGroup = "llp", topic = "syncSendTopic", selectorType = SelectorType.TAG, selectorExpression = "syncSendTag")
//@RocketMQMessageListener(consumerGroup = "llp", topic = "syncSendTopic")
public class SyncConsumerListener implements RocketMQListener<User> {

    @Override
    public void onMessage(User message) {
        System.out.println(message);
    }
}

4.发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

1.生产者

/**
 * 发送异步消息
 * @return
 */
public String asyncSend() {
    User user = new User("llp", "110");

    rocketMQTemplate.asyncSend("asyncTopic", user, new SendCallback() {
        //发送成功回调方法
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功,发送结果:" + sendResult);
        }
        //发送失败回调方法
        @Override
        public void onException(Throwable e) {
            System.out.println("发送失败:" + e.getMessage());
        }
    });
    return JSON.toJSONString(user);
}

2.消费者

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "asyncTopic")
public class AsyncConsumerListener implements RocketMQListener<User> {
    @Override
    public void onMessage(User message) {
        System.out.println("消费者接收消息:"+message);
    }
}

3.运行测试

发送成功,发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801025F0818B4AAC2382C3BB10008, offsetMsgId=C0A84FCC00002A9F000000000003F5F8, messageQueue=MessageQueue [topic=asyncTopic, brokerName=broker-b, queueId=1], queueOffset=0]
消费者接收消息:User(userName=llp, userId=110)

5.发送单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

1.生产者

@RequestMapping("/sendOneWay")
public String sendOneWay() {
    User user = new User("llp", "110");
    rocketMQTemplate.sendOneWay("sendOneWayTopic",user);
    return JSON.toJSONString(user);
}

2.消费者

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "sendOneWayTopic")
public class SendOneWayConsumerListener implements RocketMQListener<User> {

    @Override
    public void onMessage(User message) {
        System.out.println("消费者接收消息:"+message);
    }
}

6.两种消费消息模式

1.负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。这也是rocketmq默认的消费模式

1.生产者

@RequestMapping("/convertAndSend")
public String convertAndSend() {
    User user = new User("llp", "110");
    rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
    //rocketmq-spring-boot-starter依赖中包含了fastJson
    return JSON.toJSONString(user);
}

2.消费者1

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic")
public class LoadBalanceConsumerListener implements RocketMQListener<User> {

    /**
     * 接收消息
     * @param message 消息对象
     */
    @Override
    public void onMessage(User message) {
        System.out.println("消费者接收消息:"+message);
    }
}

3.消费者2

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.CLUSTERING)
public class LoadBalanceConsumerListener2 implements RocketMQListener<User> {

    /**
     * 接收消息
     * @param message 消息对象
     */
    @Override
    public void onMessage(User message) {
        System.out.println("消费者接收消息:"+message);
    }
}

4.测试结果

image-20220911220119354

2.广播模式

1.生产者

@RequestMapping("/convertAndSend")
public String convertAndSend() {
    User user = new User("llp", "110");
    rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
    //rocketmq-spring-boot-starter依赖中包含了fastJson
    return JSON.toJSONString(user);
}

2.消费者1

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.BROADCASTING)
public class SubscribeConsumerListener implements RocketMQListener<User> {
    @Override
    public void onMessage(User message) {
        System.out.println("消费者1接收消息:"+message);
    }
}

3.消费者2

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.BROADCASTING)
public class SubscribeConsumerListener2 implements RocketMQListener<User> {
    @Override
    public void onMessage(User message) {
        System.out.println("消费者2接收消息:"+message);
    }
}

7.延时消息

1.生产者

@RequestMapping("/delaySend")
public String delaySend() {
    User user = new User("llp", "110");
    //延迟等级,这里2级对应延迟5秒
    // org/apache/rocketmq/store/config/MessageStoreConfig.java
    //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    rocketMQTemplate.syncSend("delaySendTopic", MessageBuilder.withPayload(user).build(), 2000, 2);
    return JSON.toJSONString(user);
}

2.消费者

@Component
@RocketMQMessageListener(consumerGroup = "llp",topic = "delaySendTopic")
public class DelayConsumerListener implements RocketMQListener<User> {
    @Override
    public void onMessage(User message) {
        System.out.println("消费者接收消息:"+message);
    }
}

8.顺序消息

1.先看一个问题:消息错乱

原因

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的

queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,

则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分

区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消

息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

image-20220911224918007

image-20220911225029164

1.生产者

@RequestMapping("/orderSend")
public void orderSend() {
    List<OrderStep> orderSteps = OrderStep.buildOrders();
    for (OrderStep orderStep : orderSteps) {
        //SendResult sendResult = rocketMQTemplate.syncSendOrderly("orderSendTopic", orderStep, String.valueOf(2000));
        Message<OrderStep> message = MessageBuilder.withPayload(orderStep).build();
        SendResult sendResult = rocketMQTemplate.syncSendOrderly("orderSendTopic", message, String.valueOf(orderStep.getOrderId()));
        System.out.println("发送结果:" + sendResult);
    }
}
/**
 * 订单的步骤
 */
@Data
public class OrderStep {
    private long orderId;
    private String desc;

    /**
     * 生成模拟订单数据
     */
    public static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

2.消费者

@Component
@RocketMQMessageListener(consumerGroup = "llp2",topic = "orderSendTopic")
public class OrderConsumerListener implements RocketMQListener {
    @Override
    public void onMessage(Object message) {
        System.out.println("消费者接收到消息:"+message.toString());
    }
}

3.测试结果

消费者接收到消息:{orderId=15103111039, desc=创建}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC238866135002A, offsetMsgId=C0A84FCB00002A9F000000000007BD60, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=147]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886613C002E, offsetMsgId=C0A84FCB00002A9F000000000007BE65, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=63]
消费者接收到消息:{orderId=15103111065, desc=创建}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886613E0030, offsetMsgId=C0A84FCB00002A9F000000000007BF6A, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=148]
消费者接收到消息:{orderId=15103111039, desc=付款}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661410035, offsetMsgId=C0A84FCB00002A9F000000000007C06F, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=149]
消费者接收到消息:{orderId=15103117235, desc=创建}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661430038, offsetMsgId=C0A84FCB00002A9F000000000007C174, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=64]
消费者接收到消息:{orderId=15103111065, desc=付款}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC238866145003C, offsetMsgId=C0A84FCB00002A9F000000000007C279, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=150]
消费者接收到消息:{orderId=15103117235, desc=付款}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661470040, offsetMsgId=C0A84FCB00002A9F000000000007C37E, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=65]
消费者接收到消息:{orderId=15103111065, desc=完成}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614A0044, offsetMsgId=C0A84FCB00002A9F000000000007C483, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=151]
消费者接收到消息:{orderId=15103111039, desc=推送}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614D0048, offsetMsgId=C0A84FCB00002A9F000000000007C588, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=152]
消费者接收到消息:{orderId=15103117235, desc=完成}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614F004C, offsetMsgId=C0A84FCB00002A9F000000000007C68D, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=153]
消费者接收到消息:{orderId=15103111039, desc=完成}

9.事务消息

  1. 正常事务过程
  2. 事务补偿过程

image-20220911232840280

事务消息状态

  1. 提交状态:允许进入队列,此消息与非事务消息无区别
  2. 回滚状态:不允许进入队列,此消息等同于未发送过
  3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
  4. 注意:事务消息仅与生产者有关,与消费者无关

1.生产者

application.yml

transaction:
  group: llp-transaction

生产者代码

@Value(value = "${transaction.group}")
private String transactionGroup;

@RequestMapping("/sendTransactionMsg")
public void sendTransactionMsg() {
    User user = new User("llp", "110");
    Message<User> message = MessageBuilder.withPayload(user).build();
    TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(transactionGroup, "transactionTopic", message, null);
    System.out.println("发送事务消息,发送结果:"+sendResult);
}

事务消息监听

/**
 * 事务消息Listener
 *
 */
@RocketMQTransactionListener(txProducerGroup = "llp-transaction")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务
     * 如果本地事务返回UNKNOWN,会进行事务补偿,自动执行下面的checkLocalTransaction方法
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务=====");
        System.out.println(msg.getPayload());
        //模拟提交事务
        //return RocketMQLocalTransactionState.COMMIT;
        //模拟回滚事务
        //return RocketMQLocalTransactionState.ROLLBACK;
        //让去check本地事务状态 进行事务补偿
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    /**
     * 检测本地事务状态
     * 事务补偿过程
     * 当消息服务器没有收到消息生产者的事务提交或者回滚确认时,会主动要求消息生产者进行确认,
     * 消息生产者便会去检测本地事务状态,该过程称为事务补偿过程
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        System.out.println("执行事务补偿======");
        //事务补偿提交
        return RocketMQLocalTransactionState.COMMIT;
        //事务补偿回滚
        //return RocketMQLocalTransactionState.ROLLBACK;
        //如果事务补偿过程还是UNKNOWN 就会一直进行事务补偿,60s一次
        //return RocketMQLocalTransactionState.UNKNOWN;
    }
}

2.消费者

@Component
@RocketMQMessageListener(consumerGroup = "llp-transaction",topic = "transactionTopic")
public class TransactionConsumerListener implements RocketMQListener {
    @Override
    public void onMessage(Object message) {
        System.out.println("消费者接收到消息:"+message);
    }
}

3.测试结果

执行本地事务=====
[B@4174e858
发送事务消息,发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102506818B4AAC2389ADEBA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionTopic, brokerName=broker-b, queueId=2], queueOffset=2]
执行事务补偿======
消费者接收到消息:{userName=llp, userId=110}

10.批量消息

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

List<Message> msgList = new ArrayList<>(); 
msgList.add(new Message("topic6", "tag1", "msg1".getBytes())); 
msgList.add(new Message("topic6", "tag1", "msg2".getBytes()));
msgList.add(new Message("topic6", "tag1", "msg3".getBytes()));
rocketMQTemplate.syncSend("topic8",msgList,1000);
# 发送时间超时时间
rocketmq.producer.send-message-timeout=300000
#异步消息发送失败重试次数
rocketmq.producer.retry-times-when-send-async-failed=0
#消息发送失败后的最大重试次数
rocketmq.producer.retry-times-when-send-failed=2
#消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
rocketmq.producer.compress-message-body-threshold=4096
#消息最大容量
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=true

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
   	@Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
    rocketMQTemplate.syncSend("topic8",listItem,1000);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

标题:Springboot整合RokectMQ
作者:llp
地址:https://llinp.cn/articles/2022/09/12/1662912425700.html