Rocketmq生产者启动流程分析
1. 消息发送
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static final int MESSAGE_COUNT = 1000;
public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static final String TAG = "TagA";
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
信息
例子代码来自RocketMQ官方
从上面的例子可以分析出来发送消息的几个步骤:
- 构建DefaultMQProducer实例
- 设置NameServer地址
- 启动Producer
- 构建需要发送消息
- 发送数据到Broker,等待返回结果(同步发送)
- 生产者shutdown
1、2、3步骤是Rocketmq生产者启动流程。接下来就分析启动流程
2. 生产者类继承关系
类的继承关系如上图。接下分析生长着启动的过程。
3. DefaultMQProducer#start
DefaultMQProducer#start
@Override
public void start() throws MQClientException {
//设置生产者组
this.setProducerGroup(withNamespace(this.producerGroup));
//启动服务
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
logger.warn("trace dispatcher start failed ", e);
}
}
}
主要的方法 DefaultMQProducerImpl#start
DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST: //初次serviceState默认值CREATE_JUST
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//创建获取ClientFactory
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//往ClientFactory注册当前生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//初始化当前生产者主题和主题发布的映射关系
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动工厂类
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//发送心跳给所有的Broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//启动定时任务
RequestFutureHolder.getInstance().startScheduledTask(this);
}
上述方法主要可以分成一下几个步骤:
- 生产者启动必要参数的校验
- 创建或者从缓存中根据生产者ID获取MQClientInstance实例
- MQClientInstance实例中注册当前生产者
- 初始化当前生产者主题和主题发布信息映射关系
- 启动MQClientInstance
- 发送心跳给所有的Broker
- 启动定时任务
解析来就分析上述的几个步骤看看具体做了什么。
4. 生产者启动流程详解
4.1 必要参数校验
主要校验了 DefaultMQProducer.producerGroup
字段是否符合要求。
4.2 创建或者从缓存中根据生产者ID获取MQClientInstance实例
MQClientManager
public class MQClientManager {
private final static Logger log = LoggerFactory.getLogger(MQClientManager.class);
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
//保存客户端ID和MQClientInstance实例的映射关系
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<>();
private MQClientManager() {
}
//省略部分代码
}
MQClientManager
主要管理客户端和MQClientInstance实例之间的映射关系。MQClientManager#getOrCreateMQClientInstance
方法获取MQClientInstance实例如果缓存中存在直接获取,缓存中不存在直接创建。
信息
MQClientInstance的创建在启动的时候进行分析