分布式相关学习

Eureka Server 源码分析

2018-04-06  本文已影响32人  莫那一鲁道

上一篇文章中,我们讨论了 Client,关于 Client 的注册,心跳(续约),远程调用等,我们都对源码做了分析,今天就对 Server 的源码做一个分析。

0. Server 有哪些功能

  1. 提供服务注册功能。
  2. 消费者可以获取服务列表。
  3. 服务可以续约。
  4. Server 集群之间的数据共享。

1. 提供服务注册功能

我们使用 EurekaServer 的时候,需要在启动类上加入 @EnableEurekaServer注解,这个注解肯定就是我们研究的入口。

该注解注释说道:

Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
通过 EurekaServerAutoConfiguration 激活 Eureka 相关的配置。

进入该类查看,该类可以说是一个 config 配置类,不是能很直观的找到入口。

我们可以通过日志找到入口,怎么找呢?当 Client 启动的时候,就会尝试向注册中心注册,而注册中心在注册成功之后,就会打印日志。通过日志,我们搜索到注册事件发生在 ApplicationResourceaddInstance 方法中。而 Client 的请求 requestUri则是 POST http://localhost:8761/eureka/apps/SERVICE-HI HTTP/1.1(我的测试 demo)。

这个ApplicationResource 类的注释说这个类是用于 处理特定请求相关的资源。该类主要的方法就是addInstance方法。

关键代码如下:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // validate that the instanceinfo contains all the necessary required fields
    // 校验数据是否正确。。。。

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    // 处理客户端数据丢失的情况。。。。

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

参数的 InstanceInfo 对象就是服务实例的具体信息,比如 ip, 实例名称,端口,更新时间,心跳 url,控制页面的 url 等。isReplication参数作用则是判断这是否是集群之间的复制。防止重复注册。

最后调用register.register 方法并返回 204.

通过跟踪代码,发现register的关键代码在PeerAwareInstanceRegistryImpl 类的 register 方法中。该代码注释写道:注册这个节点信息,并传播给所有其他同伴节点。但如果这是一个复制事件的话,则不会进行传播。

代码如下:

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
} 

代码中,可以看到默认的续约时间是 90 秒,然后调用父类的register方法,最后尝试复制到其他节点(需要判断isReplication 字段)。

抽象父类 AbstractInstanceRegistryregister 方法很长,简单说说逻辑。

这个类将所有的实例保存在一个双重的map 中(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>),也就是每个App 对应一个 MapMap 中存储着多个实例。

我们回到replicateToPeers 方法中,如果不是复制事件,EurekaServer会将该实例传播到自己的所有同伴中去,通常这些同伴就是配置文件中写入的。反之,如果是复制事件,则直接 return,也就是说,Eureka的复制事件是不会有蝴蝶效应的,这有助于控制事件的传播范围。

而这些节点也是需要定时更新的。默认 10 分钟更新一次。通过调用 PeerEurekaNodesupdatePeerEurekaNodes 方法进行更新。

到这里,关于 EurekaServer 服务注册的功能就基本差不多了。

2. 消费者可以获取服务列表

从前面的注册过程中,我们知道,Client请求的路径是 /apps,我们使用 IDEA 的 Java Enterprise 功能,查看该路径下的方法:

image.png

根据上图中的箭头,便可找到这个对应URL 的方法 ApplicationsResource#getContainers(),当然你也可以使用全局搜索。

该方法会从缓存中取出所有的实例,返回给客户端。

image.png

这个缓存的实现是一个 ResponseCacheImpl 类,内部使用 Map 保存实例,而不是像 ZK 使用树形结构保存。该类在构造的时候,就会创建一个定时任务,任务内容则是执行 getCacheUpdateTask 方法,用于更新缓存,默认 30 秒更新一次。

3. 服务可以续约

续约其实就是心跳,客户端需要向Server发送心跳证明自己没有发生故障。而客户端执行此任务的就是DiscoveryClient 的 内部类HeartbeatThread ,从名字上就可以看出,这是一个心跳任务,内部 run方法执行的就是外部类的 renew 方法。

image.png

我们层层追踪,在 JerseyApplicationClientsendHeartBeat 方法中,看到心跳的路径是 http://localhost:8761/eureka/apps/SERVICE-FEIGN/localhost:service-feign:8765?status=UP&lastDirtyTimestamp=1522984522100,同时,这个请求也是个 PUT 请求。那么在 Server 端接收的方法是什么呢?什么方法参数对应这个呢?用 Java Enterprise 没有搜到,然后通过全局搜索:

完美!就是这里

InstanceResource类的 renewLease 方法,该方法注释如下:A put request for renewing lease from a client instance。

该方法有几个参数,其中的isReplication参数需要注意,其实和前面的 isReplication一样,不论是注册还是更新,都会在集群之间尽心复制,而复制的手段就是调用相同的接口,通过isReplication 参数进去区分。如果是复制的话,就不再继续进行传播了。

最后返回一个 Response.ok().build() ResponseImpl对象.

4. Server 集群之间的数据共享

我相信通过前面代码的阅读,大家应该对 EurekaServer 集群之间的复制都有了一点印象了,那我们再 继续看看这一块的逻辑。

当有注册事件或者心跳事件时,都会对集群中的同伴节点进行传播。

注册事件的主要方法是 PeerAwareInstanceRegistryImpl 类的 register 方法,注册成功之后会执行 replicateToPeers 方法,这个就是 EurekaServer 之间的数据共享了。其中 isReplication 决定是否复制到别的节点。如果是别的 Server 的复制请求的话,则停止复制的蔓延。代码如下:

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    // 复制
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

而续约事件在 PeerAwareInstanceRegistryImpl 类的 renew 方法中也存在调用共享数据的行为。代码如下:

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // 复制
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

同时,集群中的节点也是会定时更新的,PeerEurekaNodes 的 start 方法会创建一个定时任务,默认的时间是 10 分钟,代码如下:

public void start() {
    ....
    updatePeerEurekaNodes(resolvePeerUrls());
    Runnable peersUpdateTask = new Runnable() {
        @Override
        public void run() {
            try {
                updatePeerEurekaNodes(resolvePeerUrls());
            } catch (Throwable e) {
                logger.error("Cannot update the replica Nodes", e);
            }

        }
    };
    taskExecutor.scheduleWithFixedDelay(
            peersUpdateTask,
            serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
            serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
            TimeUnit.MILLISECONDS
    );
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

总结

关于 EurekaServer 的主要 4 个功能就大致这么多,总结下。

  1. Server 给 Client 提供注册功能,所有实例保存在一个双重 Map 中,注册后,Server 会将数据复制到配置的同伴节点。

  2. Client 从 Server 获取服务列表的是通过 Http 请求 Server 的 ApplicationsResource#getContainers() 方法获取所有的服务列表,而这个服务列表的更新时间是 30 秒,因此会有一些延迟。

  3. 服务的续约其实就是心跳,每个 RPC 服务都是需要心跳的。Client 的心跳实现是 HeartbeatThread ,会定时的调用 renew 方法,这个方法会调用 Server 的 InstanceResource#renewLease 方法,同时,这个行为也会复制到配置文件中的其他 Server 节点。

  4. Server 为了高可用,可以做成集群,那么,也就是说,需要进行数据的备份,ZK 始终使用的是一个节点,为了保证 CAP 的 CP,而 Eureka 的设计理念和 ZK 不同,Eureka 认为一致性在注册中心这种服务上优先级不是最高的。于是 Eureka 选择了 AP,保证高可用和容错。在每一次注册事件和续约事件都会进行集群之间的复制,复制过程就是和普通的 Server 调用系统,只不过加入了一个区别参数 isReplication,当时复制事件的时候,就不能继续复制到其他的节点了,这是 Eureka 的设计。

好,能力不高,水平有限,就到这里。good luck!!!!

上一篇 下一篇

猜你喜欢

热点阅读