RocketMQ-源码架构

news/2024/7/8 1:32:40

源码环境搭建

1、主要功能模块

RocketMQ官方Git仓库地址:GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

RocketMQ的官方网站下载:下载 | RocketMQ

重要模块:

  • broker: Broker 模块(broke 启动进程)

  • client :消息客户端,包含消息生产者、消息消费者相关类

  • example: RocketMQ 示例代码

  • namesrv:NameServer模块

  • store:消息存储模块

  • remoting:远程访问模块

2、源码启动服务

将源码导入IDEA后,需要先对源码进行编译。编译指令clean install -Dmaven.test.skip=true

编译完成后就可以开始调试代码:

调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conflogback_broker.xmllogback_namesrv.xml

window10执行上面的编译指令一直报错A required class was missing while executing org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process: org/apache/commons/collections/ExtendedProperties,最后在Linux执行才成功

unzip rocketmq-rocketmq-all-4.9.5.zip
cd rocketmq-rocketmq-all-4.9.5/
mvn clean install -Dmaven.test.skip=true
启动nameServer

虽然编译指令在windows执行报错,但是nameSrv还是可以正常启动的

如果启动时报错,并提示需要配置ROCKETMQ_HOME环境变量,可以在工程里面添加:

启动Broker

启动Broker之前,需要先修改之前复制的broker.conf文件

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort

启动Broker时,还需要配置一个-c 参数,指向broker.conf配置文件

发送消息

在源码的example模块下,提供了非常详细的测试代码。

启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息

在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。

// 源码中指定
producer.setNamesrvAddr("127.0.0.1:9876");
消费消息

启动example模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息

consumer.setNamesrvAddr("192.168.232.128:9876");

3、读源码的方法

1、带着问题读源码。一定要自己思考!

2、小步快走。不要觉得一两遍就能读懂源码

3、分步总结。带上自己的理解,及时总结。对各种扩展功能,尝试验证

对于RocketMQ,试着去理解源码中的各种单元测试。

源码热身阶段

NameServer的启动过程

关注重点

RocketMQ集群中,实际上消息存储、推送等核心功能点是Broker。NameServer的作用,和微服务中的注册中心非常类似,只是提供了Broker端的服务注册与发现功能。

源码重点

NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Controller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。

另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。

从NameServer启动和关闭这两个关键步骤,我们可以总结出NameServer的组件其实并不是很多,整个NameServer的结构是这样的:

这两个配置类就可以用来指导如何优化Nameserver的配置。比如,如何调整nameserver的端口?

可以看出,RocketMQ的整体源码风格就是典型的MVC思想。Controller响应请求,Service处理业务,各种Table保存消息

部分源码示例:

// NamesrvStartup#main
public static void main(String[] args) {
	main0(args);
}

public static NamesrvController main0(String[] args) {

	try {
		NamesrvController controller = createNamesrvController(args);
		start(controller);
		String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
		log.info(tip);
		System.out.printf("%s%n", tip);
		return controller;
	} catch (Throwable e) {
		e.printStackTrace();
		System.exit(-1);
	}

	return null;
}

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
	//PackageConflictDetect.detectFastjson();

	Options options = ServerUtil.buildCommandlineOptions(new Options());
	commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
	if (null == commandLine) {
		System.exit(-1);
		return null;
	}

	// 两个配置类在这里
	final NamesrvConfig namesrvConfig = new NamesrvConfig();
	final NettyServerConfig nettyServerConfig = new NettyServerConfig();
	// 端口号简单粗暴
	nettyServerConfig.setListenPort(9876);
	// -c指令指定配置文件,可以替换端口号。配置文件内容:listenPort=9876
	if (commandLine.hasOption('c')) {
		String file = commandLine.getOptionValue('c');
		if (file != null) {
			InputStream in = new BufferedInputStream(new FileInputStream(file));
			properties = new Properties();
			properties.load(in);
			MixAll.properties2Object(properties, namesrvConfig);
			MixAll.properties2Object(properties, nettyServerConfig);

			namesrvConfig.setConfigStorePath(file);

			System.out.printf("load config properties file OK, %s%n", file);
			in.close();
		}
	}

	if (commandLine.hasOption('p')) {
		InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
		MixAll.printObjectProperties(console, namesrvConfig);
		MixAll.printObjectProperties(console, nettyServerConfig);
		System.exit(0);
	}
    
	... ...
        
	final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

	// remember all configs to prevent discard
	controller.getConfiguration().registerConfig(properties);

	return controller;
}

// NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
	this.namesrvConfig = namesrvConfig;
	this.nettyServerConfig = nettyServerConfig;
	this.kvConfigManager = new KVConfigManager(this);
	this.routeInfoManager = new RouteInfoManager();
	this.brokerHousekeepingService = new BrokerHousekeepingService(this);
	this.configuration = new Configuration(
		log,
		this.namesrvConfig, this.nettyServerConfig
	);
	this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

// NamesrvController#initialize
public boolean initialize() {

	this.kvConfigManager.load();

	this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

	... ...
}

Broker服务启动过程

关注重点

Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。

重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点

源码重点

Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试

启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动

// BrokerStartup#main
public static void main(String[] args) {
	start(createBrokerController(args));
}

public static BrokerController createBrokerController(String[] args) {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

	try {
		//PackageConflictDetect.detectFastjson();
		Options options = ServerUtil.buildCommandlineOptions(new Options());
		commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
			new PosixParser());
		if (null == commandLine) {
			System.exit(-1);
		}
		
		// Broker服务配置
		final BrokerConfig brokerConfig = new BrokerConfig();
		// Netty服务端占用了10911端口。同样也可以在配置文件中覆盖
		final NettyServerConfig nettyServerConfig = new NettyServerConfig();
		// Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳
		final NettyClientConfig nettyClientConfig = new NettyClientConfig();

		nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
			String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
		nettyServerConfig.setListenPort(10911);
		// 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
		final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
		
		... ...
	} catch (Throwable e) {
		e.printStackTrace();
		System.exit(-1);
	}

	return null;
}

BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig 这几个配置是了解如何优化 RocketMQ 使用的关键

在BrokerController.start方法启动了一大堆Broker的核心服务,部分源码:

// BrokerController#start
public void start() throws Exception {
	if (this.messageStore != null) {
		//启动核心的消息存储组件
		this.messageStore.start();
	}

	if (this.remotingServer != null) {
		this.remotingServer.start();
	}

	if (this.fastRemotingServer != null) {
		//启动两个Netty服务
		this.fastRemotingServer.start();
	}

	if (this.brokerOuterAPI != null) {
		//启动客户端,往外发请求
		this.brokerOuterAPI.start();
	}

	... ...

	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

		@Override
		public void run() {
			try {
				//向NameServer注册心跳
				BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
			} catch (Throwable e) {
				log.error("registerBrokerAll Exception", e);
			}
		}
	}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

	if (this.brokerStatsManager != null) {
		this.brokerStatsManager.start();
	}

	if (this.brokerFastFailure != null) {
		//负责具体业务的功能组件
		this.brokerFastFailure.start();
	}
}

抽象出Broker的一个整体结构:

实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。

小试牛刀阶段

开始理解一些比较简单的业务逻辑

Netty服务注册框架

关注重点

网络通信服务是构建分布式应用的基础,也是理解RocketMQ底层业务的基础。

重点梳理RocketMQ的这个服务注册框架,理解各个业务进程之间是如何进行RPC远程通信的

Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,NameServer主要是RPC的服务端RemotingServer,Broker对于客户端来说,是RPC的服务端RemotingServer,而对于NameServer来说,又是RPC的客户端。各种Client是RPC的客户端RemotingClient。

RocketMQ基于Netty保持客户端与服务端的长连接Channel。RemotingServer和RemotingClient都需要注册自己的服务。

源码重点

1、NameServer需要NettyServer。客户端,Producer和Consumer,需要NettyClient。

2、所有的RPC请求数据都封装成RemotingCommand对象。每个处理消息的服务逻辑,都会封装成一个NettyRequestProcessor对象。

3、服务端和客户端都维护一个processorTable,这是个HashMap。key是服务码requestCode,value是对应的运行单元 Pair<NettyRequestProcessor,ExecutorService>类型,包含了处理Processor和执行线程的线程池。具体的Processor,由业务系统自行注册。Broker服务注册:BrokerController.registerProcessor(),客户端的服务注册:MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor,统一处理所有服务。

4、请求类型分为REQUEST和RESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后,可以先缓存到responseTable中,等NettyClient下次来获取,这样就不用阻塞Channel了,可以提升请求吞吐量。

5、重点理解remoting包中是如何实现全流程异步化。

整体RPC框架流程:

RocketMQ使用Netty框架提供了一套基于服务码的服务注册机制,让各种不同的组件都可以按照自己的需求,注册自己的服务方法。RocketMQ的这一套服务注册机制,是非常简洁实用的。要开发一个大型的IM项目,要加减好友、发送文本,图片,甚至红包、维护群聊信息等等各种各样的请求,这些请求如何封装,就可以很好的参考这个框架。

关于RocketMQ的同步结果推送与异步结果推送

RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture>

处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

//NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
	final long timeoutMillis)
	throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
	final int opaque = request.getOpaque();

	try {
		final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
		this.responseTable.put(opaque, responseFuture);
		final SocketAddress addr = channel.remoteAddress();
		channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture f) throws Exception {
				if (f.isSuccess()) {
					responseFuture.setSendRequestOK(true);
					return;
				} else {
					responseFuture.setSendRequestOK(false);
				}

				responseTable.remove(opaque);
				responseFuture.setCause(f.cause());
				responseFuture.putResponse(null);
				log.warn("send a request command to channel <" + addr + "> failed.");
			}
		});

		RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
		if (null == responseCommand) {
			if (responseFuture.isSendRequestOK()) {
				throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
					responseFuture.getCause());
			} else {
				throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
			}
		}

		return responseCommand;
	} finally {
		this.responseTable.remove(opaque);
	}
}

//ResponseFuture#waitResponse
//实际上,同步也是通过异步实现的
//发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
	this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
	return this.responseCommand;
}

//ResponseFuture#putResponse
//等待异步获取到消息后,再通过countDownLatch释放当前线程
public void putResponse(final RemotingCommand responseCommand) {
	this.responseCommand = responseCommand;
	this.countDownLatch.countDown();
}

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

//NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
	final InvokeCallback invokeCallback)
	throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
	long beginStartTime = System.currentTimeMillis();
	final int opaque = request.getOpaque();
	boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
	if (acquired) {
		final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
		long costTime = System.currentTimeMillis() - beginStartTime;
		if (timeoutMillis < costTime) {
			once.release();
			throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
		}

		final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
		this.responseTable.put(opaque, responseFuture);
		try {
			channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture f) throws Exception {
					if (f.isSuccess()) {
						responseFuture.setSendRequestOK(true);
						return;
					}
					requestFail(opaque);
					log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
				}
			});
		} catch (Exception e) {
			responseFuture.release();
			log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
			throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
		}
	} else {
		if (timeoutMillis <= 0) {
			throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
		} else {
			String info =
				String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
					timeoutMillis,
					this.semaphoreAsync.getQueueLength(),
					this.semaphoreAsync.availablePermits()
				);
			log.warn(info);
			throw new RemotingTimeoutException(info);
		}
	}
}

//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {

	@Override
	public void run() {
		try {
			NettyRemotingServer.this.scanResponseTable();
		} catch (Throwable e) {
			log.error("scanResponseTable exception", e);
		}
	}
}, 1000 * 3, 1000);

Broker心跳注册管理

关注重点

Broker会在启动时向所有NameServer注册自己的服务信息,并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由表进行实时更新。

源码重点

Broker启动后会立即发起向NameServer注册心跳。方法入口:BrokerController.this.registerBrokerAll。 然后启动一个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送心跳。

NameServer内部会通过RouteInfoManager组件及时维护Broker信息。同时在NameServer启动时,会启动定时任务,扫描不活动的Broker。方法入口:NamesrvController.initialize方法。

极简化的服务注册发现流程

为什么RocketMQ要自己实现一个NameServer,而不用Zookeeper、Nacos这样现成的注册中心?

首先,依赖外部组件会对产品的独立性形成侵入,不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。

另外,其实更重要的还是对业务的合理设计。NameServer之间不进行信息同步,而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。

但是,要知道,这种极简的设计,其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册,有可能部分NameServer注册成功,而部分NameServer注册失败了。这样,多个NameServer之间的数据是不一致的。作为注册中心,这是不可接受的。但是对于RocketMQ,这又变得可以接受了。因为客户端从NameServer上获得的,只要有一个正常运行的Broker就可以了,并不需要完整的Broker列表。

Producer发送消息过程

关注重点

回顾Producer使用案例

Producer有两种:

  • 一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。

  • 另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。

事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。这里只关注DefaultMQProducer的消息发送过程。

整个Producer的使用流程,大致分为两个步骤:一是调用start方法,进行一大堆的准备工作。 二是各种send方法,进行消息发送。

重点关注以下几个问题:

1、Producer启动过程中启动了哪些服务

2、Producer如何管理broker路由信息。 可以设想一下,如果Producer启动了之后,NameServer挂了,那么Producer还能不能发送消息?

3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。

源码重点
  • Producer的核心启动流程

所有Producer的启动过程,最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。

这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由MQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable

  • 发送消息的核心流程

1、发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。

2、生产者如何找MessageQueue: 默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。

//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
//如果进到这里lastBrokerName不为空,那么表示上一次向这个Broker发送消息是失败的,这时就尽量不要再往这个Broker发送消息了。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
	if (lastBrokerName == null) {
		return selectOneMessageQueue();
	} else {
		for (int i = 0; i < this.messageQueueList.size(); i++) {
			int index = this.sendWhichQueue.incrementAndGet();
			int pos = Math.abs(index) % this.messageQueueList.size();
			if (pos < 0)
				pos = 0;
			MessageQueue mq = this.messageQueueList.get(pos);
			if (!mq.getBrokerName().equals(lastBrokerName)) {
				return mq;
			}
		}
		return selectOneMessageQueue();
	}
}

3、如果在发送消息时传了Selector,那么Producer就不会走这个负载均衡的逻辑,而是会使用Selector去寻找一个队列。 具体参见org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法

//DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(
	Message msg,
	MessageQueueSelector selector,
	Object arg,
	final CommunicationMode communicationMode,
	final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	long beginStartTime = System.currentTimeMillis();
	this.makeSureStateOK();
	Validators.checkMessage(msg, this.defaultMQProducer);

	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
	if (topicPublishInfo != null && topicPublishInfo.ok()) {
		MessageQueue mq = null;
		try {
			List<MessageQueue> messageQueueList =
				mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
			Message userMessage = MessageAccessor.cloneMessage(msg);
			String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
			userMessage.setTopic(userTopic);

			mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
		} catch (Throwable e) {
			throw new MQClientException("select message queue threw exception.", e);
		}

		long costTime = System.currentTimeMillis() - beginStartTime;
		if (timeout < costTime) {
			throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
		}
		if (mq != null) {
			return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
		} else {
			throw new MQClientException("select message queue return null.", null);
		}
	}

	validateNameServerSetting();
	throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

Consumer拉取消息过程

关注重点

回顾消费者的几个重点问题:

  • 消费者也是有两种,推模式消费者和拉模式消费者。优秀的MQ产品都会有一个高级的目标,就是要提升整个消息处理的性能。而要提升性能,服务端的优化手段往往不够直接,最为直接的优化手段就是对消费者进行优化。所以在RocketMQ中,整个消费者的业务逻辑是非常复杂的,甚至某种程度上来说,比服务端更复杂,所以,在这里重点关注用得最多的推模式的消费者。

  • 消费者组之间有集群模式和广播模式两种消费模式。就要了解下这两种集群模式是如何做的逻辑封装。

  • 然后关注消费者端的负载均衡的原理。即消费者是如何绑定消费队列的,那些消费策略到底是如何落地的。

  • 最后关注在推模式的消费者中,MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同,为什么后者能保持消息顺序。

源码重点

Consumer的核心启动过程和Producer是一样的, 最终都是通过MQClientFactory对象启动。不过之间添加了一些注册信息。整体启动过程:

广播模式与集群模式的Offset处理

在DefaultMQPushConsumerImpl的start方法中,启动了非常多的核心服务。 比如,对于广播模式与集群模式的Offset处理

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
	this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
	switch (this.defaultMQPushConsumer.getMessageModel()) {
		case BROADCASTING:
			this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
			break;
		case CLUSTERING:
			this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
			break;
		default:
			break;
	}
	this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

广播模式是使用LocalFileOffsetStore,在Consumer本地保存Offset,而集群模式是使用RemoteBrokerOffsetStore,在Broker端远程保存offset。而这两种Offset的存储方式,最终都是通过维护本地的offsetTable缓存来管理Offset。

Consumer与MessageQueue建立绑定关系

start方法中还一个比较重要的东西是给rebalanceImpl设定了一个AllocateMessageQueueStrategy,用来给Consumer分配MessageQueue的。

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//Consumer负载均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

AllocateMessageQueueStrategy就是用来给Consumer和MessageQueue之间建立一种对应关系的。也就是说,只要Topic当中的MessageQueue以及同一个ConsumerGroup中的Consumer实例都没有变动,那么某一个Consumer实例只是消费固定的一个或多个MessageQueue上的消息,其他Consumer不会来抢这个Consumer对应的MessageQueue。

为什么要让一个MessageQueue只能由同一个ConsumerGroup中的一个Consumer实例来消费?

因为Broker需要按照ConsumerGroup管理每个MessageQueue上的Offset,如果一个MessageQueue上有多个同属一个ConsumerGroup的Consumer实例,他们的处理进度就会不一样。这样的话,Offset就乱套了。

顺序消费与并发消费

在start方法中,启动了consumerMessageService线程,进行消息拉取。

//Consumer中自行指定的回调函数
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
	this.consumeOrderly = true;
	this.consumeMessageService =
		new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
	this.consumeOrderly = false;
	this.consumeMessageService =
		new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

this.consumeMessageService.start();

Consumer通过registerMessageListener方法指定的回调函数,都被封装成了ConsumerMessageService的子实现类。

而对于这两个服务实现类的调用,会延续到DefaultMQPushConsumerImpl的pullCallback对象中。也就是Consumer每拉过来一批消息后,就向Broker提交下一个拉取消息的的请求。

这里也可以印证一个点,就是顺序消息,只对异步消费也就是推模式有效。同步消费的拉模式是无法进行顺序消费的。因为这个pullCallback对象,在拉模式的同步消费时,根本就没有往下传。

当然,这并不是说拉模式不能锁定队列进行顺序消费,拉模式在Consumer端应用就可以指定从哪个队列上拿消息。

PullCallback pullCallback = new PullCallback() {
	@Override
	public void onSuccess(PullResult pullResult) {
		if (pullResult != null) {
			pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
				subscriptionData);

			switch (pullResult.getPullStatus()) {
				case FOUND:
					... ...
					DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
						pullResult.getMsgFoundList(),
						processQueue,
						pullRequest.getMessageQueue(),
						dispatchToConsume);

这里提交的,实际上是一个ConsumeRequest线程。而提交的这个ConsumeRequest线程,在两个不同的ConsumerService中有不同的实现。

这其中,两者最为核心的区别在于ConsumerMessageOrderlyService是锁定了一个队列,处理完了之后,再消费下一个队列。

@Override
public void run() {
	... ...

	final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
	synchronized (objLock) {
		... ...
	}
}

为什么给队列加个锁,就能保证顺序消费呢?

从源码中可以看到,Consumer提交请求时,都是往线程池里异步提交的请求。如果不加队列锁,那么就算Consumer提交针对同一个MessageQueue的拉取消息请求,这些请求都是异步执行,他们的返回顺序是乱的,无法进行控制。给队列加个锁之后,就保证了针对同一个队列的第二个请求,必须等第一个请求处理完了之后,释放了锁,才可以提交。这也是在异步情况下保证顺序的基础思路。

实际拉取消息还是通过PullMessageService完成的

start方法中,相当于对很多消费者的服务进行初始化,包括指定一些服务的实现类,以及启动一些定时的任务线程,比如清理过期的请求缓存等。最后,会随着mQClientFactory组件的启动,启动一个PullMessageService。实际的消息拉取都交由PullMesasgeService进行。

所谓消息推模式,其实还是通过Consumer拉消息实现的。

//org.apache.rocketmq.client.impl.consumer.PullMessageService
private void pullMessage(final PullRequest pullRequest) {
	final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
	if (consumer != null) {
		DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
		impl.pullMessage(pullRequest);
	} else {
		log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
	}
}

客户端负载均衡管理总结

Producer负载均衡

Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker。

Producer轮训时,如果发现往某一个Broker上发送消息失败了,那么下一次会尽量避免再往同一个Broker上发送消息。但是,如果你的应用场景允许发送消息长延迟,也可以给Producer设定setSendLatencyFaultEnable(true)。这样对于某些Broker集群的网络不是很好的环境,可以提高消息发送成功的几率。

同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡

Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。

1、集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。

  • AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。

这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。

源码中有测试代码AllocateMachineRoomNearByTest。

  • AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者

  • AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。

  • AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。

  • AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。

  • AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在环上分布更为均匀。

最常用的就是平均分配和轮训分配了。

2、广播模式

广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量。


http://lihuaxi.xjx100.cn/news/1883962.html

相关文章

【EMNLP 2023】面向Stable Diffusion的自动Prompt工程算法

近日&#xff0c;阿里云人工智能平台PAI与华南理工大学朱金辉教授团队合作在自然语言处理顶级会议EMNLP2023上发表了BeautifulPrompt的深度生成模型&#xff0c;可以从简单的图片描述中生成高质量的提示词&#xff0c;从而使文生图模型能够生成更美观的图像。BeautifulPrompt通…

使用C++和双指针算法移除数组中的元素,且原地移除,不使用额外的空间

输入一个数组nums和一个值val&#xff0c;在该数组中&#xff0c;凡是与val相等的元素全部移除&#xff0c;并最终输出该数组&#xff0c;C代码如下&#xff1a; #include<iostream> #include<vector> #include<ctime>//计算代码所需要的时间 using namespac…

IDEA版SSM入门到实战(Maven+MyBatis+Spring+SpringMVC) -Spring IOC底层实现

第一章 SpringIOC底层实现 IOC&#xff1a;将对象的控制器反转给Spring 1.1 BeanFactory与ApplicationContexet BeanFactory&#xff1a;IOC容器的基本实现&#xff0c;是Spring内部的使用接口&#xff0c;是面向Spring本身的&#xff0c;不是提供给开发人员使用的。****Appli…

用提问的方式来学习:冯·诺伊曼体系结构与操作系统OS

学习冯诺伊曼体系结构之前&#xff0c;我们要本着两个问题来学习&#xff1a; 什么是冯诺伊曼体系结构&#xff1f;为什么要有冯诺伊曼体系结构&#xff1f; 一、冯诺伊曼体系结构 1. 什么是冯诺伊曼体系结构&#xff1f; 那我们就先来回答一下什么是冯诺伊曼体系结构&#x…

[cleanrl] ppo_continuous_action源码解析

1 import库&#xff08;略&#xff09; import os import random import time from dataclasses import dataclassimport gymnasium as gym import numpy as np import torch import torch.nn as nn import torch.optim as optim import tyro from torch.distributions.normal…

C++入门【6-C++ 修饰符类型】

C 修饰符类型 C 允许在 char、int 和 double 数据类型前放置修饰符。 修饰符是用于改变变量类型的行为的关键字&#xff0c;它更能满足各种情境的需求。 下面列出了数据类型修饰符&#xff1a; signed&#xff1a;表示变量可以存储负数。对于整型变量来说&#xff0c;signe…

RealSense最简单配置环境只需要5分钟,不用自行添加任何包含目录、库目录。Visual Studio2022、C++。

又开始搞点云了&#xff0c;现在用的是Intel的realsense。 看网上步骤都挺繁琐的&#xff0c;本文搭建只需要3分钟。直接用官方提供的属性表&#xff0c;不用自行添加任何包含目录、库目录。 第一分钟&#xff1a;用Visual Studio新建一个工程&#xff08;此时你是没有intel.…

FTR223限时回归?经典三花再加金翅膀,CL500特别款亮相

FTR223可以说是非常经典的一款本田小攀爬车型了&#xff0c;之前我还有幸玩过一段时间&#xff0c;最近本田在泰国车展上展出了CL500的特别版&#xff0c;其中FTR223纪念版的版画让人眼前一亮&#xff0c;经典的白、红、蓝三色搭配让人眼前一亮。 CL500这台车在国内今年刚上市&…