RocketMQ 的两种消息消费模式:Pull(拉取)和Push(推送)

news/2024/7/5 3:05:06

RocketMQ 支持两种消息消费模式:Pull(拉取)和Push(推送),它们之间有一些区别和联系。下面是它们的主要特点和比较:

Pull(拉取)模式:

  1. 主动权在消费者: 在Pull模式中,消费者主动向Broker请求消息,决定拉取的时机和拉取的数量。消费者有更大的控制权,可以按照自己的需求灵活地拉取消息。
  2. 消息拉取频率: 消费者可以根据实际情况决定拉取消息的频率,例如可以定时拉取,也可以根据业务负载动态调整拉取消息的速度。
  3. 流量控制: Pull模式下,消费者可以根据自身情况进行流量控制,避免瞬时大量消息涌入导致负载过重。
  4. 适用场景: Pull模式适用于需要更多消费者控制和自适应的场景,例如消费者需要根据实时负载情况自主调整拉取消息的速度。

Pull(拉取)模式示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageQueue;

public class PullConsumerDemo {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("your_consumer_group");
        consumer.setNamesrvAddr("your_namesrv_address");

        // 启动消费者
        consumer.start();

        // 订阅主题
        consumer.subscribe("YourTopic", "*");

        // 从指定队列拉取消息
        MessageQueue messageQueue = new MessageQueue("YourTopic", "your_broker_name", 0);
        PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, "*", getMessageQueueOffset(messageQueue), 32);

        // 处理拉取的消息
        if (pullResult.getPullStatus() == PullStatus.FOUND) {
            pullResult.getMsgFoundList().forEach(msg -> {
                // 处理消息
                System.out.println(new String(msg.getBody()));
            });
        }

        // 更新队列偏移量
        updateMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset());

        // 关闭消费者
        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        // TODO: 从存储中获取当前队列的偏移量
        return 0;
    }

    private static void updateMessageQueueOffset(MessageQueue mq, long offset) {
        // TODO: 将当前队列的偏移量更新到存储中
    }
}

Push(推送)模式:

  1. 被动接收消息: 在Push模式中,消息是由Broker推送给消费者的,消费者不再需要主动去请求消息,而是等待消息到达。
  2. 实时性: Push模式下,消息可以实时被推送给消费者,适用于需要实时响应的场景。
  3. 消息推送速率: Push模式下,消息的推送速率由Broker控制,可能会受到一些限制。如果消息推送速率过快,消费者可能需要自行处理流控。
  4. 适用场景: Push模式适用于对实时性要求较高、消费者希望被动接收消息的场景。

Push(推送)模式示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumerDemo {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
        consumer.setNamesrvAddr("your_namesrv_address");

        // 设置消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                // 处理消息
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 订阅主题和标签
        consumer.subscribe("YourTopic", "*");

        // 启动消费者
        consumer.start();

        // 阻塞主线程,保持消费者一直运行
        while (true) {
            // do nothing
        }
    }
}

联系:

  • 消费者驱动: 无论是Pull还是Push,消费者都是主动发起消费的一方。Pull模式中,消费者主动发起拉取消息的请求;Push模式中,虽然消息是被动推送给消费者,但消费者仍然需要主动处理接收到的消息。
  • 消费者订阅: 无论是Pull还是Push,消费者都需要订阅消息的主题(Topic)。
  • 消费者负载均衡: 在Pull模式中,如果有多个消费者订阅了同一个Topic,它们之间可能需要进行负载均衡,以确保每个消费者都能获取到一定比例的消息。在Push模式中,Broker通常会进行消息推送的负载均衡。


 


http://lihuaxi.xjx100.cn/news/1889700.html

相关文章

【从零开始学习JVM | 第八篇】学习垃圾回收算法 和 垃圾回收器

前言: 现代编程语言通常采用垃圾回收机制来自动管理内存。垃圾回收机制是一种自动化的内存管理技术,可以在程序运行时自动识别和回收不再使用的内存,从而减少内存泄漏和其他内存相关问题的发生。 本文将介绍垃圾回收算法和垃圾回收器的相关…

SLAM算法与工程实践——相机篇:传统相机使用(1)

SLAM算法与工程实践系列文章 下面是SLAM算法与工程实践系列文章的总链接,本人发表这个系列的文章链接均收录于此 SLAM算法与工程实践系列文章链接 下面是专栏地址: SLAM算法与工程实践系列专栏 文章目录 SLAM算法与工程实践系列文章SLAM算法与工程实践…

java集合的迭代器与遍历

文章目录 迭代器Iterator1、什么是Iterator2,iterator接口的API3、Irerator()方法细节解释4. Irerator的原理示意图5. forEach循环与Iterator遍历的区别与联系 ListIterator1.ListIterator的概述(1) 概念(2) 解析 2.ListIterator的生成3.ListIterator的API4.ListIte…

vue小结之 watch监听(对象,单个字段属性)

好记性不如个烂笔头&#xff1a;方便每次使用忘记的时候&#xff0c;看看点滴的累积&#xff0c;点滴的成长&#xff01; watch监听使用&#xff1a; 1.监听对象&#xff1a; <input type"checkbox" id"toggle-button" name"switch" v-mode…

剑指 Offer(第2版)面试题 28:对称的二叉树

剑指 Offer&#xff08;第2版&#xff09;面试题 28&#xff1a;对称的二叉树 剑指 Offer&#xff08;第2版&#xff09;面试题 28&#xff1a;对称的二叉树解法1&#xff1a;递归解法2&#xff1a;镜像二叉树 前序遍历 剑指 Offer&#xff08;第2版&#xff09;面试题 28&…

在Nexus上配置Docker镜像仓库

现在Docker镜像的工具已不少了&#xff0c;只是在Java老牌又持久的工具Nexus上配置本地Docker仓库镜像是一件即有情怀又充份利用资源的事情。 Nexus支持多种仓库类型&#xff0c;例如&#xff1a;maven、npm、docker等。 安装Nexus &#xff08;略&#xff09; Docker镜像配…

【网络安全技术】电子邮件安全PGP,SMIME

一、PGP&#xff08;Pretty Good Privacy&#xff09; PGP是一种邮件加密手段&#xff0c;他在发邮件一方加密&#xff0c;然后发给发送方邮件服务器&#xff0c;发送方邮件服务器再发送给接收方邮件服务器&#xff0c;然后接收方再从接收方邮件服务器pop出来&#xff0c;这整…

基于docker容器化部署微服务

前言 在笔者系列文章中微服务配置隔离已经完成服务之间的配置隔离&#xff0c;服务整体来说是已经通了。 为了方便后续测试已经环境统一&#xff0c;笔者本章节会对服务进行容器化部署。由于服务器性能问题&#xff0c;本次部署采用maven完成镜像构建&#xff0c;结合docker-c…