RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

news/2024/7/7 19:24:49

文章目录

    • 添加 RocketMQ 依赖
    • 消费者 Consumer
      • YAML 配置
      • 创建监听器
      • 消息过滤
        • Tag 过滤
    • 生产者 Producer
      • YAML 配置
      • 发送同步消息
      • 发送异步消息
      • 发送单向消息
      • 发送延迟消息
      • 发送顺序消息
      • 发送批量消息
      • 发送集合消息

添加 RocketMQ 依赖

  1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:

    image-20230527214713414

  2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    

消费者 Consumer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:
  name-server: 192.168.68.121:9876     # rocketMq的nameServer地址

创建监听器

创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component@RocketMQMessageListener,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"消息内容:"+msg);
    }

}

@RocketMQMessageListener 注解参数如下:

参数描述
topic消费者订阅的主题
consumerGroup消费者组
consumeMode消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY
messageModel消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING
selectorType过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92
selectorExpression过滤消息的表达式:Tag | SQL92【`tag1
maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。
delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

消息过滤

Tag 过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

编写并启动消费者项目订阅 tagTopic 主题:

@Component
@RocketMQMessageListener(topic = "tagTopic",
        consumerGroup = "boot-mq-group-consumer",
        selectorType = SelectorType.TAG,
        selectorExpression = "java")
public class MQMsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送一个带 Tag 的同步消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/tag")
    public String sendSyncMessage() {
        SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");
        return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId();
    }
    
}

运行项目,访问接口:http://localhost:8080/send/tag

image-20230528191958989

查看 RocketMQ 控制台,可以看到消息带有 java tag:

image-20230528191938535

查看消费者项目的 IDEA 控制台:

image-20230528191142421

生产者 Producer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:
  name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
  producer:
    group: boot-mq-group-producer # 生产者组名

注:生产者需要标注生产者组名,否则会报异常:'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

发送同步消息

编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/sync/{msg}")
    public String sendSyncMessage(@PathVariable String msg){
        SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);
        return "发送状态:"+result.getSendStatus()+"<br>消息id:"+result.getMsgId();
    }

}

运行项目,访问接口:http://localhost:8080/send/sync/同步消息

image-20230527231022909

访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

image-20230527231142472

发送异步消息

不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

编写 Controller,使用 RocketMQTemplate 的 asyncSend() 方法发送异步消息,并使用回调接口打印发送的结果:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/async/{msg}")
    public String sendAsyncMessage(@PathVariable String msg) {
        rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息发送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步消息发送失败");
            }
        });
        System.out.println("异步消息已发送完成");
        return "发送异步消息";
    }
  
}

运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

image-20230527232838438

访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

image-20230527233249499

发送单向消息

编写 Controller,使用 RocketMQTemplate 的 sendOneWay() 方法发送单向消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/oneWay/{msg}")
    public String sendOneWayMessage(@PathVariable String msg) {
        rocketMQTemplate.sendOneWay("oneWayTopic",msg);
        return "单向消息发送成功";
    }

}

运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

image-20230527233640217

访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

image-20230527233751658

发送延迟消息

编写并启动消费者项目订阅 delayTopic 主题:

@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/delay/{msg}")
    public String sendDelayMessage(@PathVariable String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);
        return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId()+"<br>消息发送时间:"+new Date();
    }

}

运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

image-20230528141811562

查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

image-20230528141834080

发送顺序消息

编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

public class Order {
    //订单号
    private String orderId;
    //订单名称
    private String orderName;
    //订单的流程顺序
    private String seq;
}

编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

@Component
@RocketMQMessageListener(topic = "orderlyTopic",
        consumerGroup="boot-mq-group-consumer",
        consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        System.out.println("消费者:"+message);
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly() 方法发送同步顺序消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/orderly")
    public String sendOrderlyMessage() {
        List<Order> orders = Arrays.asList(
                new Order(UUID.randomUUID().toString(), "下订单", "1"),
                new Order(UUID.randomUUID().toString(), "发短信", "1"),
                new Order(UUID.randomUUID().toString(), "物流", "1"),
                new Order(UUID.randomUUID().toString(), "签收", "1"),

                new Order(UUID.randomUUID().toString(), "下订单", "2"),
                new Order(UUID.randomUUID().toString(), "发短信", "2"),
                new Order(UUID.randomUUID().toString(), "物流", "2"),
                new Order(UUID.randomUUID().toString(), "签收", "2")
        );
        //控制流程:下订单->发短信->物流->签收
        //将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
        orders.forEach(order -> {
            rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());
        });
        return "发送成功";
    }

}

运行项目,访问接口:http:localhost:8080/send/orderly

image-20230528152807514

查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

image-20230528152925141

查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

image-20230528152848032

发送批量消息

编写并启动消费者项目订阅 batchOrderly 主题:

@Component
@RocketMQMessageListener(topic = "batchOrderly",
        consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        System.out.println(Thread.currentThread().getName()+":"+message);
    }
  
}

编写生产者 Controller,将消息打包成 Collection<Message> msgs 传入 syncSend() 方法中发送:

@RestController
public class ProducerController {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  @GetMapping("/send/batch")
  public String sendOrderlyMessage() {

    List<Message> messages = Arrays.asList(
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build()
    );
    return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();
    
  }

}

运行项目,访问接口:http:localhost:8080/send/batch

image-20230528161620859

查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

image-20230528161706194

查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

image-20230528161804943

发送集合消息

编写并启动消费者项目订阅 listTopic 主题:

@Component
@RocketMQMessageListener(topic = "listTopic",
        consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<List<Order>> {

    @Override
    public void onMessage(List<Order> orders) {
        orders.forEach(o -> {
            System.out.println(Thread.currentThread().getName()+":"+o);
        });
    }

}

编写生产者 Controller,将集合传入 syncSend() 方法中发送:

@RestController
public class ProducerController {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  @GetMapping("/send/list")
  public String sendOrderlyMessage() {

    List<Order> orders = Arrays.asList(
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1")
    );
    rocketMQTemplate.syncSend("listTopic",orders);
    return "发送成功";
  }

}

运行项目,访问接口:http:localhost:8080/send/list

image-20230528161620859

查看 RocketMQ 控制台,可以看到队列中一条消息:

image-20230528163701846

查看消费者项目的 IDEA 控制台,进行消费:

image-20230528163745691


http://lihuaxi.xjx100.cn/news/1178894.html

相关文章

网络安全常用靶场推荐

sqli-labs sqli-labs包含了大多数的sql注入类型&#xff0c;以一种闯关模式&#xff0c;对于sql注入进行漏洞利用 下载地址&#xff1a;https://github.com/Audi-1/sqli-labs xss challenges xsschallenges是一个专对于XSS漏洞练习的在线靶场&#xff0c;包含了各种绕过&#…

【JVM】12. 垃圾回收相关概念

文章目录 12.1. System.gc()的理解12.2. 内存溢出与内存泄露内存溢出&#xff08;OOM&#xff09;内存泄漏&#xff08;Memory Leak&#xff09; 12.3. Stop The World12.4. 垃圾回收的并行与并发并发&#xff08;Concurrent&#xff09;并行&#xff08;Parallel&#xff09;并…

python常用知识梳理(必看篇)

下面小编就为大家带来一篇python常用知识梳理(必看篇)。小编觉得挺不错的&#xff0c;现在就分享给大家&#xff0c;也给大家做个参考。一起跟随小编过来看看吧 接触python已有一段时间了&#xff0c;下面针对python基础知识的使用做一完整梳理&#xff1a; 1&#xff09;避免…

[Nacos] Nacos Server处理订阅请求 (九)

文章目录 1.InstanceController#list()2.InstanceController#doSrvIpxt()3.总结 1.InstanceController#list() Nacos Server处理订阅请求 主要还是从请求中获取参数, 比如namespceId、serviceName、agent(指定提交请求的客户端是哪种类型)、clusters、clusterIP、udpPort(后续…

路径规划算法:基于入侵杂草优化的路径规划算法- 附代码

路径规划算法&#xff1a;基于入侵杂草优化的路径规划算法- 附代码 文章目录 路径规划算法&#xff1a;基于入侵杂草优化的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要&#xff1a;本文主要介绍利用智能优化…

9. Linux下实现简单的UDP请求

本文简单介绍了UDP传输层协议&#xff0c;并在Linux下实现简单的socket通讯 一、UDP UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;是一种无连接的传输层协议&#xff0c;它不保证数据包的可靠性和顺序。UDP在IP协议的基础上增加了简单的差错…

LeetCode第347场周赛

2023.5.28LeetCode第347场周赛 A. 移除字符串中的尾随零 思路 从最后一位开始遍历&#xff0c;为0则跳过 代码 class Solution { public:string removeTrailingZeros(string num) {int i num.size() - 1;while (i > 0 && num[i] 0) i -- ;return num.substr(…

【JAVAWEB】HTML的常见标签

目录 1.HTML结构 1.1认识HTML标签 1.2HTML文件基本结构 1.3标签层次结构 1.4快速生成代码框架 2.HTML常见标签 注释标签 标题标签&#xff1a;h1-h6 段落标签:p 换行标签&#xff1a;br 格式化标签 图片标签 超链接标签&#xff1a;a 表格标签 列表标签 表单标…