eureka 源码跟踪
首先看@EnableDiscoveryClient
源码:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
// 往容器中注册 EnableDiscoveryClientImportSelector
@Import({EnableDiscoveryClientImportSelector.class})
public @interface EnableDiscoveryClient {
boolean autoRegister() default true;
}
先看EnableDiscoveryClientImportSelector
继承关系:
从上图可以发现EnableDiscoveryClientImportSelector
实现了ImportSelector
接口。
接着看EnableDiscoveryClientImportSelector
的代码:
public String[] selectImports(AnnotationMetadata metadata) {
String[] imports = super.selectImports(metadata);
AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(this.getAnnotationClass().getName(), true));
boolean autoRegister = attributes.getBoolean("autoRegister");
// 如果是自动注册添加 org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration
// AutoServiceRegistrationConfiguration 这个貌似就是一个空实现类。
if (autoRegister) {
List<String> importsList = new ArrayList(Arrays.asList(imports));
importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
imports = (String[])importsList.toArray(new String[0]);
} else {
Environment env = this.getEnvironment();
if (ConfigurableEnvironment.class.isInstance(env)) {
ConfigurableEnvironment configEnv = (ConfigurableEnvironment)env;
LinkedHashMap<String, Object> map = new LinkedHashMap();
map.put("spring.cloud.service-registry.auto-registration.enabled", false);
MapPropertySource propertySource = new MapPropertySource("springCloudDiscoveryClient", map);
configEnv.getPropertySources().addLast(propertySource);
}
}
return imports;
}
从spring-cloud-commons
包下 /META-INF/spring.factories
可以发现, 自动注入的类:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClientAutoConfiguration,\
// NoopDiscoveryClientAutoConfiguration 已经过时
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration,\
插一段: 注册EurekaDiscoveryClient
类
查看spring-cloud-netflix-eureka-client
下的 /META-INF/spring.factories
内容
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
看上面spring.factories
标红的配置类:
EurekaClientAutoConfiguration
:
public class EurekaClientAutoConfiguration {
// 往容器中注入EurekaDiscoveryClient 类。
@Bean
public DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}
}
首先看看SimpleDiscoveryClientAutoConfiguration
配置类代码:
@Configuration
@AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class})
public class SimpleDiscoveryClientAutoConfiguration {
@Autowired(
required = false
)
private ServerProperties server;
@Autowired
private ApplicationContext context;
// 配置文件中设置的 spring.application.name
@Value("${spring.application.name:application}")
private String serviceId;
@Autowired
private InetUtils inet;
public SimpleDiscoveryClientAutoConfiguration() {
}
// 将配置好的服务信息, serviceId, hostname, port 等信息包装在SimpleDiscoveryProperties 中返回
@Bean
public SimpleDiscoveryProperties simpleDiscoveryProperties() {
SimpleDiscoveryProperties simple = new SimpleDiscoveryProperties();
simple.getLocal().setServiceId(this.serviceId);
simple.getLocal().setUri(URI.create("http://" + this.inet.findFirstNonLoopbackHostInfo().getHostname() + ":" + this.findPort()));
return simple;
}
// 根据simpleDiscoveryProperties() 方法新增DiscoveryClient 的实现SimpleDiscoveryClient 到容器中
@Bean
@Order(2147483647)
public DiscoveryClient simpleDiscoveryClient() {
return new SimpleDiscoveryClient(this.simpleDiscoveryProperties());
}
// ....
}
再看看CompositeDiscoveryClientAutoConfiguration
配置类代码:
@Configuration
// 加载SimpleDiscoveryClientAutoConfiguration 前加载该类
@AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class})
public class CompositeDiscoveryClientAutoConfiguration {
public CompositeDiscoveryClientAutoConfiguration() {
}
// 新增DiscoveryClient 实现类CompositeDiscoveryClient. 该类中包含容器中现有的discoveryClients 集合
// 在项目中注入 DiscoveryClient 默认注入的就是该实现类[CompositeDiscoveryClient]
// 自动注入discoveryClients 会发现该集合中包含 EurekaDiscoveryClient和 SimpleDiscoveryClient 两个
@Bean
@Primary
public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
return new CompositeDiscoveryClient(discoveryClients);
}
}
我们重点看下 EurekaDiscoveryClient
中的代码实现:
EurekaDiscoveryClient
中的代码:
public class EurekaDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
private final EurekaClient eurekaClient;
private final EurekaClientConfig clientConfig;
/** @deprecated */
@Deprecated
public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
this(eurekaClient, eurekaClient.getEurekaClientConfig());
}
public EurekaDiscoveryClient(EurekaClient eurekaClient, EurekaClientConfig clientConfig) {
this.clientConfig = clientConfig;
this.eurekaClient = eurekaClient;
}
public String description() {
return "Spring Cloud Eureka Discovery Client";
}
// 根据serviceId 获取ServiceInstance 的集合
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
List<ServiceInstance> instances = new ArrayList();
Iterator var4 = infos.iterator();
while(var4.hasNext()) {
InstanceInfo info = (InstanceInfo)var4.next();
instances.add(new EurekaDiscoveryClient.EurekaServiceInstance(info));
}
return instances;
}
// 获取服务名集合
public List<String> getServices() {
Applications applications = this.eurekaClient.getApplications();
if (applications == null) {
return Collections.emptyList();
} else {
List<Application> registered = applications.getRegisteredApplications();
List<String> names = new ArrayList();
Iterator var4 = registered.iterator();
while(var4.hasNext()) {
Application app = (Application)var4.next();
if (!app.getInstances().isEmpty()) {
names.add(app.getName().toLowerCase());
}
}
return names;
}
}
}
我们从上面代码可以发现, 获取服务的方法最后的实现都丢给了EurekaClient
。
EurekaClient
的实现:
-DiscoveryClient
[重点debug实现]
-CloudEurekaClient
DiscoveryClient
的代码查看:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
this.clientConfig = config;
staticClientConfig = this.clientConfig;
this.transportConfig = config.getTransportConfig();
this.instanceInfo = myInfo;
if (myInfo != null) {
this.appPathIdentifier = this.instanceInfo.getAppName() + "/" + this.instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new InstanceInfoBasedUrlRandomizer(this.instanceInfo);
this.localRegionApps.set(new Applications());
this.fetchRegistryGeneration = new AtomicLong(0L);
this.remoteRegionsToFetch = new AtomicReference(this.clientConfig.fetchRegistryForRemoteRegions());
this.remoteRegionsRef = new AtomicReference(this.remoteRegionsToFetch.get() == null ? null : ((String)this.remoteRegionsToFetch.get()).split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registry.lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registration.lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion());
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
this.scheduler = null;
this.heartbeatExecutor = null;
this.cacheRefreshExecutor = null;
this.eurekaTransport = null;
this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
this.initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
} else {
try {
this.scheduler = Executors.newScheduledThreadPool(2,
(new ThreadFactoryBuilder())
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
this.heartbeatExecutor = new ThreadPoolExecutor(1,
this.clientConfig.getHeartbeatExecutorThreadPoolSize(),
0L,
TimeUnit.SECONDS,
new SynchronousQueue(),
(new ThreadFactoryBuilder())
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build());
this.cacheRefreshExecutor = new ThreadPoolExecutor(1,
this.clientConfig.getCacheRefreshExecutorThreadPoolSize(),
0L,
TimeUnit.SECONDS,
new SynchronousQueue(),
(new ThreadFactoryBuilder())
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build());
this.eurekaTransport = new DiscoveryClient.EurekaTransport();
this.scheduleServerEndpointTask(this.eurekaTransport, args);
this.instanceRegionChecker = new InstanceRegionChecker((AzToRegionMapper)azToRegionMapper, this.clientConfig.getRegion());
} catch (Throwable var10) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", var10);
}
// ...
this.initScheduledTasks();
// ...
}
}
上面新建了一个心跳线程池[heartbeatExecutor
]和一个注册信息列表刷新缓存池[cacheRefreshExecutor
]
initScheduledTasks()
方法:
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
if (this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh",
this.scheduler,
this.cacheRefreshExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// 刷新服务列表缓存服务
new DiscoveryClient.CacheRefreshThread()),
(long)renewalIntervalInSecs,
TimeUnit.SECONDS);
}
if (this.clientConfig.shouldRegisterWithEureka()) {
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
this.scheduler.schedule(new TimedSupervisorTask("heartbeat",
this.scheduler,
this.heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// 续租服务
new DiscoveryClient.HeartbeatThread()),
(long)renewalIntervalInSecs,
TimeUnit.SECONDS);
this.instanceInfoReplicator = new InstanceInfoReplicator(this,
this.instanceInfo,
this.clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);
// ....
this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
接着进入instanceInfoReplicator
[实现了Runnable
接口] 的run()方法:
public void run() {
boolean var6 = false;
ScheduledFuture next;
label53: {
try {
var6 = true;
this.discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
this.discoveryClient.register();
this.instanceInfo.unsetIsDirty(dirtyTimestamp);
var6 = false;
} else {
var6 = false;
}
break label53;
} catch (Throwable var7) {
logger.warn("There was a problem with the instance info replicator", var7);
var6 = false;
} finally {
if (var6) {
ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
this.scheduledPeriodicRef.set(next);
}
}
next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
this.scheduledPeriodicRef.set(next);
return;
}
next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
this.scheduledPeriodicRef.set(next);
}
服务注册方法最终回调用到DiscoveryClient
的register()
方法:
boolean register() throws Throwable {
logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
EurekaHttpResponse httpResponse;
try {
httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
} catch (Exception var3) {
logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
throw var3;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
[eureka-client
向eureka-server
注册的最终提交代码]
[服务注册]
该register()
继续调用 AbstractJerseyEurekaHttpClient.register()
方法:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
EurekaHttpResponse var5;
try {
Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
// resourceBuilder[WebResource实例] 中的url: http://localhost:8761/eureka/apps/EUREKA-CLIENT
// resourceBuilder 中CopyOnWriteHashMap<String, Object> properties 属性:
// "http.protocol.handle-redirects" -> "false"
// "com.sun.jersey.client.property.followRedirects" -> "false"
// "com.sun.jersey.impl.client.httpclient.connectionManager" ->
// "http.useragent" -> "Java-EurekaClient/v1.9.12"
this.addExtraHeaders(resourceBuilder);
// 向eureka-server 执行注册请求[并将自身信息 info提交过去]
response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip"))
.type(MediaType.APPLICATION_JSON_TYPE))
.accept(new String[]{"application/json"}))
.post(ClientResponse.class, info);
var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (response != null) {
response.close();
}
}
return var5;
}
[续订服务续约]
private class HeartbeatThread implements Runnable {
private HeartbeatThread() {
}
public void run() {
if (DiscoveryClient.this.renew()) {
DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
DiscoveryClient.this.renew()
代码:
boolean renew() {
try {
EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient
.sendHeartBeat(this.instanceInfo.getAppName(),
this.instanceInfo.getId(),
this.instanceInfo,
(InstanceStatus)null);
logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
this.REREGISTER_COUNTER.increment();
logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
long timestamp = this.instanceInfo.setIsDirtyWithTime();
boolean success = this.register();
if (success) {
this.instanceInfo.unsetIsDirty(timestamp);
}
return success;
} else {
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
} catch (Throwable var5) {
logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);
return false;
}
}
最终调用com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat
方法:
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
EurekaHttpResponse var10;
try {
// 最后请求的链接:
// http://localhost:8761/eureka/apps/EUREKA-CLIENT/localhost:eureka-client:8762?status=UP&lastDirtyTimestamp=1583157097771
WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
this.addExtraHeaders(requestBuilder);
// 执行请求操作
// 如果请求返回404, 说明不存在, 需要重新走注册流程
response = (ClientResponse)requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity()) {
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
var10 = eurekaResponseBuilder.build();
} finally {
if (response != null) {
response.close();
}
}
return var10;
}
[服务下线取消]:
@PreDestroy
public synchronized void shutdown() {
if (this.isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (this.statusChangeListener != null && this.applicationInfoManager != null) {
this.applicationInfoManager.unregisterStatusChangeListener(this.statusChangeListener.getId());
}
this.cancelScheduledTasks();
if (this.applicationInfoManager != null && this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldUnregisterOnShutdown()) {
this.applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// 服务下线操作
this.unregister();
}
if (this.eurekaTransport != null) {
this.eurekaTransport.shutdown();
}
this.heartbeatStalenessMonitor.shutdown();
this.registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
进入同类方法unregister()
方法中:
void unregister() {
if (this.eurekaTransport != null && this.eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = this.eurekaTransport.registrationClient.cancel(this.instanceInfo.getAppName(), this.instanceInfo.getId());
logger.info("DiscoveryClient_{} - deregister status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception var2) {
logger.error("DiscoveryClient_{} - de-registration failed{}", new Object[]{this.appPathIdentifier, var2.getMessage(), var2});
}
}
}
cancel()
最终执行com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#cancel
:
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
EurekaHttpResponse var6;
try {
Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
this.addExtraHeaders(resourceBuilder);
response = (ClientResponse)resourceBuilder.delete(ClientResponse.class);
var6 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (response != null) {
response.close();
}
}
return var6;
}