NameServer路由注册
1. Broker发起路由注册
在了解路由注册的机制之前首先要知道路由的发起者是谁,在RocketMQ中路由注册主要是由Broker发起。Broker启动,路由注册发起也分为三种:
- 普通的模式-单个主无备
- HA模式-有主有备
- A-HA(AutoSwitch HA)主备自主切换模式-有主有备,主备自动切换
2.路由注册的流程
对应上面的三种模式大体上是一样的,只是在A-HA的模式中Broker首先会向Controller注册并且注册成功后再将路由信息注册到Broker。 Broker启动的过程中会往每一个配置的NameServer中发送一个RequestCode.REGISTER_BROKER(注册Broker)的请求。请求中包含了整个Broker的元数据。然后将数据注册到NameServer。
3.路由注册流程源码分析
路由注册主要有两个地方:
- Broker启动过程中注册
- Broker定时任务调用
3.1启动过程中注册
代码入口在 **BrokerController#start **
public void start() throws Exception {
//省略部分代码
startBasicService();
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
//省略部分代码
}
启动Broker分为两种:
- 正常的情况和HA模式下启动这种模式都是调用了如下的代码入口进行注册:
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
- A-HA的模式启动,这个首先需要Broker注册到Controller,Controller的逻辑处理成功后就会将Broker注册到Name Server,以BrokerController#start的方法中的startBasicService()作为入口
3.2 定时任务更新注册信息
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
Broker启动后会启动一个去更新Broker在NameSrever的路由信息的定时任务。
4. Broker路由注册详解
路由最终的发起方法是BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isInBrokerContainer())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
Broker将元数据包装成 TopicConfigAndMappingSerializeWrapper 发送到NameServer
5. 总结
RocketMQ NameServer 是 RocketMQ 的核心组件之一,主要负责管理和维护消息队列的路由信息,同时提供了注册和发现服务的功能。以下是 RocketMQ NameServer 路由注册的简要总结:
- NameServer 维护了 Broker 集群的地址列表和 Topic 的路由信息。当 Producer 或 Consumer 发起请求时,NameServer 会根据请求的 Topic 找到对 应的 Broker 地址,将请求转发到对应的 Broker 上。
- 每个 Broker 在启动时会向所有配置的 NameServer 发送心跳包,并定期发送更新路由信息的请求。NameServer 收到这些请求后,会将 Broker 的地址信息和 Topic 路由信息更新到自己的内存中。
- Producer 在发送消息之前需要向 NameServer 查询指定 Topic 的路由信息,包括该 Topic 下有哪些 Broker 负责消息存储和消费,并根据负载均衡算法选择一个 Broker 发送消息。
- Consumer 在启动时需要向 NameServer 查询指定 Topic 的路由信息,获取该 Topic 下所有的 Broker 地址,并根据负载均衡算法从其中选择一个或多个 Broker 订阅消息。
- 如果 Broker 集群发生变化,例如新增或下线了 Broker,或者某个 Broker 上下线了 Topic,NameServer 会及时更新自己的路由信息,并通知 Producer 和 Consumer 进行相应的调整。
总之,NameServer 路由注册是 RocketMQ 架构中非常重要的一环,它实现了消息队列的动态路由和负载均衡,并能够自适应地处理 Broker 集群变化的情况,保证了消息的高可用和高性能。