nacos源码6-服务管理-服务端

2019-01-16  本文已影响0人  modou1618

一 raft

1.1 初始化

/**
 * Invokes {@code RaftCore.init()} when a {@code WebServerInitializedEvent} is delivered.
 *
 * @param event the received event; it is not inspected by this method
 */
public void onApplicationEvent(final WebServerInitializedEvent event) {
    RaftCore.init();
}
// Excerpt from the Raft initialization: hand the election, heartbeat and address-server
// tasks to GlobalExecutor. Only the address-server updater is given an explicit interval.
GlobalExecutor.register(new MasterElection());
// NOTE(review): register1 is a separate registration method; how it schedules the
// heartbeat task is not visible in this excerpt.
GlobalExecutor.register1(new HeartBeat());
GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);
// Set the initialized flag while holding the lock. If the lock cannot be obtained within
// INIT_LOCK_TIME_SECONDS seconds, the flag is left unchanged and nothing is reported here.
if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
    initialized = true;
    lock.unlock();
}

1.2 Notifier

/**
 * Registers a listener and immediately replays every stored datum whose key the
 * listener declares interest in. A listener that is already registered is ignored.
 *
 * <p>A failure thrown by the listener for one datum is logged and does not stop the
 * replay of the remaining datums.
 *
 * @param listener the listener to register
 */
public static void listen(RaftListener listener) {
    if (listeners.contains(listener)) {
        return;
    }

    listeners.add(listener);

    // Debug aid: print the name of every registered listener that is a VirtualClusterDomain.
    for (RaftListener registered : listeners) {
        if (!(registered instanceof VirtualClusterDomain)) {
            continue;
        }
        VirtualClusterDomain domain = (VirtualClusterDomain) registered;
        Loggers.RAFT.debug("listener in listeners: {}", domain.getName());
    }

    // Replay data that is already present so the new listener is notified right away.
    for (Datum existing : datums.values()) {
        if (listener.interests(existing.key)) {
            try {
                listener.onChange(existing.key, existing.value);
            } catch (Exception e) {
                Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e);
            }
        }
    }
}

1.3 AddressServerUpdater

// Excerpt from AddressServerUpdater: fetch the server list from NamingProxy and take a
// separate ArrayList copy of the peers returned by peers.allPeers().
// NOTE(review): how the two lists are compared afterwards is not shown in this excerpt.
List<String> servers = NamingProxy.getServers();
List<RaftPeer> peerList = new ArrayList<RaftPeer>(peers.allPeers());

1.4 HeartBeat

1.4.1 发送心跳包

/**
 * One tick of the heartbeat task.
 *
 * <p>Decrements the local peer's remaining heartbeat delay by
 * {@code GlobalExecutor.TICK_PERIOD_MS}. While the delay is still positive nothing is
 * sent; once it reaches zero or below, the delay is reset and {@code sendBeat()} is called.
 * Any exception is logged and swallowed so it does not propagate out of this tick.
 */
public void run() {
    try {
        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            // Not due yet.
            return;
        }

        local.resetHeartbeatDue();

        sendBeat();
    } catch (Exception e) {
        // The exception is passed as the trailing throwable argument so its stack trace is
        // logged; the message therefore carries no "{}" placeholder, which would otherwise
        // be printed literally.
        Loggers.RAFT.warn("[RAFT] error while sending beat", e);
    }

}
// Excerpt from sendBeat(): build the heartbeat payload that is sent to the other peers.
RaftPeer local = peers.local();
// Only proceed when this node is the leader or standalone mode is on.
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
    return;
}
// build data
JSONObject packet = new JSONObject();
packet.put("peer", local);

JSONArray array = new JSONArray();

if (Switch.isSendBeatOnly()) {
    Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(Switch.isSendBeatOnly()));
}

if (!Switch.isSendBeatOnly()) {
    for (Datum datum : datums.values()) {

        JSONObject element = new JSONObject();
        String key;

        // Swap the local datum-key prefix for its wire prefix. The suffix is taken with
        // substring(prefix.length()) rather than split(prefix)[1]: split treats the prefix
        // as a regular expression, throws when nothing follows the prefix, and drops
        // everything after a second occurrence of the prefix pattern.
        if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
            key = datum.key.substring(UtilsAndCommons.DOMAINS_DATA_ID.length());
            element.put("key", UtilsAndCommons.RAFT_DOM_PRE + key);
        } else if (datum.key.startsWith(UtilsAndCommons.IPADDRESS_DATA_ID_PRE)) {
            key = datum.key.substring(UtilsAndCommons.IPADDRESS_DATA_ID_PRE.length());
            element.put("key", UtilsAndCommons.RAFT_IPLIST_PRE + key);
        } else if (datum.key.startsWith(UtilsAndCommons.TAG_DOMAINS_DATA_ID)) {
            key = datum.key.substring(UtilsAndCommons.TAG_DOMAINS_DATA_ID.length());
            element.put("key", UtilsAndCommons.RAFT_TAG_DOM_PRE + key);
        } else if (datum.key.startsWith(UtilsAndCommons.NODE_TAG_IP_PRE)) {
            key = datum.key.substring(UtilsAndCommons.NODE_TAG_IP_PRE.length());
            element.put("key", UtilsAndCommons.RAFT_TAG_IPLIST_PRE + key);
        }
        // NOTE(review): a datum whose key matches none of the four prefixes is still added,
        // carrying only a timestamp and no "key" entry - confirm the receiver tolerates that.
        element.put("timestamp", datum.timestamp);

        array.add(element);
    }
} else {
    Loggers.RAFT.info("[RAFT] send beat only.");
}

packet.put("datums", array);
// broadcast
// The whole packet is serialized to a JSON string and carried as the single "beat" parameter.
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JSON.toJSONString(packet));
/**
 * Handles the HTTP response to a heartbeat request.
 *
 * <p>On HTTP 200 the response body is parsed as a {@code RaftPeer} and passed to
 * {@code peers.update}. On any other status the failure is logged and the
 * leader-send-beat-failed metric is incremented.
 *
 * @param response the completed HTTP response
 * @return {@code 0} when the status is HTTP 200, {@code 1} otherwise
 * @throws Exception declared by the callback signature
 */
public Integer onCompleted(Response response) throws Exception {
    final boolean accepted = response.getStatusCode() == HttpURLConnection.HTTP_OK;
    if (accepted) {
        RaftPeer remote = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
        peers.update(remote);
        Loggers.RAFT.info("receive beat response from: {}", url);
        return 0;
    }

    Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
        response.getResponseBody(), server);
    MetricsMonitor.getLeaderSendBeatFailedException().increment();
    return 1;
}

1.4.2 接收处理心跳包

/**
 * Endpoint that receives a heartbeat.
 *
 * <p>Reads the request body through {@code IoUtils.tryDecompress}, decodes it as UTF-8,
 * parses it as JSON, extracts the {@code "beat"} field (itself a JSON string) and passes
 * the parsed beat to {@code RaftCore.HeartBeat.receivedBeat}.
 *
 * @param request  the incoming HTTP request whose body carries the beat
 * @param response the HTTP response; not used by this method
 * @return the peer returned by {@code receivedBeat}, converted to a {@code JSONObject}
 * @throws Exception if reading, decoding or processing the beat fails
 */
@RequestMapping("/beat")
public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    byte[] rawBody = IoUtils.tryDecompress(request.getInputStream());
    String body = new String(rawBody, "UTF-8");

    JSONObject envelope = JSON.parseObject(body);
    JSONObject beat = JSON.parseObject(envelope.getString("beat"));

    RaftPeer peer = RaftCore.HeartBeat.receivedBeat(beat);

    // Round-trip through a JSON string to return the peer as a JSONObject.
    String serializedPeer = JSON.toJSONString(peer);
    return JSON.parseObject(serializedPeer);
}
// Excerpt from the body of a loop in the beat-receiving code (the loop header and the
// update of processedCount are not shown). Presumably `object` is one element of
// beatDatums - confirm against the full method.
JSONObject entry = (JSONObject) object;
String key = entry.getString("key");
final String datumKey;

// Translate the wire prefix back to the local datum-key prefix. Inside each startsWith
// branch indexOf necessarily returns 0, so the substring drops exactly the wire prefix.
if (key.startsWith(UtilsAndCommons.RAFT_DOM_PRE)) {
    int index = key.indexOf(UtilsAndCommons.RAFT_DOM_PRE);
    datumKey = UtilsAndCommons.DOMAINS_DATA_ID + key.substring(index + UtilsAndCommons.RAFT_DOM_PRE.length());
} else if (key.startsWith(UtilsAndCommons.RAFT_IPLIST_PRE)) {
    int index = key.indexOf(UtilsAndCommons.RAFT_IPLIST_PRE);
    datumKey = UtilsAndCommons.IPADDRESS_DATA_ID_PRE + key.substring(index + UtilsAndCommons.RAFT_IPLIST_PRE.length());
} else if (key.startsWith(UtilsAndCommons.RAFT_TAG_DOM_PRE)) {
    int index = key.indexOf(UtilsAndCommons.RAFT_TAG_DOM_PRE);
    datumKey = UtilsAndCommons.TAG_DOMAINS_DATA_ID + key.substring(index + UtilsAndCommons.RAFT_TAG_DOM_PRE.length());
} else {
    // No startsWith check here: every remaining key is treated as a RAFT_TAG_IPLIST_PRE key.
    // If the prefix is absent, indexOf returns -1 and the substring offset becomes
    // (prefix length - 1) rather than failing fast.
    int index = key.indexOf(UtilsAndCommons.RAFT_TAG_IPLIST_PRE);
    datumKey = UtilsAndCommons.NODE_TAG_IP_PRE + key.substring(index + UtilsAndCommons.RAFT_TAG_IPLIST_PRE.length());
}

long timestamp = entry.getLong("timestamp");

// Record that this key was present in the received beat.
receivedKeysMap.put(datumKey, 1);
// Local copy exists and is at least as new, and processedCount has not yet reached the
// number of beat datums: nothing to fetch for this entry, move on.
if (RaftCore.datums.containsKey(datumKey) && RaftCore.datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
    continue;
}

// Local copy is missing or older than the received timestamp: queue the key in the batch.
if (!(RaftCore.datums.containsKey(datumKey) && RaftCore.datums.get(datumKey).timestamp.get() >= timestamp)) {
    batch.add(datumKey);
}

// Keep accumulating until the batch holds 50 keys or processedCount reaches the number
// of beat datums; only then fall through to the code after this excerpt.
if (batch.size() < 50 && processedCount < beatDatums.size()) {
    continue;
}
// Excerpt: write the datum through RaftStore only when its key starts with
// DOMAINS_DATA_ID, or when UtilsAndCommons.INSTANCE_LIST_PERSISTED is set (in which case
// every datum is written regardless of its key).
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID) ||
    UtilsAndCommons.INSTANCE_LIST_PERSISTED) {
    RaftStore.write(datum);
}

1.5 MasterElection

1.5.1 选举发送

// Excerpt from the start of an election on the local node: reset the peer set, increment
// the local term, vote for the local node's own ip and switch the local state to CANDIDATE.
// NOTE(review): what peers.reset() clears is not visible in this excerpt.
peers.reset();
local.term.incrementAndGet();
local.voteFor = local.ip;
local.state = RaftPeer.State.CANDIDATE;
/**
 * Records a peer's latest state and re-evaluates who the leader is.
 *
 * <p>The candidate is stored under its ip, then the {@code voteFor} values of all known
 * peers are tallied (peers with an empty vote are skipped). If the most-voted ip has at
 * least {@code majorityCount()} votes, that peer's state is set to LEADER and, when it
 * differs from the current leader, it becomes the new leader.
 *
 * @param candidate the peer whose state should be stored before tallying
 * @return the current leader after the tally; unchanged when no ip reached a majority
 */
public RaftPeer decideLeader(RaftPeer candidate) {
    peers.put(candidate.ip, candidate);

    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer voter : peers.values()) {
        String votedIp = voter.voteFor;
        if (!StringUtils.isEmpty(votedIp)) {
            ips.add(votedIp);
            int votes = ips.getCount(votedIp);
            // Strictly greater: on a tie the ip that reached the count first is kept.
            if (votes > maxApproveCount) {
                maxApproveCount = votes;
                maxApprovePeer = votedIp;
            }
        }
    }

    if (maxApproveCount < majorityCount()) {
        // No majority: keep whatever leader is currently known.
        return leader;
    }

    RaftPeer elected = peers.get(maxApprovePeer);
    elected.state = RaftPeer.State.LEADER;

    if (!Objects.equals(leader, elected)) {
        leader = elected;
        Loggers.RAFT.info("{} has become the LEADER", leader.ip);
    }

    return leader;
}

1.5.2 选举接收处理

// Two separate excerpts from the vote-receiving code; the conditions that select between
// them are not shown here.
// First excerpt: the local node votes for its own ip and returns itself.
local.voteFor = local.ip;
return local;
// Second excerpt: the local node resets its leader-due timer, becomes a FOLLOWER, votes
// for the remote peer's ip, adopts the remote term and returns itself.
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
return local;

二 服务管理

2.1 服务实例注册

服务实例注册.png

2.1.1 regDom()

2.1.2 doAddCluster4Dom()

2.1.3 addIp4Dom()

2.2 服务管理DomainsManager

2.2.1 服务变更处理流程

变更.png

2.2.2 初始化

初始化.png
上一篇下一篇

猜你喜欢

热点阅读