Eureka源码--Eureka Server
Eureka是什么?
eureka是Netflix的子模块之一,也是一个核心的模块,eureka里有2个组件,一个是EurekaServer(一个独立的项目) 这个是用于定位服务以实现中间层服务器的负载平衡和故障转移,另一个便是EurekaClient(我们的微服务) 它是用于与Server交互的,可以使得交互变得非常简单:只需要通过服务标识符即可拿到服务。
与spring-cloud的关系:
Spring Cloud 封装了 Netflix 公司开发的 Eureka 模块来实现服务注册和发现(可以对比Zookeeper)。Eureka 采用了 C-S 的设计架构。Eureka Server 作为服务注册功能的服务器,它是服务注册中心。
而系统中的其他微服务,使用 Eureka 的客户端连接到 Eureka Server并维持心跳连接。这样系统的维护人员就可以通过 Eureka Server 来监控系统中各个微服务是否正常运行。SpringCloud 的一些其他模块(比如Zuul)就可以通过 Eureka Server 来发现系统中的其他微服务,并执行相关的逻辑。
@EnableEurekaServer注解
我们在使用Eureka注册中心时,需要在spring-boot启动项目上加注解:@EnableEurekaServer,那么这个注解的作用是什么呢?我们通过源码来分析。
注解@EnableEurekaServer的作用其实很简单,就是通过@Import注解将EurekaServerMarkerConfiguration注入到Spring容器当中去。下面我们看下这个类干了什么。
可以看到,它是一个配置类,在其中又注入了一个叫Marker的bean,这个bean没有任何的属性和方法,通过名字也能知道,它只是起一个标记的作用,通过该bean做标记,就能做到动态插拔的效果。那它是怎么实现动态插拔的呢?
在Spring boot项目中,如果有某些配置类需要被Spring boot自动装载,则可以通过声明一个配置文件的形式,将这些配置类声明在配置文件中,Spring boot会自动扫描这个配置文件重的配置类。
Eureka就是利用了Spring boot的这一特性,在eureka-server的源码包中我们看到,在META-INF包下有一个spring.factories的配置文件,这个文件名是spring写死的不能变,在配置文件中有一个配置EurekaServerAutoConfiguration,eureka的自动配置就是通过这个类实现的,Spring boot会把这个配置类加载到Spring的容器当中去,并且让这个配置类生效,下面我们进入到这个类中
EurekaServerInitializerConfiguration用来初始化Eureka的上下文,可以看到有一个ConditionalOnBean的注解,这个注解会判断,如果当前Spring容器中有EurekaServerMarkerConfiguration.Marker这个bean,则会把EurekaServerAutoConfiguration这个类注入到Spring容器中;反之则不会注入,eureka就不会工作。
Eureka作为一个注册中心,需要实现网络之间的通信,底层是使用一个叫Jersey的框架实现的,其作用类似于Spring MVC,但是它是通过过滤器实现的。
JerseyFilter注入
Eureka 核心原理
eureka的核心代码是在eureka-core的jar包中,入口是在resources的包中,ApplicationResource的addInstance方法就是用来进行服务注册的,该方法的核心代码是registry.register这一行,这个方法就是用来进行服务注册的。register这个方法的具体实现是一个叫InstanceRegistry的类来实现的,我们查看下这个类的继承关系图
通过查看代码可以发现,这是一种典型的责任链模式,AbstractInstanceRegistry只用来进行服务注册,PeerAwareInstanceRegistryImpl用来做集群信息同步,InstanceRegistry用来发布监听。InstanceRegistry是Spring cloud对于eureka注册功能的扩展。
一个简单的用户微服务集群如下图:servers-user对应源码中的微服务名appName,user-1对应的具体的微服务id,即serviceId。
在注册方法的第一行,有一个读锁,读锁并不会阻塞;gMap是通过微服务名拿到的微服务组信息,如果还没有别的微服务注册进来,则新建一个ConcurrentHashMap;existingLease已经存在的微服务实例对象,首次注册时,这个对象为null。
在debug调试时, gMap和existingLease可能会有值,这是因为eureka client的重试机制导致的,在一定时间内,客户端会再次发送注册请求。正常情况下会进入到这个else中,即不存在微服务冲突时,更新一下自我保护阈值更新,最后put到gMap中,即完成了服务注册。
什么时候会进入到if中呢,一般90%的可能是不会进入到if的逻辑块中的,如果我们的一个微服务组中某两个或者多个节点,指定的唯一id重复,则会进入到if中,if里面也是eureka处理冲突的逻辑。如果存在冲突,获取了一个existingLastDirtyTimestamp,表示当前存在的微服务实例对象的最后操作的时间戳,然后和传过来的注册实例的时间戳registrationLastDirtyTimestamp比较;getHolder()具体的微服务实例对象;如果当前存在的微服务对象的时间戳大于传过来的微服务实例的时间戳,则覆盖传过来的微服务实例对象;反之,则不做处理,使用的是从参数传递过来的微服务实例对象。即:哪个微服务的时间戳比较新,则用哪个,如果两个时间戳的值相等,使用传递来的微服务实例对象。无论是否有冲突,都会新new一个Lease租债器对象,最后执行注册操作。
Lease租债器对象
枚举类Action表示这个对象的具体操作:注册、剔除、续约。
T holder,这个租债器对象是为了一系列操作而实现的,可以看成一个工具类,所以这个holder的类型是一个泛型,可以是任意对象类型,比如它可以是一个字符串,给这个字符串定义过期时间;在eureka服务注册时,它表示微服务的实例对象。
evictionTimestamp:服务被剔除的时间;registrationTimestamp:服务注册时间;serviceUpTimestamp:恢复正常工作状态的时间;lastUpdateTimestamp:最后操作时间。
心跳检测源码
各个微服务会定时发送请求,给注册中心,以确认服务还在正常工作,这一操作称之为心跳续约。注册中心会保存微服务的最后一次操作时间(注册、心跳续约、服务剔除/下架等都算操作),心跳续约的原理就是把最后一次的操作时间 等于 当前系统时间+心跳续约时长。
上图是InstanceRegistry类中续约代码,这个renew的实现其实很简单,就是根据传进来的微服务名和具体的微服务id到已经注册了的微服务列表集合中查找,经过双层循环遍历,找到具体的微服务实例,同时发布一个续约的监听事件,告诉监听者,哪个微服务要进行续约了,这是一个扩展点(如果有需求,需要监听心跳续约,可以监听EurekaInstanceRenewedEvent事件),续约的实际操作还是在前文提到过的调用链的AbstractInstanceRegistry中执行真是的续约操作。
首先通过微服务名获取到微服务组gMap,然后通过具体的微服务id找到微服务实例对象leaseToRenew,如果leaseToRenew等于null,返回一个false,微服务客户端会返回一个404,这时候客户端就会尝试把心跳续约改为服务注册操作,这里在后面看客户端源码的时候会进行详细分析。一般情况下是不会进入到这个if块中的,一般是走else,else中的绝大部分代码只是用来判断一下状态,这个方法的最后一行leaseToRenew.renew();是真正进行续约的操作,具体实现又到了Lease中,续约逻辑其实非常简单,就是把当前系统时间加上配置的心跳续约的时长赋值给lastUpdateTimestamp最后操作时间这个属性。
服务下架/剔除源码
服务剔除和服务下架不是同一个操作,区别就是:服务下架是客户端自己发送一个clear请求,退出这个集群;服务剔除是服务端发现一个客户端很长时间没有发送心跳,则服务端会尝试主动剔除掉这个微服务。一个是主动,一个是被动,主动和被动是一个相对的关系。
服务下架/剔除它的实现是在EurekaServerInitializerConfiguration这个类中,这个类的主要作用有:初始化eureka的配置、初始化eureka context(集群同步注册信息、启动定时器(服务剔除、自我保护监听)、初始化自我保护机制的阈值)等等。在这个类中,一个非常重要的方法就是start,我们可以看到,方法内部开启了一个线程,这是为什么呢?这样做的好处是,当初始化eureka server中途出了任何问题的时候,只是在当前线程中出异常,并不会影响主线程的运行。
进入到contextInitialized()-->initEurekaServerContext()这个方法,在这个方法中集群同步注册信息、初始化自我保护机制的阈值更新等都是在这个方法中设置的。
this.registry.syncUp()这个就是用来进行集群同步的,eureka server加入到集群中的时候,要同步集群中的注册信息等内容,就是在syncUp这个方法中实现的,同步集群内的信息。
serverConfig.getRegistrySyncRetries()拿到yaml中配置的重试次数进行循环,serverConfig.getRegistrySyncRetryWaitMs()根据配置的休眠时间休眠,这个时间是重试的等待时间。eurekaClient.getApplications()拿到所有的注册信息等等内容,然后遍历,调用本地的注册信息,可以看到最后一个参数isReplication的值为true,说明是来自集群同步的信息,不再同步给集群中其他节点。
this.expectedNumberOfClientsSendingRenews = count,传进来的count就是syncUp统计的注册的客户端数量,即expectedNumberOfClientsSendingRenews 预计需要接收到客户端心跳链接的数量。在这个方法的最后一行super.postInit(),这才是我们这一章节的主题所在,在这个方法里面启动了好多任务,就包括了服务剔除/下架的定时任务。
Eureka启动了多个定时器,定时去判断,各个微服务是否过期,定时清除因为长时间没有发送心跳的客户端。这个定时的时长,可以在yaml中配置,用eureka.server.eviction-interval-time-in-ms指定时长;长时间就是心跳续约的过期时间,可以用eureka.instance.leaseExpirationDurationInSeconds指定心跳续约时长。服务剔除不会进行集群同步,因为所有的节点都会进行剔除;服务下架会同步。
集群同步
所谓集群同步,就是各个Eureka Server复制相互的注册信息,同步各自的操作,集群同步时server之间的操作。它的实现原理是,发送一个同样的http请求给其他Server。比如:从eureka client发送了一个注册请求给微服务集群中的某一台server,这个server会发送一个同样的注册请求给集群中的其他server。
replicateToPeers方法中isReplication这个参数非常重要,由于eureka并没有单独实现集群同步的方法,无论是客户端注册还是集群同步执行的都是同样的代码,所以就用isReplication这个参数做区分,如果它为true,则表示server之间进行集群同步操作,如果为fasle则表示,eureka client客户端进行普通的注册、续约等操作。
我们分析下这个方法,如果isReplication为true,则表明是在进行集群同步操作,第一个if只是做一个计数的操作;第二个if是一个非常值得推敲的判断,代码的逻辑很简单,如果peerEurekaNodes为空(即只有一个微服务,或者在配置文件中没有配置集群信息)或者isReplication为true(即正在进行集群同步),则直接返回。为什么如果在进行集群同步它就返回了呢?这样做的好处是能够避免各个eureka server重复注册。比如,server1发给server2,server2注册完之后,不会再发给server3进行注册了。
到此为止就分析完了集群同步的逻辑。
自我保护机制源码
numberOfRenewsPerMinThreshold就是触发自我保护机制的阈值,它等于expectedNumberOfClientsSendingRenews预估值乘以心跳连接的刷新时间,因为自我保护阈值它是以分钟为单位的,所以会有一个60除以心跳连接的续约时间,再乘以自我保护机制的阈值的触发边界(默认85%),即自我保护机制阈值=预估心跳值(所有注册上来的实例)*每分钟触发的心跳连接次数(60s/服务端每分钟心跳连接刷新时间(默认30s))*自我保护机制触发百分比(默认85%)。
自我保护机制阈值的更改条件:服务初始化、15分钟自动更改、注册、服务下架的这几种情况下会更改。
服务发现(缓存源码)
在拿到所有的注册信息后,response会进行压缩,使用GZIP的方式进行压缩,可以关掉该压缩机制。而responseCache就是它的缓存,进入到getGZIP中
进入到generatePayload方法中,在这个方法中会判断你是全量获取还是增量获取,这就是缓存得读取。总结:首先进入只读缓存,如果只读缓存没有数据,则进入读写缓存;读写缓存注册了监听器,如果读写缓存也没有,就执行监听器的逻辑,从真是数据里面拿。
缓存更新
读写缓存的更新:只读缓存的数据,只会来源于读写缓存,而且没有提供主动更新的API,只读缓存只能通过定时器去定时更新,定时器每隔30秒执行一次;如果只读缓存中没有但是读写缓存中有数据,则也会更新。
getCacheUpdateTask的run方法的逻辑很简单,遍历只读缓存readOnlyCacheMap的key集合,然后用只读缓存的key去读写缓存readWriteCacheMap中获取,如果该key从读写缓存中获取的值和只读缓存的不同,则更新只读缓存。eureka使用了读写锁做数据控制,当做服务发现的时候加的是写锁,服务注册、续约更新、下架/剔除加的是读锁。
服务延迟
监听服务下架有延迟,默认情况下,90秒客户端没有发送心跳,eureka会剔除微服务,这个剔除会有一个延迟,当客户端监听到服务端剔除了哪些微服务,需要等待90秒,加上服务剔除定时器默认的60秒的延迟,再加上客户端默认的延迟30秒,加上ribbon的延迟60秒,总共有4分多钟的延迟。