# /

# NameServer功能

NameServer是Broker的注册中心,也是一个Topic路由管理器。主要包括两个功能:

# Broker管理

NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;

# 路由信息管理

每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。
然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

public class RouteInfoManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
1
2
3
4
5
6
7
8
9
10

# NameServer高可用

NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。
Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。
当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。

注意:Producer和Consumer同一时间与NameServer集群中其中一台建立长连接。

# NameServer持久化

NameServer作为路由信息管理器,其中的路由信息并没有持久化,直接使用内存作为存储层。

注意:NameServer的路由信息是来着Broker,因此NameServer可以不做持久化。

    public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController) {
        this.topicQueueTable = new ConcurrentHashMap<>(1024);
        this.brokerAddrTable = new ConcurrentHashMap<>(128);
        this.clusterAddrTable = new ConcurrentHashMap<>(32);
        this.brokerLiveTable = new ConcurrentHashMap<>(256);
        this.filterServerTable = new ConcurrentHashMap<>(256);
        this.topicQueueMappingInfoTable = new ConcurrentHashMap<>(1024);
        this.unRegisterService = new BatchUnregistrationService(this, namesrvConfig);
        this.namesrvConfig = namesrvConfig;
        this.namesrvController = namesrvController;
    }
1
2
3
4
5
6
7
8
9
10
11

# NameServer一致性

虽然ConcurrentHashMap是并发安全的,但其不能保证读一致性,在高并发场景下必然出现脏读。NameServer通过读写锁,保证数据的强一致性。

注意:ReentrantReadWriteLock在进行写操作时,读操作是阻塞的。

    public TopicList getAllTopicList() {
        TopicList topicList = new TopicList();
        try {
            this.lock.readLock().lockInterruptibly();
            topicList.getTopicList().addAll(this.topicQueueTable.keySet());
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        } finally {
            this.lock.readLock().unlock();
        }

        return topicList;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final String zoneName,
        final Long timeoutMillis,
        final Boolean enableActingMaster,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            this.lock.writeLock().lockInterruptibly();
..
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        } finally {
            this.lock.writeLock().unlock();
        }

        return result;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24