Skip to main content

RocketMQ源码解析-NameServer对Broker的管理

以下源码基于Rocket MQ 4.8.0

前面的 RocketMQ源码解析-NameServer启动已经知道NameServer的两大功能:

  • Broker管理
  • 路由管理

NameServer可以部署多个,相互之间独立,其他角色同时向多个NameServer上报状态信息,从而达到热备份的目的。NameServer本身是无状态的,也就是说NameServer中的Broker、Topic等信息都不会持久化,都是由各个角色定时上报并存储到内存中的(NameServer支持参数的持久化,一般用不到)。

下面就来结合源码以及项目运行添加打印日志来具体分析一下NameServer是如何对Broker进行管理的。

1. Broker元数据

当Broker启动,会向NameServer注册当前broker的信息。在NameServer通过 BrokerData 来保存Broker的元数据信息:

public class BrokerData implements Comparable<BrokerData> {
private String cluster; //broker集群名称
private String brokerName; //broker名称
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; //broker Id和broker地址的映射关系
}

我们通过打印日志来看一下这个里面的数据具体包含了那些数据,日志添加在 RouteInfoManager#registerBroker 方法中:

添加好后我们配置好broker的配置文件:

然后首先启动NameServer,再启动broker如下图:

打印的元数据如下:

打印BrokerData元数据信息:{"brokerAddrs":{0:"169.254.144.194:10911"},"brokerName":"mxsm-1","cluster":"MxsmClusterName"}

2. Broker存活状态管理

Broker的存活状态管理分为两种:

  • Broker正常下线

    Broker正常下线会给NameServer发送一个 RequestCode.UNREGISTER_BROKER 。然后调用最终调用 RouteInfoManager#unregisterBroker 方法来注销

  • Broker异常下线,NameServer通过定时任务扫描

    定时任务在NameServer启动的时候创建

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
    }, 5, 10, TimeUnit.SECONDS);

    每10秒执行一次。调用 RouteInfoManager#scanNotActiveBroker

    public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
    Entry<String, BrokerLiveInfo> next = it.next();
    long last = next.getValue().getLastUpdateTimestamp();
    //private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
    RemotingUtil.closeChannel(next.getValue().getChannel());
    it.remove();
    log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
    this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
    }
    }
    }

    通过上面的代码可以发现120秒没有更新NameServer中当Broker的状态。后将连接关闭,同时需要清除这些已关闭连接的 broker 的路由信息。这部分则是在onChannelDestroy方法中

3. Topic的信息

RouteInfoManager 类中有一个变量 topicQueueTable 这个变量用来保存Topic和Broker之间的读写队列数和权限。

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
public class QueueData implements Comparable<QueueData> {
//broker名称
private String brokerName;
//读的队列数
private int readQueueNums;
//写的队列数
private int writeQueueNums;
//读写权限
private int perm;
private int topicSynFlag;
}