
Eureka Server Source Code Analysis (v1.7.2)

2019-09-30  moneyoverf1ow

I wrote this article as I was reading the source code, so if anything here is wrong, please let me know.
When reading the source below, pay attention to the comments.

Server Cluster

Eureka Server cluster nodes are abstracted as PeerEurekaNode. As the name suggests, the nodes are peers; there is no master/slave concept. Data synchronization between nodes is near real-time and each node is responsible for it itself, so when the service cluster grows large, the replication pressure is also considerable.
PeerEurekaNode encapsulates the replication operations between nodes, including client register, cancel, heartbeat and so on (see Figure 1-1 below).

(Figure 1-1: the replication operations encapsulated by PeerEurekaNode)

When a client sends a register, cancel, or heartbeat request to one Eureka server, that node replicates the client's action to the other nodes in the cluster. It does so asynchronously, and (mostly) in batches, through a task executor (batchingDispatcher). In other words, data synchronization within the cluster is incremental. Let's start with the registration code (PeerAwareInstanceRegistryImpl#register).

//registration
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    //register with the local registry
    super.register(info, leaseDuration, isReplication);
    //replicate to the other cluster nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
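
register ends with a call to replicateToPeers. A paraphrased sketch of that method (not copied verbatim from v1.7.2) shows two details worth noting: an action that itself arrived as a replication is never replicated again, which prevents actions from bouncing between nodes forever, and a node never replicates to its own service URL.

//paraphrased sketch of PeerAwareInstanceRegistryImpl#replicateToPeers (not verbatim)
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
    //an action that arrived as a replication is not replicated again,
    //otherwise the same action would ping-pong between the nodes indefinitely
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
        return;
    }
    for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
        //never replicate to yourself
        if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
            continue;
        }
        replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
}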

For example, suppose the data on node A and node B is inconsistent, and application X registers with A for the first time. A then registers X with B, and A and B become consistent again. If X sends a heartbeat to A, A replicates that heartbeat to B; if B does not know X at that point, A registers X with B instead. The other actions behave similarly.
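
The heartbeat-turns-into-register behaviour from this example lives in the failure callback of the replication task that PeerEurekaNode#heartbeat builds (the same callback comes up again later when we look at validateDirtyTimestamp). A paraphrased sketch, assuming the 1.7.x names; not the verbatim source:

//paraphrased sketch of the task built inside PeerEurekaNode#heartbeat (not verbatim)
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
    @Override
    public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
    }

    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        super.handleFailure(statusCode, responseEntity);
        if (statusCode == 404) {
            //the peer has no lease for this instance, so replicate a full register instead
            register(info);
        } else if (config.shouldSyncWhenTimestampDiffers()) {
            //the peer replied with a newer InstanceInfo (e.g. 409 CONFLICT), overwrite the local copy
            InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
            if (peerInstanceInfo != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
            }
        }
    }
};
batchingDispatcher.process(taskId("heartbeat", appName, id), replicationTask, expiryTime);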

Now step into the replication code itself:

public void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

Depending on the action, a different PeerEurekaNode method is called. Again, let's look at register:

public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        //hand the task to the dispatcher
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

Here the registration is wrapped in a task and handed to batchingDispatcher, a task dispatcher. Look at the PeerEurekaNode constructor to find where it is initialized -- TaskDispatchers#createBatchingTaskDispatcher -- and step into it:

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workloadSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        //the task acceptor
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs);
        //the task executors, working together with the acceptorExecutor
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        //task dispatching
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                //drop the task into acceptorExecutor's accept queue
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                //stop the acceptor thread
                acceptorExecutor.shutdown();
                //stop the worker threads
                taskExecutor.shutdown();
            }
        };
    }

The two key pieces here are AcceptorExecutor and TaskExecutors.
AcceptorExecutor has an acceptor thread that receives the submitted tasks and then hands them out to the worker threads (provided by TaskExecutors) for processing.

The next part is fairly detailed. Read it if you are interested; it is not essential for the overall picture, but there are some techniques worth learning from.

First, a few key fields of AcceptorExecutor:
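
(Reconstructed from what the snippets below reference; generic types are simplified, so treat it as a rough map rather than the exact declarations.)

//AcceptorExecutor<ID, T> - key fields, roughly:
BlockingQueue<TaskHolder<ID, T>> acceptorQueue;          //newly submitted tasks land here (process())
BlockingDeque<TaskHolder<ID, T>> reprocessQueue;         //tasks pushed back by the workers for retry
Map<ID, TaskHolder<ID, T>> pendingTasks;                 //tasks waiting to be assigned, keyed by task id
Deque<ID> processingOrder;                               //the order in which pending task ids should be handled
Semaphore batchWorkRequests;                             //released by batch worker threads that are ready for work
BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue;   //batches handed over to the batch workers
Semaphore singleItemWorkRequests;                        //same idea for the non-batched path
BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue;
TrafficShaper trafficShaper;                             //injects a delay after congestion or network failures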

Now look at the run method of AcceptorExecutor.AcceptorRunner:

        @Override
        public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    //drain all tasks from the accept queue and the reprocess queue into the pending set
                    drainInputQueues();
                    int totalItems = processingOrder.size();
                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        //assign tasks from pendingTasks to the two work queues, on demand
                        assignBatchWork();
                        assignSingleItemWork();
                    }
                    // If no worker is requesting data or there is a delay injected by the traffic shaper,
                    // sleep for some time to avoid tight loop.
                    if (totalItems == processingOrder.size()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException ex) {
                    // Ignore
                } catch (Throwable e) {
                    // Safe-guard, so we never exit this loop in an uncontrolled way.
                    logger.warn("Discovery AcceptorThread error", e);
                }
            }
        }
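
trafficShaper.transmissionDelay() above is what slows replication down after a peer has reported congestion (503) or a transient network failure: the worker records the failure, and for the next congestionRetryDelayMs / networkFailureRetryMs the acceptor thread postpones handing out new work. A minimal sketch of the idea (hypothetical names, not the real TrafficShaper class):

//minimal sketch of the traffic-shaping idea (hypothetical class, not the real TrafficShaper)
class TrafficShaperSketch {
    private final long congestionRetryDelayMs;
    private volatile long lastCongestionTimestamp = -1;

    TrafficShaperSketch(long congestionRetryDelayMs) {
        this.congestionRetryDelayMs = congestionRetryDelayMs;
    }

    //called from the worker side when a peer answered 503
    void registerCongestion() {
        lastCongestionTimestamp = System.currentTimeMillis();
    }

    //called by the acceptor thread before assigning work
    long transmissionDelay() {
        if (lastCongestionTimestamp < 0) {
            return 0;
        }
        long elapsed = System.currentTimeMillis() - lastCongestionTimestamp;
        if (elapsed < congestionRetryDelayMs) {
            return congestionRetryDelayMs - elapsed;   //still inside the back-off window
        }
        lastCongestionTimestamp = -1;                  //back-off has expired
        return 0;
    }
}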

Step into drainInputQueues():

private void drainInputQueues() throws InterruptedException {
            do {
                //empty the reprocess queue and the accept queue, moving their tasks into the pending set pendingTasks
                drainReprocessQueue();
                drainAcceptorQueue();
                if (!isShutdown.get()) {
                    //everything is empty: block for a short while so the loop-exit condition can be met and a tight loop is avoided
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                }
            } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());//loop until the input queues are drained (and there is at least one pending task)
        }
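
Note that drainAcceptorQueue feeds every accepted task through something like appendTaskHolder, and pendingTasks is keyed by the task id built in PeerEurekaNode (taskId("register", info) and friends). If a task with the same id is already pending, the new one simply replaces it, so several updates for the same instance collapse into a single replication call. Roughly (paraphrased, metric counters omitted):

//paraphrased sketch of AcceptorExecutor#appendTaskHolder (metrics omitted)
private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
    if (isFull()) {
        //buffer full: drop the oldest pending task to make room
        pendingTasks.remove(processingOrder.poll());
    }
    TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
    if (previousTask == null) {
        //brand-new task id: remember its position in the processing order
        processingOrder.add(taskHolder.getId());
    }
    //otherwise a task with the same id was already queued and has just been replaced
}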

Step into assignBatchWork():

void assignBatchWork() {
            //is there enough work for the next batch?
            //1. if the processing order is empty, do nothing
            //2. if the number of pending tasks has reached the max buffer size, run immediately
            //3. if the max batching delay has been exceeded, run immediately
            if (hasEnoughTasksForNextBatch()) {
                //try to acquire a permit; permits are released by the consumer (worker) threads, so work is handed out on demand
                if (batchWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    int len = Math.min(maxBatchingSize, processingOrder.size());
                    //small detail: pre-size the list to avoid array resizing
                    List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                    while (holders.size() < len && !processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            //not expired yet
                            holders.add(holder);
                        } else {
                            expiredTasks++;
                        }
                    }
                    if (holders.isEmpty()) {
                        //no tasks picked up, so give the permit back instead of holding it
                        batchWorkRequests.release();
                    } else {
                        batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                        //add the batch to the batch work queue
                        batchWorkQueue.add(holders);
                    }
                }
            }
        }

TaskExecutors then runs a pool of worker threads that keep pulling tasks off AcceptorExecutor's work queues and processing them (which boils down to calling the batch endpoint on the peer: com.netflix.eureka.resources.PeerReplicationResource#batchReplication).
Let's see what a worker thread does, in TaskExecutors.BatchWorkerRunnable:

@Override
public void run() {
    try {
        while (!isShutdown.get()) {
            //take a batch of tasks from AcceptorExecutor's work queue:
            //release one permit, then keep polling the queue until a batch arrives
            List<TaskHolder<ID, T>> holders = getWork();
            metrics.registerExpiryTimes(holders);
            List<T> tasks = getTasksOf(holders);
            //call the peer node's batch endpoint
            ProcessingResult result = processor.process(tasks);
            switch (result) {
                case Success:
                    break;
                
                //503 returned: the peer is busy, retry later
                case Congestion:
                //network error, retry later
                case TransientError:
                    //push back into the reprocess (retry) queue
                    taskDispatcher.reprocess(holders, result);
                    break;
                
                //any other non-network error: no retry
                case PermanentError:
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}
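
The "demand-driven" hand-off mentioned in assignBatchWork closes the loop here: before blocking on the queue, a worker releases one permit on batchWorkRequests to signal that it can take a batch, and the acceptor thread only builds a batch when it can grab such a permit. Paraphrased sketch of the two sides (not verbatim):

//in AcceptorExecutor: called by a worker that wants a batch (paraphrased)
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
    batchWorkRequests.release();   //one permit == one worker waiting for one batch
    return batchWorkQueue;
}

//in TaskExecutors.BatchWorkerRunnable (paraphrased)
private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
    BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
    List<TaskHolder<ID, T>> result;
    do {
        //wait until the acceptor thread puts a batch into the queue
        result = workQueue.poll(1, TimeUnit.SECONDS);
    } while (!isShutdown.get() && result == null);
    return (result == null) ? new ArrayList<>() : result;
}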

That's about it for cooperation between cluster nodes. Next, let's look at data storage.

Data Storage

Eureka keeps its data in memory. The registry is abstracted as AbstractInstanceRegistry, and instance data lives in its registry field of type ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>. The outer key is the appName and the inner key is the instanceId; together these two keys uniquely identify an application instance's lease, and lookups are very fast. The value is a Lease<InstanceInfo>. Lease is a key concept; its meaning is discussed further below.
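
As a quick illustration of how the two keys are used, here is a hypothetical helper (the appName and instanceId values are simply whatever your clients register with; Lease and InstanceInfo are the Eureka types):

//hypothetical helper showing the two-level lookup
static InstanceInfo findInstance(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry,
                                 String appName, String instanceId) {
    Map<String, Lease<InstanceInfo>> appLeases = registry.get(appName);   //outer key: appName
    if (appLeases == null) {
        return null;
    }
    Lease<InstanceInfo> lease = appLeases.get(instanceId);                //inner key: instanceId
    return lease == null ? null : lease.getHolder();                      //the registered InstanceInfo
}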

Now the registration code. It looks like a lot at first glance; don't panic, just read through it.

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            //initialize the map for this appName if it does not exist yet
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    //putIfAbsent returned null, so the new map was stored; keep a reference to it
                    gMap = gNewMap;
                }
            }
            //first check whether a lease already exists for this instance
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            //if it exists
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                //the existing copy has a newer version
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    //logging code omitted here
                    //use the locally held InstanceInfo as the source of truth
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        //expected number of renewals per minute; a new client registered, so add 2 (one renewal every 30s = 2 per minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        //renewal threshold per minute: the expected number multiplied by a percentage factor
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
            }
            //wrap the instance in a lease
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            //put it into the registry data structure
            gMap.put(registrant.getId(), lease);
            //used for statistics and debugging only, can be ignored
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            //status overridden by external operations, e.g. manually taking a service up or down; the value is cached, so even if the client re-registers it can still be recovered from the cache
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            //overwrite the status with the overridden status
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            //invalidate the response cache
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
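
The expectedNumberOfRenewsPerMin / numberOfRenewsPerMinThreshold bookkeeping above deserves a quick worked example (assuming the default renewal percent threshold of 0.85):

//worked example, assuming serverConfig.getRenewalPercentThreshold() == 0.85 (the default)
//10 registered instances, each expected to renew every 30 seconds
int expectedNumberOfRenewsPerMin = 10 * 2;                                        //20 renewals expected per minute
int numberOfRenewsPerMinThreshold = (int) (expectedNumberOfRenewsPerMin * 0.85);  //17
//if fewer than 17 renewals actually arrive in a minute, the server assumes the problem is
//probably not the instances themselves (e.g. a network partition) and self-preservation
//kicks in, i.e. expired leases stop being evicted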

An instance's lease is maintained by a scheduled task together with the client's renewals (renewals are also replicated inside the cluster), which keeps the instance from expiring and keeps it active. If the lease expires because the client failed to renew it for some reason, that task takes the expired instance offline. See the source of EvictionTask and AbstractInstanceRegistry#evict; I won't repeat it here.
One more field worth paying attention to is ResponseCache. It is used by all of Eureka's RESTful endpoints and, as its name suggests, caches endpoint responses. Eureka endpoints return JSON or XML, and some of them return large payloads that need to be compressed, so this cache layer saves the cost of repeated serialization, compression, and large queries.
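
To make the Lease concept above a bit more concrete, here is a minimal, simplified sketch of what a lease tracks and how the eviction check uses it (a hypothetical class; see com.netflix.eureka.lease.Lease for the real fields and methods):

//minimal sketch of the lease idea (hypothetical, simplified; not the real com.netflix.eureka.lease.Lease)
class LeaseSketch<T> {
    private final T holder;              //the InstanceInfo being leased
    private final long durationMs;       //lease duration, 90 seconds by default
    private volatile long lastUpdateMs;  //bumped on every renewal (heartbeat)

    LeaseSketch(T holder, int durationInSecs) {
        this.holder = holder;
        this.durationMs = durationInSecs * 1000L;
        this.lastUpdateMs = System.currentTimeMillis();
    }

    void renew() {                       //called when a heartbeat arrives
        lastUpdateMs = System.currentTimeMillis();
    }

    boolean isExpired() {                //checked periodically by the eviction task
        return System.currentTimeMillis() > lastUpdateMs + durationMs;
    }

    T getHolder() {
        return holder;
    }
}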

RESTful API

From these APIs we can infer some of the interaction logic between the client and the server, and between servers, which gives a rough picture of what the client looks like. The code lives under com.netflix.eureka.resources and is built on the Jersey framework.
To summarize, the commonly used APIs map onto the actions we have already seen -- register, cancel, heartbeat/renew, status update and delete status override -- plus the full and delta registry fetches that clients use.

Let's pick two endpoints to get a feel for them: the renewal endpoint and the delta endpoint, the ones clients hit most often.
First, renewal:

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        //does the request come from another node in the cluster?
        boolean isFromReplicaNode = "true".equals(isReplication);
        //did the renewal against the registry succeed?
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) { 
            //renewal failed: the registry does not know this instance
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            //worth stepping into this one
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }

InstanceInfo has a notion of a dirty timestamp, stored in its lastDirtyTimestamp field. This concept is very important: it is the timestamp of the most recent update, and you can think of it roughly as a version number. Step into this.validateDirtyTimestamp:

    private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                            boolean isReplication) {
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
        if (appInfo != null) {
            if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
                Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
                //if the lastDirtyTimestamp sent by the client is newer than the one in the registry,
                //the registry's lease is stale (a newer registration should exist), so return 404;
                //in other words the registry's InstanceInfo is behind the client's
                if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.NOT_FOUND).build();
                } 
                //if the registry's InstanceInfo is newer than the client's
                else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                    //for replication between nodes, return the current InstanceInfo so the replicating node can sync the newer data
                    //(this needs to be read together with the handleFailure method of the replicationTask in PeerEurekaNode#heartbeat)
                    if (isReplication) {
                        logger.debug(
                                "Time to sync, since the last dirty timestamp differs -"
                                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                                args);
                        return Response.status(Status.CONFLICT).entity(appInfo).build();
                    } else {
                        //for a normal client, still allow the renewal
                        return Response.ok().build();
                    }
                }
            }
        }
        return Response.ok().build();
    }

Now for the delta endpoint, which the client relies on to maintain its local copy of the service registry.
The idea is worth borrowing: when large amounts of data have to be synchronized, doing it incrementally saves bandwidth and CPU.

//The endpoint source is omitted here, because the whole thing is simply served from the cache; if you are interested, look at responseCache.
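
For context on where the delta actually comes from (paraphrased, with locking and version bookkeeping omitted; not the verbatim source): every register/cancel/status change is also appended to recentlyChangedQueue, which we already saw in the register code, and a background task prunes entries older than a retention window (3 minutes by default, as far as I recall). The delta response is assembled from that queue plus a hash code of the full registry, so the client can detect drift and fall back to a full fetch:

//paraphrased sketch of AbstractInstanceRegistry#getApplicationDeltas (locking/versioning omitted)
public Applications getApplicationDeltas() {
    Applications apps = new Applications();
    for (RecentlyChangedItem item : recentlyChangedQueue) {            //only instances changed within the retention window
        InstanceInfo instanceInfo = item.getLeaseInfo().getHolder();
        Application app = apps.getRegisteredApplications(instanceInfo.getAppName());
        if (app == null) {
            app = new Application(instanceInfo.getAppName());
            apps.addApplication(app);
        }
        app.addInstance(instanceInfo);
    }
    //the client compares this hash code with one computed over its local registry;
    //if they differ, it falls back to a full fetch
    apps.setAppsHashCode(getApplications().getReconcileHashCode());
    return apps;
}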

The end. If anything here is wrong, please get in touch with me.
If you found this helpful, please give it a like to show support; it doesn't cost you anything. = =
