RocketMQ源码解析-topic创建机制
以下源码基于Rocket MQ 4.7.0
1. RocketMQ Topic创建机制
RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建
autoCreateTopicEnable=true/false
下面会结合源码来深度分析一下自动创建和手动创建的过程。
2. 自动Topic
默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中
// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自动创建开关是下BrokerConfig类中有一个私有变量:
@ImportantField
private boolean autoCreateTopicEnable = true;
这变量可以通过配置文件配置来进行修改,代码中的默认值为true,所以在默认的情况下Rocket MQ是会自动创建Topic的。
在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ系统内置的一些系统Topic(这里只关注一下TBW102):
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums()); //8
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums()); //8
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()
这样一段代码,在开启允许自动创建的时候,会把当前Topic的信息存入topicConfigTable变量中。然后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保存。在BrokerController中定义了这样的一个定时任务来执行这个心跳包的发送:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
这里就说明了如何把每个Broker的系统自定义的Topic注册到NameServer。接下来看在发送过程中如何从NameServer获取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略代码
//获取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
}
通过DefaultMQProducerImpl.tryToFindTopicPublishInfo方法获取Topic的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//第一次从缓存中获取--肯定没有因为还没创建
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//从NameServer获取--也是没有,因为没有创建
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//第二次从这里获取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
下面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer
的方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
//省略代码
if (isDefault && defaultMQProducer != null) {
//使用默认的TBW102 Topic获取数据
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//这是正常的
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
//省略代码
}
如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。