深入理解Eureka获取注册信息(七)
Eureka-Client获取信息
启动获取
在客户端应用启动时,初始化DiscoverClient的时候,会主动去获取一次注册信息
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ...省略N多代码
// 如果fetch-registry = true , 则去Eureka Server拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
// 如果所有的Eureka Server都不可用,那么从备用的服务里面去取数据
fetchRegistryFromBackup();
}
// ...省略N多代码
// 设置定时器
initScheduledTasks();
}
shouldFetchRegistry : 默认true
fetchRegistry : 获取注册信息,此处传入的是false, 表面上看是不需要全量获取,但是应用第一次启动的时候,
本地缓存为空,所以还是会全量获取的。
PS: 启动时获取注册信息为全量。
定时器获取
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ...省略N多代码
}
registryFetchIntervalSeconds : 默认值为30秒 ,每30秒刷新一次、
定时器初始化,,直接看CacheRefreshThread()
class CacheRefreshThread implements Runnable {
public void run() {
// 刷新注册信息
refreshRegistry();
}
}
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// 判断是否需要全量获取 , remoteRegionsModified 这个值来决定
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
// 获取注册信息
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
// 日志输出 , 省略。。
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
由上可以看到,系统在启动的时候,初始化了一个定时器,每30秒一次,用来刷新本地缓存信息。
获取注册信息
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 获取本地的缓存信息 , 也就是客户端注册信息
Applications applications = getApplications();
// 判断是否需要全量获取
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
// 增量获取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 发布缓存刷新事件。
onCacheRefreshed();
// 更新本地应用的状态
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
clientConfig.shouldDisableDelta() : 是否禁用增量获取, 默认为false , 如果禁用了的话,那就只能是全量获取了,总要获取一下不是。
clientConfig.getRegistryRefreshSingleVipAddress() : 当这个属性不为空的时候,则全量获取。具体作用不是很清楚(苦笑)
forceFullRegistryFetch : 传入的参数,表示是否需要全量获取
applications : 本地注册信息的缓存,如果本地缓存为空,或者里面的版本号为-1,那么就需要全量获取,表示首次加载时。
onCacheRefreshed() : 发布缓存刷新的事件,用户可以自定义是否监听这个事件,比如需要将注册信息的变化落库。
全量获取
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 发送HTTP请求,去服务端获取注册信息
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 设置到本地缓存里面去
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
在发送HTTP请求去服务端获取注册信息之前,做了一个判断, 判断registryRefreshSingleVipAddress是否为空, 这个字段
表示的意思是 “此客户端只对一个单一的VIP注册表的信息感兴趣”,默认为null , 也就是说如果客户端只对其中一个VIP 感兴趣
那么就只获取这一个, 否则全部获取
this.filterAndShuffle(apps) : 是否需要过滤客户端信息的状态,如果设置了eureka.shouldFilterOnlyUpInstances = true 这个属性的话,
客户端获取到注册信息之后,会剔除非UP状态的客户端信息。
localRegionApps.set(this.filterAndShuffle(apps)) : 将注册信息设置到本地内存里面去,使用AtomicReference类型做存储、
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
增量获取
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// 增量获取信息
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
// 增量获取为空,则全量返回
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
//这里设置原子锁的原因是怕某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 将获取到的增量信息和本地缓存信息合并。
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
// 释放锁
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// ( HashCode 不一致|| 打印增量和全量的差异 )= true 重新去全量获取
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
步骤说明:
1.发起http请求,将服务端的客户端变化的信息拉取过来,如: register, cancle, modify 有过这些操作的数据
2.上锁,防止某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改
3.将请求过来的增量数据和本地的数据做合并
4.计算hashCode
5.如果hashCode不一致,或者clientConfig.shouldLogDeltaDiff() = true 的话,则又会去服务端发起一次全量获取
合并数据
private void updateDelta(Applications delta) {
int deltaCount = 0;
// 循环拉取过来的应用列表
for (Application app : delta.getRegisteredApplications()) {
// 循环这个应用里面的实例(有多个实例代表是集群的。)
for (InstanceInfo instance : app.getInstances()) {
// 获取本地的注册应用列表
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {
remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
if (ActionType.ADDED.equals(instance.getActionType())) {// 添加事件
//根据AppName 获取本地的数据,看这个应用是否存在
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
// 不存在,则加到本地的应用里面去
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
// 为本地这个应用添加这个实例
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改事件
//根据AppName 获取本地的数据,看这个应用是否存在
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
// 不存在,则加到本地的应用里面去
applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps ", instance.getId());
// 为本地这个应用添加这个实例
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) { // 删除事件
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
// 移除这个实例
applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
}
}
}
logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
getApplications().setVersion(delta.getVersion());
getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
for (Applications applications : remoteRegionVsApps.values()) {
applications.setVersion(delta.getVersion());
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
步骤说明:
1.从服务端获取了最近这段时间,新注册新来的客户端信息,有过修改的,被删除的, 这三大类的实例信息
然后通过覆盖本地的数据,移除数据,来达到数据合并的需求。
Eureka-Server接收请求
控制器接收请求
com.netflix.eureka.resources.ApplicationsResource , 程序入口
全量获取
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// 获取注册列表的区域
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// 判断是否可以访问
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
// 设置API版本
CurrentRequestVersion.set(Version.toEnum(version));
// 默认key的类型为JSON
KeyType keyType = Key.KeyType.JSON;
// 默认设置返回类型为JSON
String returnMediaType = MediaType.APPLICATION_JSON;
// 如果Accept为空,或者不包含JSON字符串(表示客户端可能不接收JSON类型),则设置返回XML类型的
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// 构建缓存KEY
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
// 获取缓存信息,返回给客户端
Response response;
// 判断请求接收类型是否是gzip ,如果是,则返回gzip的流出去
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
PS: 从Controller中可以看到,获取注册信息都是调用缓存操作类来最终获取到的,此处暂时先不讲缓存机制的实现
后面会单独开一篇文章来写Eureka的缓存机制
获取全量信息,主要就是从AbstractInstanceRegistry类getApplications中的registry中获取的,代码如下
public Applications getApplications() {
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (disableTransparentFallback) {
return getApplicationsFromLocalRegionOnly();
} else {
return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
}
}
disableTransparentFallback : 官网解释是 , 如果在远程区域本地没有实例运行,对于应用程序回退的旧行为是否被禁用, 默认为false,所以此处仅
详细讲getApplicationsFromAllRemoteRegions() ;
public Applications getApplicationsFromAllRemoteRegions() {
return getApplicationsFromMultipleRegions(allKnownRemoteRegions);
}
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, Arrays.toString(remoteRegions));
// 默认为false
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
// 循环该类中的CurrentHashMap, 这个MAP中,存储的是所有的客户端注册的实例信息
// KEY 为客户端的名称,value为客户端的集群机器信息。
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
//
if (entry.getValue() != null) {
// 获取Lease信息,里面有每个实例的instance信息,分装成Application实体
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
//放入 Applications里面去
apps.addApplication(app);
}
}
// 。。。。省略N多代码
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
总结: 从上面的代码上来看,全量获取的机制很简单,主要是把服务端本地的CurrentHashMap里面存储的客户端信息,封装成
Application实体,然后返回。
增量获取
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
// ..... 省略N多代码
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
// ..... 省略N多代码
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
获取增量信息的代码在AbstractInstanceRegistry类中,代码如下
public Applications getApplicationDeltas() {
GET_ALL_CACHE_MISS_DELTA.increment();
// 最近变化过的应用,初始化一个实体
Applications apps = new Applications();
// 增量获取的版本号
apps.setVersion(responseCache.getVersionDelta().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
// 上写锁
write.lock();
// 最近产生过变化的客户端,都在这个队列里面
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :"
+ this.recentlyChangedQueue.size());
// 循环队列
while (iter.hasNext()) {
// 获取队列中的lease信息,这里面封装的就是客户端的实例信息
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
Object[] args = {instanceInfo.getId(),
instanceInfo.getStatus().name(),
instanceInfo.getActionType().name()};
logger.debug(
"The instance id %s is found with status %s and actiontype %s",
args);
Application app = applicationInstancesMap.get(instanceInfo
.getAppName());
if (app == null) {
// 组装成一个Application实体,同时放入Applications里面去
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(decorateInstanceInfo(lease));
}
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
// 暂时没看明白这里的作用(苦笑。。)
if (!disableTransparentFallback) {
Applications allAppsInLocalRegion = getApplications(false);
for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
Applications applications = remoteRegistry.getApplicationDeltas();
for (Application application : applications.getRegisteredApplications()) {
Application appInLocalRegistry =
allAppsInLocalRegion.getRegisteredApplications(application.getName());
if (appInLocalRegistry == null) {
apps.addApplication(application);
}
}
}
}
// 获取全量的注册信息
Applications allApps = getApplications(!disableTransparentFallback);
// 设置HashCode
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
上面主要用到了一个租约变化的队列, 这里面在客户端发生变化时,都会在这里面加入一条信息, 如: 注册,下线,过期
等操作,租约变化队列里面的数据默认保存3分钟,会有一个定时器没30秒清理一次。
retentionTimeInMSInDeltaQueue : 客户端保持增量信息缓存的时间,从而保证不会丢失这些信息,单位为毫秒,默认为3 * 60 * 1000
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
// 最后更新时间小于当前时间-3分钟,那么就会被移除
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
获取到了这些变化的客户端信息,返回Eureka Clien 之后,通过集合合并,就可以得到最新的缓存数据了。
对于服务端来说, 接收全量获取和增量获取的请求,区别在于,构成的KEY不同, 全量获取的KEY 为ALL_APPS,
增量获取的KEY是ALL_APPS_DELTA , 然后都是通过缓存操作类去获取数据,因此最重要的是缓存类的功能实现
接下来会单独开一篇讲Eureka Server 服务端的缓存机制。