RocketMQ5.0--定时消息

news/2024/7/5 5:38:32

RocketMQ5.0–定时消息

一、定时消息概览
定时消息或延迟消息是指消息发送到Broker后,并不立即被消费而是要等到特定的时间后才能被消费。RocketMQ并不支持任意的时间精度延迟,只支持特定延迟时间的延迟消息。

消息延迟级别在Broker端通过MessageStoreConfig#messageDelayLevel配置,默认为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",其delayLevel=1则延迟1s,delayLevel=2则延迟5s,以此类推。解析该配置后,存放到org.apache.rocketmq.broker.schedule.ScheduleMessageService#delayLevelTable表中,格式为{1:1000,2:5000}。RocketMQ为每种延迟级别创建定时任务,这也是RocketMQ不支持任意时间延迟的原因。

注意,消息消费失败后,若是有延迟,也是和定时消息具有相同的逻辑。参考消息消费《RocketMQ5.0.0消息消费<三> _ 消息消费》。

org.apache.rocketmq.broker.schedule.ScheduleMessageService是定时消息实现类。消息存入commitlog文件之前需要判断消息的重试次数,如果大于0,则消息主题设置为SCHEDULE_TOPIC_XXXX,即:TopicValidator#RMQ_SYS_SCHEDULE_TOPIC属性。如下所示是该类的关键属性。

ScheduleMessageService方法的调用顺序:构造方法 -> load() -> start()方法。

// 第一次调度时延迟时间,默认1s
private static final long FIRST_DELAY_TIME = 1000L;
// 每一延迟级别调度一次后,则延迟该时间100ms再放入调度池
private static final long DELAY_FOR_A_WHILE = 100L;
// 发送异常后,则延迟该时间10s再放入调度池
private static final long DELAY_FOR_A_PERIOD = 10000L;
// 关闭时,等待5s
private static final long WAIT_FOR_SHUTDOWN = 5000L;
// 延迟睡10s
private static final long DELAY_FOR_A_SLEEP = 10L;
// 延迟级别表,解析MessageStoreConfig#messageDelayLevel后的数据结构{1:1000,2:5000}
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
// 延迟级别的消息消费进度,存储在{ROCKET_HOME}/store/config/delayOffset.json
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);
// 最大消息延迟级别
private int maxDelayLevel;
// 是否异步传送到调度池,默认关闭
private boolean enableAsyncDeliver = false;

下图所示,是定时消息实现流程图,步骤如下:

  • step1:消息存入commitlog文件之前,如果发送消息的delayLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息队列ID为delayLevel-1;
  • step2:消息经由commitlog异步转发到主题为SCHEDULE_TOPIC_XXXX,delayLevel - 1的消息消费队列;
  • step3:定时任务Time每隔1s根据上次拉取偏移量从消费队列中取出所有消息;
  • step4:根据消息的物理偏移量与消息大小从CommitLog中拉取消息;
  • step5:根据消息属性重新创建消息,并恢复原始主题、原始消费队列,清除delayLevel属性,再存入commitlog文件;
  • step6:转发到原始主题、原始消费队列,供消费者消费。

需要注意的是延迟级别delayLevel与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1。
在这里插入图片描述
二、定时消息实现机制

1. 提交消息前的处理
消息存储流程参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,其中DefaultMessageStore#asyncPutMessage执行异步存储消息时,执行存储消息钩子列表,如下代码所示。

// 异步存放消息,继续处理下一个请求;存储完成后,异步通知客户端
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    ......
 
    // 遍历存储消息钩子列表
    for (PutMessageHook putMessageHook : putMessageHookList) {
        /*
            存储消息前验证消息格式规范;定时消息时修改为定时主题等
            如:Broker停止工作、从Broker、是否写权限、主题太长、消息体太长等
         */
        PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
        // 为null时,则消息符合规范
        if (handleResult != null) {
            return CompletableFuture.completedFuture(handleResult);
        }
    }
    
    ......
}

org.apache.rocketmq.broker.util.HookUtils#handleScheduleMessage是commit操作前对定时消息处理的核心逻辑,如下代码所示。注意事项:

  • 事务消息不能有延迟级别,若是延迟级别 > 0时,则修改为SCHEDULE_TOPIC_XXXX,消费队列ID为delayLevel -
    1;原始主题及消费队列存储到扩展属性中。
/**
 * 消息存储前,处理定时消息
 * 方法入口:{@link DefaultMessageStore#asyncPutMessage(MessageExtBrokerInner)}
 * step1:非事务消息时,检查定时消息
 * step2:若是延迟级别 > 0时,则修改为SCHEDULE_TOPIC_XXXX,消费队列ID为delayLevel - 1;原始主题及消费队列存储到扩展属性中
 *        {@link HookUtils#transformDelayLevelMessage(BrokerController, MessageExtBrokerInner)}
 * @param brokerController
 * @param msg
 * @return
 */
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
                                                     final MessageExtBrokerInner msg) {
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 非事务消息时
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        if (!isRolledTimerMessage(msg)) {
            // 检查定时消息
            if (checkIfTimerMessage(msg)) {
                if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                    //wheel timer is not enabled, reject the message
                    return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                }
                PutMessageResult tranformRes = transformTimerMessage(brokerController, msg);
                if (null != tranformRes) {
                    return tranformRes;
                }
            }
        }
        // Delay Delivery,若是延迟级别 > 0时,则修改为SCHEDULE_TOPIC_XXXX,消费队列ID为delayLevel - 1
        if (msg.getDelayTimeLevel() > 0) {
            // 修改主题及消费队列ID
            transformDelayLevelMessage(brokerController,msg);
        }
    }
    return null;
}

定时消息commit操作后(消息提交到Commitlog文件内存映射),异步转发到延迟级别对应的消费队列中,下面介绍定时任务处理延迟消费队列。

2. 定时调度

1):load()方法

org.apache.rocketmq.broker.schedule.ScheduleMessageService#load完成延迟消费进度的加载且解析延迟级别字符串,如下代码所示。注意事项:

  • 加载消费进度:加载延迟消费队列的消费进度文件,{ROCKET_HOME}/store/config/delayOffset.json文件,其格式:延迟级别:消费进度,如下实例:
{
	"dataVersion":{
		"counter":19,
		"stateVersion":0,
		"timestamp":1676598354088
	},
	"offsetTable":{3:17,12:0
	}
}
  • 解析配置:字符串MessageStoreConfig.messageDelayLevel转列表ScheduleMessageService#delayLevelTable。
/**
 * step1:加载延迟级别的消息消费进度,{ROCKET_HOME}/store/config/delayOffset.json文件
 * step2:解析MessageStoreConfig.messageDelayLevel转换为{@link ScheduleMessageService#delayLevelTable}
 * step3:矫正延迟级别消费的偏移量
 */
@Override
public boolean load() {
    // 加载延迟级别的消息消费进度文件
    boolean result = super.load();
    // 解析延迟级别
    result = result && this.parseDelayLevel();
    // 矫正延迟级别消费的偏移量
    result = result && this.correctDelayOffset();
    return result;
}

2):start()方法

org.apache.rocketmq.broker.schedule.ScheduleMessageService#start启动调度池,为每个延迟级别创建定时任务,注意事项:

  • 延迟级别delayLevel与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1
  • 创建定时任务DeliverDelayedMessageTimerTask线程:遍历延迟级别,并获取对应延迟队列的消费进度,创建定时任务。定时任务第一次启动时,默认延迟1s执行,第二次开始执行延迟级别对应的延迟时间。
  • 每10s执行调度池任务持久化延迟队列的消费进度。
/**
 * 启动调度池,为每个延迟级别创建定时任务
 * step1:加载延迟消息消费进度;
 * step2:遍历延迟级别,并获取对应的消费进度;
 * step3:创建定时任务,并放入调度池(延迟级别与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1)
 * step4:每10s执行调度池任务持久化延迟队列的消费进度(MessageStoreConfig.flushDelayOffsetInterval配置)
 */
public void start() {
    if (started.compareAndSet(false, true)) {
        // 加载延迟消息消费进度
        this.load();
        //
        this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
        if (this.enableAsyncDeliver) {
            this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
        }
        // 遍历延迟级别
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // 获取延迟级别对应的消费进度
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
 
            if (timeDelay != null) {
                // 是否异步传送到调度池,默认关闭
                if (this.enableAsyncDeliver) {
                    this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
                /*
                    创建Timer定时任务,并放入调度池
                    a. 定时任务第一次启动时,默认延迟1s执行,第二次开始执行对应的延迟时间
                    b. 延迟级别与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1
                 */
                this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
            }
        }
 
        // 每10s执行持久化延迟队列的消费进度(MessageStoreConfig.flushDelayOffsetInterval配置)
        this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                try {
                    if (started.get()) {
                        // 持久化延迟队列的消费进度
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
    }
}

3):定时调度任务
org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask是个定时调度任务线程,其下图是该线程run()的调用链。
在这里插入图片描述
org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup是调用任务的核心方法,代码如下。注意事项:

  • 获取延迟消费队列:根据延迟级别(映射为延迟消费队列ID) + 延迟主题。

  • 延迟时间是否到期:延迟时间deliverTimestamp = 延迟级别对应的延迟时间 + 消息存储时间戳,根据差值来判定是否到期countdown = deliverTimestamp - now:
    countdown > 0时:说明没有到延迟时间,则;执行下一个调度任务(100ms后从currOffset偏移量开始执行定时任务) + 更新消费进度。
    countdown <= 0时,说明到延迟时间,可以消费消息。

  • 还原原始消息:根据偏移量从Commitlog获取延迟消息,后恢复到原始消息(原始topic、原始消费队列),清除延迟级别,保留消费次数。

  • 同步syncDeliver或异步asyncDeliver处理:原始消息重新提交到Commitlog中,供消费者消费。

  • scheduleNextTimerTask(long offset, long delay):offset当前消费进度,delay(默认100ms)是定时任务100ms后从当前offset再次执行调度任务。

/**
 * 定时消息调度核心方法
 * 注意:消息消费失败返回ACK时,根据delayLevel > 0时,改变消息主题为SCHEDULE_TOPIC_XXXX,延迟消费队列ID = delayLevel -1
 *      或
 *      延迟消息(topic为:SCHEDULE_TOPIC_XXXX)写入Commitlog,进而转发到延迟消息队列(延迟消费队列ID = delayLevel -1)
 * step1:根据延迟级别(映射为消费队列ID) + 延迟主题 获取 延迟消费队列
 * step2:获取当前消费进度后的所有消息{@link ConsumeQueueInterface#iterateFrom(long)}
 * step3:遍历消息,获取消息的偏移量、大小、Tag哈希码,为获取Commitlog完整消息准备
 * step4:判断消息TAG的哈希码是否有效,计算当前延迟时间 = 延迟级别对应的延迟时间 + 消息存储时间戳
 *        {@link ScheduleMessageService#computeDeliverTimestamp(int, long)}
 * step5:矫正延迟时间{@link DeliverDelayedMessageTimerTask#correctDeliverTimestamp(long, long)}
 * step6:判定延迟是否到期:countdown = deliverTimestamp - now
 *        countdown > 0时:说明没有到延迟时间,则;执行下一个调度任务(100ms后从currOffset偏移量开始执行定时任务) + 更新消费进度
 *        countdown <= 0时,说明到延迟时间,可以消费消息
 * step7:根据延迟消息的偏移量、大小从Commitlog获取完整延迟消息
 *        {@link MessageStore#lookMessageByOffset(long, int)}
 * step8:延迟消息恢复到原始消息(原始topic、原始消费队列),清除延迟级别,保留消费次数
 *        {@link ScheduleMessageService#messageTimeup}
 * step9:原始消息再次放入Commitlog,并转发到相应的原始消费队列,供消费者消费
 *        {@link DeliverDelayedMessageTimerTask#asyncDeliver} 和 {@link DeliverDelayedMessageTimerTask#syncDeliver}
 */
public void executeOnTimeup() {
    // 根据延迟级别(映射为消费队列ID) + 延迟主题 获取 延迟消费队列
    ConsumeQueueInterface cq =
        ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
 
    // 消费队列为null,说明没有该延迟级别的消费队列,忽略本次调度任务,创建下次调度任务
    if (cq == null) {
        // 下次调度任务
        this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
        return;
    }
 
    // 获取当前消费进度后的所有消息
    ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
    // 未找到消息,创建下次调度任务
    if (bufferCQ == null) {
        long resetOffset;
        if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else {
            resetOffset = this.offset;
        }
 
        this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
        return;
    }
 
    long nextOffset = this.offset;
    try {
        while (bufferCQ.hasNext() && isStarted()) {
            // 获取消息队列元素
            CqUnit cqUnit = bufferCQ.next();
 
            // 获取消息的偏移量、大小、Tag哈希码,为获取Commitlog完整消息准备
            long offsetPy = cqUnit.getPos();      // 消息偏移量
            int sizePy = cqUnit.getSize();        // 消息大小
            long tagsCode = cqUnit.getTagsCode(); // 消息TAG的哈希码
 
            // 消息TAG的哈希码是否有效
            if (!cqUnit.isTagsCodeValid()) {
                //can't find ext content.So re compute tags code.
                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                    tagsCode, offsetPy, sizePy);
                // 获取当前消息的存储时间戳
                long msgStoreTime = ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                // 计算当前延迟时间 = 延迟级别对应的延迟时间 + 消息存储时间戳
                tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
            }
 
            long now = System.currentTimeMillis();
            // 矫正延迟时间
            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 
            // 计算下条消息的偏移量
            long currOffset = cqUnit.getQueueOffset();
            assert cqUnit.getBatchNum() == 1;
            nextOffset = currOffset + cqUnit.getBatchNum();
 
            // 定时消息的到期时间,> 0时:说明没有到延迟时间;<= 0时,说明到延迟时间,可以消费消息
            long countdown = deliverTimestamp - now;
            // > 0时:说明没有到延迟时间
            if (countdown > 0) {
                // 没有到延迟时间,执行下一个调度任务(100ms后从currOffset偏移量开始执行定时任务)
                this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
                // 更新该延迟级别对应消费队列的消费进度
                ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
                return;
            }
 
            // 从Commitlog获取完整消息(延迟消息)
            MessageExt msgExt = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(offsetPy, sizePy);
            if (msgExt == null) {
                continue;
            }
 
            /*
             * 从Commitlog获取的延迟消息转为之前的原始消息
             * 清除延迟级别属性;恢复原先的消息topic、消费队列;消费次数不会丢失
             */
            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                    msgInner.getTopic(), msgInner);
                continue;
            }
 
            boolean deliverSuc;
            // 异步传送到Commitlog
            if (ScheduleMessageService.this.enableAsyncDeliver) {
                deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
            }
            // 同步传送到Commitlog
            else {
                deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
            }
 
            if (!deliverSuc) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }
        }
    } catch (Exception e) {
        log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
    } finally {
        bufferCQ.release();
    }
 
    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
 
// 下次调度任务
public void scheduleNextTimerTask(long offset, long delay) {
    ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
        this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
}

三、参考资料
https://blog.csdn.net/yunqiinsight/article/details/126284555


http://lihuaxi.xjx100.cn/news/1320870.html

相关文章

第二章 物理层【计算机网络】

第二章 数据通信基础【计算机网络】 前言推荐第二章 数据通信基础2.1物理层的基本概念2.2数据通信的基础知识2.2.1 数据通信系统的模型2.2.2有关信道的几个基本概念2.2.3 信道的极限容量 2.3物理层下面的传输媒体2.3.1导引型传输媒体2.3.2非导引型传输媒体 2.4信道复用技术2.4.…

AtcoderABC218(D~G)题解

T1 构成矩形&#xff08;ABC218 D&#xff09; 先预处理出所有与 x x x轴平行的边&#xff0c;然后暴力判断其中两条边能不能构成矩形。 #include <bits/stdc.h> using namespace std; #define LL long long const LL MAXN 2005; LL n; struct node1 {LL x, y;bool op…

因果推断Causal Inference: What If (the book)没有模型的因果推理部分章节结构

第一章 因果效应的定义 1.1 个体因果效应&#xff1a;介绍了个体因果效应的概念和定义&#xff0c;即在给定某个干预条件下&#xff0c;个体结果的变化量。1.2 平均因果效应&#xff1a;介绍了平均因果效应的概念和定义&#xff0c;即在给定某个干预条件下&#xff0c;总体结果…

消息队列 - RocketMQ

1. 名词解释和概念 NameServer&#xff1a; 是一个无状态节点&#xff0c;可集群部署&#xff0c;节点之间无任何信息同步用于服务注册和发现&#xff0c;为 MQ 集群提供服务协调与治理记录并维护 Topic 和 Broker 的信息为生产者和消费者提供 Topic 的路由信息 无状态和有状…

【C语言进阶(五)】指针进阶详解(上)

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C语言学习分享⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多C语言知识   &#x1f51d;&#x1f51d; 指针进阶 1. 前言 2. 字符指针 2.…

【PCIE】协议分析之-hot-reset热复位

被上游指定热复位整个通路 当高层&#xff08;higher Layer&#xff09;指示某些通道进行热复位&#xff08;Hot Reset&#xff09;时&#xff0c;以下操作将被执行&#xff1a; 所有在配置的链路中的通道都会发送带有热复位位&#xff08;Hot Reset bit&#xff09;和配置的…

第三章 数据链路层【计算机网络】

第三章 数据链路层【计算机网络】 前言推荐第三章 数据链路层3.1 数据链路层的几个共同问题3.1.1 数据链路和帧3.1.2 三个基本问题 3.2点对点协议PPP3.2.1 PPP协议的特点3.2.2 PPP协议的帧格式3.2.3 PPP协议的工作状态 3.3 使用广播信道的数据链路层3.3.1 局域网的数据链路层3.…

spark动态资源调度中的shuffle service的数据清理

External Shuffle Service的问题 在 spark2 中&#xff0c;如果想要使用动态资源调度&#xff0c;external shuffle service外部独立的shuffle服务是必须条件&#xff0c;因为 spark 需要确保回收 executor 时不会删除生成的 shuffle 数据&#xff0c;外部的 shuffle 服务可以…