系列六、Springboot操作RocketMQ

news/2024/7/7 19:40:20

一、同步消息

1.1、发送&接收简单消息

1.1.1、发送简单消息

/**
 * 测试发送简单消息
 */
@Test
public void sendSimpleMessage() {
	SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");
	// 往[BOOT_TOPIC_SIMPLE]主题发送一个简单消息
	log.info("测试发送简单消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:39:18.296  INFO 14700 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送简单消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_SIMPLE"},"msgId":"7F000001396C18B4AAC230D9778C0000","offsetMsgId":"C0A8B58A00002A9F0000000000029FA6","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

1.1.2、接收简单消息

/**
 * @Description: 消费者消息消息,就添加一个监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_SIMPLE",consumerGroup = "BOOT_TOPIC_SIMPLE_GROUP",messageModel = MessageModel.CLUSTERING)
public class MySimpleMessageListener implements RocketMQListener<MessageExt> {

    /**
     * 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。
     * 如何判断消息是否接收?
     *      没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试
     * @param messageExt
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收简单消息message:{}",new String(messageExt.getBody()));
    }
}
// 控制台打印结果
2023-08-10 11:39:18.293  INFO 6044 --- [_SIMPLE_GROUP_2] o.star.listener.MySimpleMessageListener  : 接收简单消息message:我是一个简单消息

1.2、发送&接收对象消息

1.2.1、发送对象消息

/**
 * 测试发送对象消息
 */
@Test
public void sendObjectMessage() {
	Order order = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");
	// 往BOOT_TOPIC_OBJ主题发送一个订单对象
	SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_OBJ", order);
	log.info("测试发送对象消息result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:42:57.879  INFO 35812 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送对象消息result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_OBJ"},"msgId":"7F0000018BE418B4AAC230DCD14D0000","offsetMsgId":"C0A8B58A00002A9F000000000002A0C2","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

1.2.2、接收对象消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_OBJ", consumerGroup = "BOOT_TOPIC_OBJ_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyObjectMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收对象消息message:{}",new String(messageExt.getBody()));
    }
}

// 控制台打印结果
2023-08-10 13:44:35.458  INFO 35816 --- [PIC_OBJ_GROUP_1] o.star.listener.MyObjectMessageListener  : 接收对象消息message:{"orderSn":"f5c39c2e86f74649b9582e5e50c500ff","userId":1,"description":"小米2s,为发烧而生"}

1.3、发送&接收集合消息

1.3.1、发送集合消息

/**
 * 测试发送集合消息
 */
@Test
public void sendCollectionMessage() {
	Order order1 = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");
	Order order2 = new Order(UUID.randomUUID().toString().replace("-", ""), 2, "小米3s,为发烧而生,你值得拥有");
	List<Order> orders = Arrays.asList(order1, order2);
	// 往[BOOT_TOPIC_COLLECTION]主题发送集合对象
	SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_COLLECTION", orders);
	log.info("测试发送集合消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 13:50:25.053  INFO 28696 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送集合消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_COLLECTION"},"msgId":"7F000001701818B4AAC2315181130000","offsetMsgId":"C0A8B58A00002A9F000000000002A21F","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

1.3.2、接收集合消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_COLLECTION", consumerGroup = "BOOT_TOPIC_COLLECTION_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyCollectionMessageListener implements RocketMQListener<MessageExt> {

    /**
     * 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。
     * 如何判断消息是否接收?
     *      没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试
     * @param messageExt
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收集合消息message:{}",new String(messageExt.getBody()));
    }
}
// 控制台打印结果
2023-08-10 13:50:39.726  INFO 30076 --- [LECTION_GROUP_1] o.s.l.MyCollectionMessageListener        : 接收集合消息message:[{"orderSn":"141bb7c6535b472d83a5099a43422d04","userId":1,"description":"小米2s,为发烧而生"},{"orderSn":"d2b41a75e087455e8910c1cba84f830f","userId":2,"description":"小米3s,为发烧而生,你值得拥有"}]

二、异步消息

发送&接收异步消息

发送异步消息

/**
 * 测试发送异步消息
 *
 * @throws Exception
 */
@Test
public void sendASyncSimpleMessage() throws Exception {
	rocketMQTemplate.asyncSend("BOOT_TOPIC_ASYNC", "我是一个异步消息", new SendCallback() {
		@Override
		public void onSuccess(SendResult result) {
			log.info("测试发送异步消息 result:{}", JSON.toJSONString(result));
		}

		@Override
		public void onException(Throwable throwable) {
			log.info("测试发送异步消息 error:{}", throwable.getMessage());
		}
	});
	log.info("我先执行");

	// 挂起JVM不让方法结束
	System.in.read();
}
// 控制台打印结果
2023-08-10 14:02:21.125  INFO 30988 --- [           main] cketmqSpringbootProducerApplicationTests : 我先执行
2023-08-10 14:02:21.793  INFO 30988 --- [ublicExecutor_1] cketmqSpringbootProducerApplicationTests : 测试发送异步消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"BOOT_TOPIC_ASYNC"},"msgId":"7F000001790C18B4AAC2315C70D50000","offsetMsgId":"C0A8B58A00002A9F000000000002A3FC","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收异步消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ASYNC",consumerGroup = "BOOT_TOPIC_ASYNC_GROUP",messageModel = MessageModel.CLUSTERING)
public class MyASyncMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收异步消息 message:{}",new String(messageExt.getBody()));
    }
}
// 控制台打印结果
2023-08-10 14:02:36.334  INFO 10676 --- [C_ASYNC_GROUP_1] o.star.listener.MyASyncMessageListener   : 接收异步消息 message:我是一个异步消息

三、单向消息

发送&接收单向消息

发送单向消息

/**
 * 适用场景:适用于不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送
 */
@Test
public void sendOnewayMessage() {
	// 发送单向消息,没有返回值和结果
	rocketMQTemplate.sendOneWay("BOOT_TOPIC_ONE_WAY", "我是一个单向消息");
}

接收单向消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ONE_WAY", consumerGroup = "BOOT_TOPIC_ONE_WAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyOnewayMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收单向消息 message:{}",new String(messageExt.getBody()));
    }

}
// 控制台打印结果
2023-08-10 14:07:23.965  INFO 32740 --- [ONE_WAY_GROUP_1] o.star.listener.MyOnewayMessageListener  : 接收单向消息 message:我是一个单向消息

四、延迟消息

发送&接收延迟消息

发送延迟消息

/**
 * 测试发送延迟消息
 */
@Test
public void sendDelayMessage() {
	Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();
	/**
	 * 设定消息的延迟等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
	 *      1s: 等级1
	 *      5s: 等级2
	 *      10s:等级3
	 *      30s:等级4
	 *      1m: 等级5
	 * 发送一个延迟消息,延迟等级为4级,也就是30s后被监听消费
	 * 注意事项:RocketMQ不支持任意时间的延时,只支持上述的延迟规则
	 */
	SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_DELAY", message, 2000, 4);
	log.info("测试发送延迟消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:13:03.859  INFO 4804 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送延迟消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_DELAY"},"msgId":"7F00000112C418B4AAC231663CE60000","offsetMsgId":"C0A8B58A00002A9F000000000002A634","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收延迟消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_DELAY", consumerGroup = "BOOT_TOPIC_DELAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyDelayMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收延迟消息 message:{}",new String(messageExt.getBody()));
    }

}

// 控制台打印结果
2023-08-10 14:13:33.860  INFO 26112 --- [C_DELAY_GROUP_1] o.star.listener.MyDelayMessageListener   : 接收延迟消息 message:我是一个延迟消息

五、顺序消息

发送&接收顺序消息

发送顺序消息

/**
 * 发送顺序消息,控制流程:下订单==》发短信==》物流
 * 测试发送顺序消息
 */
@Test
public void sendOrderlyMessage() {
	// 顺序消息,发送者将一组消息都发送至同一个队列,消费者需要单线程进行消费
	List<Order> orders = Arrays.asList(
			new Order("aaa", 1, "下订单"),
			new Order("aaa", 1, "发短信"),
			new Order("aaa", 1, "物流"),

			new Order("bbb", 2, "下订单"),
			new Order("bbb", 2, "发短信"),
			new Order("bbb", 2, "物流")
	);
	orders.forEach(order -> {
		// 发送,一般都是以json的方式进行处理
		SendResult result = rocketMQTemplate.syncSendOrderly("BOOT_TOPIC_ORDERLY", JSON.toJSONString(order), order.getOrderSn());
		log.info("订单id:{},队列id:{},结果:{}",order.getUserId(),result.getMessageQueue().getQueueId(),result.getSendStatus());
	});
}
// 控制台打印结果
2023-08-10 14:23:02.023  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.026  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.028  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.029  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.030  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.031  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK

接收顺序消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ORDERLY",
                         consumerGroup = "BOOT_TOPIC_ORDERLY_GROUP",
                         consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式,单线程
                         maxReconsumeTimes = 5              // 消费重试的次数
)
public class MyOrderlyMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        Order order = JSON.parseObject(new String(messageExt.getBody()),Order.class);
        log.info("接收顺序消息order:{}",order);
    }
}
// 控制台打印结果
2023-08-10 14:23:46.714  INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=下订单)
2023-08-10 14:23:46.714  INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=下订单)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=发短信)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=物流)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=发短信)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=物流)

六、带Tag的消息

发送&接收带Tag的消息

发送带Tag消息

/**
 * 测试发送带Tag的消息
 */
@Test
public void sendTagMessage() {
	SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_TAG:TagA", "我是一个带Tag的消息");
	log.info("测试发送带Tag的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:34:07.806  INFO 30388 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送带Tag的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_TAG"},"msgId":"7F00000176B418B4AAC2317986310000","offsetMsgId":"C0A8B58A00002A9F000000000002B029","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收带Tag消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_TAG",
        consumerGroup = "BOOT_TOPIC_TAG_GROUP",
        messageModel = MessageModel.CLUSTERING,
        selectorType = SelectorType.TAG,    // tag过滤模式
        selectorExpression = "TagA || TagB"
)
public class MyTagMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收带Tag标签的消息 result:{}", new String(messageExt.getBody()));
    }
}
// 控制台打印结果
2023-08-10 14:34:27.260  INFO 10868 --- [PIC_TAG_GROUP_1] org.star.listener.MyTagMessageListener   : 接收带Tag标签的消息 result:我是一个带Tag的消息

七、带Key的消息

发送&接收带Key的消息

发送带Key的消息

/**
 * 测试发送带Key的消息
 */
@Test
public void sendKeyMessage() {
	// key写在消息头里边
	Message<String> message = MessageBuilder.withPayload("我是一个带Key的消息").setHeader(RocketMQHeaders.KEYS, "STAR").build();
	SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_KEY", message);
	log.info("测试发送带Key的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:45:42.963  INFO 26148 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送带Key的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_KEY"},"msgId":"7F000001662418B4AAC2318421A80000","offsetMsgId":"C0A8B58A00002A9F000000000002B275","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收带Key的消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_KEY",
        consumerGroup = "BOOT_TOPIC_KEY_GROUP",
        messageModel = MessageModel.CLUSTERING
)
public class MyKeyMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("接收带Key的消息:{},Key:{}",new String(messageExt.getBody()), messageExt.getKeys());
    }
}
// 控制台打印结果
2023-08-10 14:45:42.963  INFO 23040 --- [PIC_KEY_GROUP_1] org.star.listener.MyKeyMessageListener   : 接收带Key的消息:我是一个带Key的消息,Key:STAR

八、集群模式的消息

发送&接收集群模式的消息

发送集群消息

/**
 * 测试消息消费模式-集群模式
 * 此种方式消费者会采取轮询的方式进行消费
 */
@Test
public void modeForClusterSendMessage() {
	for (int i = 1; i <= 10; i++) {
		SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_CLUSTER", "我是第" + i + "个消息");
		log.info("集群模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());
	}
}

// 控制台打印结果
2023-08-10 14:58:14.203  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.207  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.211  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.213  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.216  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.218  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.220  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.222  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.224  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.227  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK

接收集群消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:{}",new String(messageExt.getBody()));
    }
}

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:{}",new String(messageExt.getBody()));
    }
}

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:{}",new String(messageExt.getBody()));
    }
}

// 控制台打印结果
2023-08-10 14:58:14.209  INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第1个消息
2023-08-10 14:58:14.212  INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener2   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第2个消息
2023-08-10 14:58:14.212  INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener1   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第3个消息
2023-08-10 14:58:14.216  INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第4个消息
2023-08-10 14:58:14.217  INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第5个消息
2023-08-10 14:58:14.220  INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener2   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第6个消息
2023-08-10 14:58:14.222  INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener1   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第7个消息
2023-08-10 14:58:14.223  INFO 7300 --- [CLUSTER_GROUP_4] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第8个消息
2023-08-10 14:58:14.226  INFO 7300 --- [CLUSTER_GROUP_5] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第9个消息
2023-08-10 14:58:14.229  INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener2   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第10个消息

九、广播模式的消息

发送&接收广播模式的消息

发送广播模式的消息

/**
 * 测试消息消费模式-广播模式
 * 此种方式每一个消费者都会消费一次消息
 */
@Test
public void modeForBroadcastingSendMessage() {
	for (int i = 1; i <= 5; i++) {
		SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_BROADCASTING", "我是第" + i + "个消息");
		log.info("广播模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());
	}
}
// 控制台打印结果
2023-08-10 15:12:10.081  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
2023-08-10 15:12:10.084  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:1,结果:SEND_OK
2023-08-10 15:12:10.086  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:2,结果:SEND_OK
2023-08-10 15:12:10.088  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:3,结果:SEND_OK
2023-08-10 15:12:10.090  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK

接收广播模式的消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener1 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());
    }
}

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener2 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());
    }
}

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener3 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());
    }
}

// 控制台打印结果
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.092  INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092  INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092  INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第5个消息,队列id:0


http://lihuaxi.xjx100.cn/news/1433785.html

相关文章

【深度学习】再谈向量化

前言 向量化是一种思想&#xff0c;不仅体现在可以将任意实体用向量来表示&#xff0c;更为突出的表现了人工智能的发展脉络。向量的演进过程其实都是人工智能向前发展的时代缩影。 1.为什么人工智能需要向量化 电脑如何理解一门语言&#xff1f;电脑的底层是二进制也就是0和1&…

图解java.util.concurrent并发包源码系列——深入理解ReentrantReadWriteLock读写锁,看完可以吊打面试官

图解java.util.concurrent并发包源码系列——深入理解ReentrantReadWriteLock读写锁&#xff0c;看完可以吊打面试官 ReentrantReadWriteLock的作用ReentrantReadWriteLock的原理ReentrantReadWriteLock源码解析构造方法获取写锁和读锁对象计算读锁被持有数和写锁被持有数的位移…

网页爬虫中常用代理IP主要有哪几种?

各位爬虫探索者&#xff0c;你是否有想过在网页爬虫中使用代理IP来规避限制实现数据自由&#xff1f;在这篇文章中&#xff0c;作为一名IP代理产品供应商&#xff0c;我将为你揭示常见的网页爬虫代理IP类型&#xff0c;让你在爬虫的世界中游刃有余&#xff01; 一、免费公开代理…

umi快速搭建中后台管理系统(命令行创建和区块开发的区别)

后台管理系统 命令行创建新项目命令行效果图配置文件package.json umi3构建区块readme.md主页面页面布局页面布局的一些参数总结 区块开发步骤三级目录 命令行创建新项目 命令行 1、 npm i ant-design/pro-cli -g 2、 pro create demo_spacemv_managementsys 3、 umi 3 simpl…

从一道面试题来学习前台进程和后台进程、孤儿进程和僵尸进程

1、面试题介绍 以前面试&#xff0c;面试官问了一个问题&#xff0c;大意是&#xff1a; 我们在终端中&#xff0c;通过执行 python main.py 命令&#xff0c;会启动一台前台进程直到程序结束。现在我还是想通过执行 python main.py &#xff0c;启动一个后台进程&#xff0c;…

Java【Spring】使用注解, 更简单的存储和获取 Bean

文章目录 前言一、存储 Bean1, 配置文件2, 五大类注解Bean 的命名规则 3, 方法注解Bean 的命名规则 二、获取 Bean1, 属性注入2, Setter 注入3, 构造方法注入4, Autowired 和 Resource 的区别5, 同一个类型的多个 Bean 注入问题 总结 前言 各位读者好, 我是小陈, 这是我的个人主…

innovus 报告多边形floorplan的boundary坐标

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球 历史文章: 常用dbGet命令 dbGet快速入门 使用"Cut Rectilinear"功能可以​调整floorplan形状&#xff0c;使其变成非矩形多边形&#xff08;polygon&#xff09;。​ …

MySQL_DQL语句(查询语句以及常用函数)

基础查询 不带条件的查询查询多个字段 语法&#xff1a; #查询指定字段的数据 SELECT 字段1, 字段2, 字段3 ... FROM 表名 ; #查询表中全部字段的数据 SELECT * FROM 表名 ;案例&#xff1a;查询表中所有信息数据 SELECT * FROM employee;案例&#xff1a;查询表中姓名和性别…