Eureka之Server端注册

2019-04-30  本文已影响0人  0爱上1

上一篇文章Eureka源码之Client端注册分析了Client端注册的整体流程,本文将基于Server端源码,分析Server端是如何处理客户端的注册请求的

注册时序图

注册时序图

源码

ApplicationResource

Jersey是一个轻量级的RESTful 框架,可以进一步地简化 RESTful service 和 client 开发,而ApplicationResource就是基于Jersey框架来处理请求的

// Produces注解规定了请求响应体的MiME类型
@Produces({"application/xml", "application/json"})
public class ApplicationResource {

private final String appName;

// EurekaServer 配置信息
private final EurekaServerConfig serverConfig;

// 负责注册行为的"对等实例注册器"
private final PeerAwareInstanceRegistry registry;
private final ResponseCache responseCache;

ApplicationResource(String appName,
                    EurekaServerConfig serverConfig,
                    PeerAwareInstanceRegistry registry) {
    this.appName = appName.toUpperCase();
    this.serverConfig = serverConfig;
    this.registry = registry;
    this.responseCache = registry.getResponseCache();
}

public String getAppName() {
    return appName;
}

/**
 * Registers information about a particular instance for an Application
 * POST请求:为应用程序注册关于特定实例的信息
 * 
 * @param info 注册实例的信息
 * @param isReplication 在header中的参数,表示是否是一个其他server节点复制的标识
 * 
 */
@POST
// Consumes 注解规定了请求方法接收体的MIME类型
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {

    // 打印入参日志                           
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    
    // 实例信息必要参数非空校验,返回400错误码
    ...
    
    // 获取注册实例信息的数据中心信息,处理客户端可能使用错误的DataCenterInfo注册丢失数据的情况
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    // 注册实例,将注册行为委托给了PeerAwareInstanceRegistry 类去完成,也即PeerAwareInstanceRegistryImpl
    registry.register(info, "true".equals(isReplication));
    // 注册成功,返回204状态码
    return Response.status(204).build();
}
}

PeerAwareInstanceRegistryImpl

单例的对等实例注册处理器,负责将注册相关操作(主要有注册,心跳续约,取消,状态改变以及过期)复制到其他对等节点,以使节点之间保持同步

一个Eureka Server只有一个PeerAwareInstanceRegistryImpl实例

PeerAwareInstanceRegistryImpl
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

// volatile 修饰的集群对等节点
protected volatile PeerEurekaNodes peerEurekaNodes;

// 内部定义的枚举类,用于描述eureka注册操作行为
public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    // 返回该枚举名字的监视类型,用于跟踪某项具体枚举类型操作花费的时间
    private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
    
    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}

// 内部属性以及其他方法省略
...

/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 * 
 * 调用父类的注册方法完成此次注册操作,同时调用内部私有的replicateToPeers方法去完成server节点间的复制操作
 * 
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // 获取Client注册实例的租约持续时间,即server多久收不到该实例的心跳就会将该实例剔除掉,默认90秒
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;

    // 若Client注册实例设置了租约信息且租约持续时间大于0,则替换默认的90秒
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }

    // 调用抽象父类的注册方法,完成注册行为
    super.register(info, leaseDuration, isReplication);

    // 将注册行为同步到集群内的其他节点
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 * 将eureka操作复制到集群其他节点,除了自己
 *
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    // 启动监视该注册操作类型跑表
    Stopwatch tracer = action.getTimer().start();
    try {
        // 如果是其他节点的复制操作,则增加最近一分钟内节点复制操作的次数
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
    
        // If it is a replication already, do not replicate again as this will create a poison replication
        // 如果是一个来自其他server节点的复制操作,不作再次复制操作,因为方法调用之前会执行注册操作,直接返回
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // 循环所有对等节点,逐一复制
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            // 若节点是自身,不作复制操作
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 复制实例注册操作到节点
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        // 停止跑表
        tracer.stop();
    }
}

/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 * 根据不同的注册Action,由PeerEurekaNode调用不同的方法实现复制
 *
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);

        // 可以看到这里针对注册行为的不同,做了不同的前置处理,但最终都是将复制操作交由PeerEurekaNode类来完成的
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
    /**
* 
* 代表了一个集群中节点,用于共享eureka的注册行为,包括
* Register,Renew,Cancel,Expiration and Status Changes等
*
* @author Karthik Ranganathan, Greg Kim
*
*/
public class PeerEurekaNode {

// 用于复制注册操作的HttpClient,发送请求时会自动将isReplication设为true,即表示这是一次节点间复制的请求
private final HttpReplicationClient replicationClient;

/**
 * 
 * 发送复制的注册信息到该类代表的对等节点
 *
 * @param info 实例信息
 * @throws Exception
 */
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // 这里交给了一个批量任务执行器去执行
    batchingDispatcher.process(
            // 获取执行的任务id = "register" + "#" + appName + instanceId,用于唯一标识一个实例的复制注册行为
            // 该批量任务执行器默认会丢弃掉过期的任务,以及新任务来临会自动丢弃老任务,后台线程异步执行
            taskId("register", info),

            // 任务具体执行的逻辑,就是调用了replicationClient 的注册方法
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

Server端注册的重点来了,抽象类AbstractInstanceRegistry的register(...)注册方法

/**
* Handles all registry requests from eureka clients.
* 负责处理所有来自eureka 客户点的注册请求
*
* <p>
* Primary operations that are performed are the
* <em>Registers</em>, <em>Renewals</em>, <em>Cancels</em>, <em>Expirations</em>, and <em>Status Changes</em>. The
* registry also stores only the delta operations
* </p>
* 
* 执行的主要操作有,注册,续约,取消注册,过期和状态变化,注册表只会存储delta操作
*
* @author Karthik Ranganathan
*
*/
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

// 用于保存客户端注册信息的双层Map结构
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

// 定义读写锁,用在注册相关操作上
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

// 读锁,为共享锁
private final Lock read = readWriteLock.readLock();

// 写锁,排他锁
private final Lock write = readWriteLock.writeLock();
protected final Object lock = new Object();

// 持有eureka server配置信息以及eureka client配置信息
protected final EurekaServerConfig serverConfig;
protected final EurekaClientConfig clientConfig;

// 负责缓存客户端注册信息的类,以供eureka client 查询
protected volatile ResponseCache responseCache;

/**
 * Registers a new instance with a given duration.
 * 根据Client指定的持续时间,注册一个实例
 * 
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 1. 获取读锁
        // TODO 这里留下疑问,为什么注册操作会使用读锁?
        read.lock();

        // 2. 根据此次注册client的appName,从注册表获取是否有已注册信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);

        // 3. 如果没有已注册信息,表示首次注册
        if (gMap == null) {
            // 3.1 new一个该appName对应的注册信息Map
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            
            // 3.2 使用putIfAbsent向registry中新增元素
            // 只有当registry中不存在指定的key时,才会执行put元素操作,否则不执行put操作,另外一点是无论有没有执行put操作,都会返回preview 元素
            // 即当不存在指定key,就put元素,返回null,若存在key,则不执行put操作,返回已存在的value值
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);

            // 3.3 如果返回null,则表示当前线程put成功,即registry中保存的value为当前线程new的值,即gMap = gNewMap
            if (gMap == null) {
                gMap = gNewMap;
            }
            // 3.4 如果返回不是null,则表示可能有其他线程先put了,那3.2 中的gMap变量就是其他线程先put后的value
        }

        // 4. 根据注册客户端的InstanceId,获取gMap中该客户端的注册租约信息Lease
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());

        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // 5. 已存在注册租约信息,保留最后dirty timestamp不去覆盖
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            // 注册租约信息不存在,表示这是一个新的注册,此处开始同步
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }

        // 6. 不管之前的租约信息存在还是不存在,都会new一个新的租约注册信息,如果之前的租约信息存在,就更新新租约的服务启动时间为已存在租约的服务启动时间
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }

        // 7. 将新租约的信息put进gMap中
        gMap.put(registrant.getId(), lease);

        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        
        // 8. 新增最近改变的租约队列
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();

        // 9. 失效缓存信息
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // 10. 释放读锁
        read.unlock();
    }
}

总结一下register()做的事情就是

  1. 将client的注册信息封装成Lease<InstanceInfo> 租约,存入双层map结构中,不管存在不存在该Client的Lease信息,都会重新new一个新的Lease对象,put进内层Map中

  2. 更新recentRegisteredQueue,recentlyChangedQueue

  3. 失效responseCache缓存

上一篇下一篇

猜你喜欢

热点阅读