eureka 源码跟踪

2020-04-09  本文已影响0人  NirvanalI

首先看@EnableDiscoveryClient 源码:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
// 往容器中注册 EnableDiscoveryClientImportSelector
@Import({EnableDiscoveryClientImportSelector.class})
public @interface EnableDiscoveryClient {
    boolean autoRegister() default true;
}

先看EnableDiscoveryClientImportSelector 继承关系:

image.png

从上图可以发现EnableDiscoveryClientImportSelector 实现了ImportSelector 接口。
接着看EnableDiscoveryClientImportSelector 的代码:

public String[] selectImports(AnnotationMetadata metadata) {
    String[] imports = super.selectImports(metadata);
    AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(this.getAnnotationClass().getName(), true));
    boolean autoRegister = attributes.getBoolean("autoRegister");
    // 如果是自动注册添加 org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration
    // AutoServiceRegistrationConfiguration 这个貌似就是一个空实现类。
    if (autoRegister) {
        List<String> importsList = new ArrayList(Arrays.asList(imports));
        importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
        imports = (String[])importsList.toArray(new String[0]);
    } else {
        Environment env = this.getEnvironment();
        if (ConfigurableEnvironment.class.isInstance(env)) {
            ConfigurableEnvironment configEnv = (ConfigurableEnvironment)env;
            LinkedHashMap<String, Object> map = new LinkedHashMap();
            map.put("spring.cloud.service-registry.auto-registration.enabled", false);
            MapPropertySource propertySource = new MapPropertySource("springCloudDiscoveryClient", map);
            configEnv.getPropertySources().addLast(propertySource);
        }
    }
    return imports;
}

spring-cloud-commons包下 /META-INF/spring.factories 可以发现, 自动注入的类:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClientAutoConfiguration,\
// NoopDiscoveryClientAutoConfiguration 已经过时
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration,\

插一段: 注册EurekaDiscoveryClient
查看spring-cloud-netflix-eureka-client 下的 /META-INF/spring.factories 内容

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

看上面spring.factories 标红的配置类:
EurekaClientAutoConfiguration:

public class EurekaClientAutoConfiguration {
    
    // 往容器中注入EurekaDiscoveryClient 类。
    @Bean
    public DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
        return new EurekaDiscoveryClient(client, clientConfig);
    }
}

首先看看SimpleDiscoveryClientAutoConfiguration 配置类代码:

@Configuration
@AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class})
public class SimpleDiscoveryClientAutoConfiguration {
    @Autowired(
        required = false
    )
    private ServerProperties server;
    @Autowired
    private ApplicationContext context;
    // 配置文件中设置的 spring.application.name
    @Value("${spring.application.name:application}")
    private String serviceId;
    @Autowired
    private InetUtils inet;
    
    public SimpleDiscoveryClientAutoConfiguration() {
    }
    
    // 将配置好的服务信息, serviceId, hostname, port 等信息包装在SimpleDiscoveryProperties 中返回
    @Bean
    public SimpleDiscoveryProperties simpleDiscoveryProperties() {
        SimpleDiscoveryProperties simple = new SimpleDiscoveryProperties();
        simple.getLocal().setServiceId(this.serviceId);
        simple.getLocal().setUri(URI.create("http://" + this.inet.findFirstNonLoopbackHostInfo().getHostname() + ":" + this.findPort()));
        return simple;
    }
    
    // 根据simpleDiscoveryProperties() 方法新增DiscoveryClient 的实现SimpleDiscoveryClient 到容器中
    @Bean
    @Order(2147483647)
    public DiscoveryClient simpleDiscoveryClient() {
        return new SimpleDiscoveryClient(this.simpleDiscoveryProperties());
    }
    
    // ....
}

再看看CompositeDiscoveryClientAutoConfiguration 配置类代码:

@Configuration
// 加载SimpleDiscoveryClientAutoConfiguration 前加载该类
@AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class})
public class CompositeDiscoveryClientAutoConfiguration {
    public CompositeDiscoveryClientAutoConfiguration() {
        
    }
    
    // 新增DiscoveryClient 实现类CompositeDiscoveryClient. 该类中包含容器中现有的discoveryClients 集合
    // 在项目中注入 DiscoveryClient  默认注入的就是该实现类[CompositeDiscoveryClient]
    // 自动注入discoveryClients 会发现该集合中包含 EurekaDiscoveryClient和 SimpleDiscoveryClient 两个
    @Bean
    @Primary
    public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
        return new CompositeDiscoveryClient(discoveryClients);
    }
}

我们重点看下 EurekaDiscoveryClient 中的代码实现:
EurekaDiscoveryClient 中的代码:

public class EurekaDiscoveryClient implements DiscoveryClient {
    public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
    private final EurekaClient eurekaClient;
    private final EurekaClientConfig clientConfig;
    
    /** @deprecated */
    @Deprecated
    public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
        this(eurekaClient, eurekaClient.getEurekaClientConfig());
    }
    
    public EurekaDiscoveryClient(EurekaClient eurekaClient, EurekaClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        this.eurekaClient = eurekaClient;
    }
    
    public String description() {
        return "Spring Cloud Eureka Discovery Client";
    }
    // 根据serviceId 获取ServiceInstance 的集合
    public List<ServiceInstance> getInstances(String serviceId) {
        List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
        List<ServiceInstance> instances = new ArrayList();
        Iterator var4 = infos.iterator();
        while(var4.hasNext()) {
            InstanceInfo info = (InstanceInfo)var4.next();
            instances.add(new EurekaDiscoveryClient.EurekaServiceInstance(info));
        }
        
        return instances;
    }
    // 获取服务名集合
    public List<String> getServices() {
        Applications applications = this.eurekaClient.getApplications();
        if (applications == null) {
            return Collections.emptyList();
        } else {
            List<Application> registered = applications.getRegisteredApplications();
            List<String> names = new ArrayList();
            Iterator var4 = registered.iterator();
            
            while(var4.hasNext()) {
                Application app = (Application)var4.next();
                if (!app.getInstances().isEmpty()) {
                    names.add(app.getName().toLowerCase());
                }
            }
            
            return names;
        }
    }
}

我们从上面代码可以发现, 获取服务的方法最后的实现都丢给了EurekaClient
EurekaClient的实现:
-DiscoveryClient[重点debug实现]
-CloudEurekaClient

DiscoveryClient的代码查看:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    this.clientConfig = config;
    staticClientConfig = this.clientConfig;
    this.transportConfig = config.getTransportConfig();
    this.instanceInfo = myInfo;
    if (myInfo != null) {
        this.appPathIdentifier = this.instanceInfo.getAppName() + "/" + this.instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }


    this.backupRegistryProvider = backupRegistryProvider;
    this.endpointRandomizer = endpointRandomizer;
    this.urlRandomizer = new InstanceInfoBasedUrlRandomizer(this.instanceInfo);
    this.localRegionApps.set(new Applications());
    this.fetchRegistryGeneration = new AtomicLong(0L);
    this.remoteRegionsToFetch = new AtomicReference(this.clientConfig.fetchRegistryForRemoteRegions());
    this.remoteRegionsRef = new AtomicReference(this.remoteRegionsToFetch.get() == null ? null : ((String)this.remoteRegionsToFetch.get()).split(","));
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registry.lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }


    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registration.lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }


    logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion());
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        this.scheduler = null;
        this.heartbeatExecutor = null;
        this.cacheRefreshExecutor = null;
        this.eurekaTransport = null;
        this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
        this.initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
    } else {
        try {
            this.scheduler = Executors.newScheduledThreadPool(2, 
                    (new ThreadFactoryBuilder())
                    .setNameFormat("DiscoveryClient-%d")
                    .setDaemon(true)
                    .build());
            this.heartbeatExecutor = new ThreadPoolExecutor(1, 
                    this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 
                    0L, 
                    TimeUnit.SECONDS, 
                    new SynchronousQueue(), 
                    (new ThreadFactoryBuilder())
                    .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                    .setDaemon(true)
                    .build());
            this.cacheRefreshExecutor = new ThreadPoolExecutor(1, 
                    this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 
                    0L, 
                    TimeUnit.SECONDS, 
                    new SynchronousQueue(), 
                    (new ThreadFactoryBuilder())
                    .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                    .setDaemon(true)
                    .build());
            this.eurekaTransport = new DiscoveryClient.EurekaTransport();
            this.scheduleServerEndpointTask(this.eurekaTransport, args);
            
            this.instanceRegionChecker = new InstanceRegionChecker((AzToRegionMapper)azToRegionMapper, this.clientConfig.getRegion());
        } catch (Throwable var10) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", var10);
        }
        // ...
        this.initScheduledTasks();
        // ...
    }
}

上面新建了一个心跳线程池[heartbeatExecutor]和一个注册信息列表刷新缓存池[cacheRefreshExecutor]

initScheduledTasks() 方法:

private void initScheduledTasks() {
    int renewalIntervalInSecs;
    int expBackOffBound;
    if (this.clientConfig.shouldFetchRegistry()) {
        renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
        expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", 
            this.scheduler, 
            this.cacheRefreshExecutor, 
            renewalIntervalInSecs, 
            TimeUnit.SECONDS, 
            expBackOffBound, 
            // 刷新服务列表缓存服务
            new DiscoveryClient.CacheRefreshThread()), 
            (long)renewalIntervalInSecs, 
            TimeUnit.SECONDS);
    }
    
    if (this.clientConfig.shouldRegisterWithEureka()) {
        renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
        this.scheduler.schedule(new TimedSupervisorTask("heartbeat", 
            this.scheduler, 
            this.heartbeatExecutor, 
            renewalIntervalInSecs, 
            TimeUnit.SECONDS, 
            expBackOffBound, 
            // 续租服务
            new DiscoveryClient.HeartbeatThread()), 
            (long)renewalIntervalInSecs, 
            TimeUnit.SECONDS);
        this.instanceInfoReplicator = new InstanceInfoReplicator(this, 
            this.instanceInfo, 
            this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 
            2);
        // ....
        this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

接着进入instanceInfoReplicator[实现了Runnable接口] 的run()方法:

public void run() {
    boolean var6 = false;
    ScheduledFuture next;
    label53: {
        try {
            var6 = true;
            this.discoveryClient.refreshInstanceInfo();
            Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                this.discoveryClient.register();
                this.instanceInfo.unsetIsDirty(dirtyTimestamp);
                var6 = false;
            } else {
                var6 = false;
            }
            break label53;
        } catch (Throwable var7) {
            logger.warn("There was a problem with the instance info replicator", var7);
            var6 = false;
        } finally {
            if (var6) {
                ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
                this.scheduledPeriodicRef.set(next);
            }
        }
        next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
        this.scheduledPeriodicRef.set(next);
        return;
    }
    next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
    this.scheduledPeriodicRef.set(next);
}

服务注册方法最终回调用到DiscoveryClientregister() 方法:

boolean register() throws Throwable {
    logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
    EurekaHttpResponse httpResponse;
    try {
        httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
    } catch (Exception var3) {
        logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
        throw var3;
    }
    
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

[eureka-clienteureka-server 注册的最终提交代码]
[服务注册]
register() 继续调用 AbstractJerseyEurekaHttpClient.register() 方法:

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    
    EurekaHttpResponse var5;
    try {
        Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
        // resourceBuilder[WebResource实例] 中的url: http://localhost:8761/eureka/apps/EUREKA-CLIENT
        // resourceBuilder 中CopyOnWriteHashMap<String, Object> properties 属性: 
        // "http.protocol.handle-redirects" -> "false"
        // "com.sun.jersey.client.property.followRedirects" -> "false"
        // "com.sun.jersey.impl.client.httpclient.connectionManager" ->
        // "http.useragent" -> "Java-EurekaClient/v1.9.12"
        this.addExtraHeaders(resourceBuilder);
        // 向eureka-server 执行注册请求[并将自身信息 info提交过去]
        response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip"))
            .type(MediaType.APPLICATION_JSON_TYPE))
            .accept(new String[]{"application/json"}))
            .post(ClientResponse.class, info);
        var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
    return var5;
}

[续订服务续约]

private class HeartbeatThread implements Runnable {
    private HeartbeatThread() {
    }
    public void run() {
        if (DiscoveryClient.this.renew()) {
            DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

DiscoveryClient.this.renew()代码:

boolean renew() {
    try {
        EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient
            .sendHeartBeat(this.instanceInfo.getAppName(), 
            this.instanceInfo.getId(), 
            this.instanceInfo, 
            (InstanceStatus)null);
        logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            this.REREGISTER_COUNTER.increment();
            logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
            long timestamp = this.instanceInfo.setIsDirtyWithTime();
            boolean success = this.register();
            if (success) {
                this.instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        } else {
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        }
    } catch (Throwable var5) {
        logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);
        return false;
    }
}

最终调用com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat方法:

public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    EurekaHttpResponse var10;
    try {
        // 最后请求的链接: 
        // http://localhost:8761/eureka/apps/EUREKA-CLIENT/localhost:eureka-client:8762?status=UP&lastDirtyTimestamp=1583157097771
        WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        this.addExtraHeaders(requestBuilder);
        // 执行请求操作
        // 如果请求返回404, 说明不存在, 需要重新走注册流程
        response = (ClientResponse)requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        var10 = eurekaResponseBuilder.build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
    return var10;
}

[服务下线取消]:

@PreDestroy
public synchronized void shutdown() {
    if (this.isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        if (this.statusChangeListener != null && this.applicationInfoManager != null) {
            this.applicationInfoManager.unregisterStatusChangeListener(this.statusChangeListener.getId());
        }
        this.cancelScheduledTasks();
        if (this.applicationInfoManager != null && this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldUnregisterOnShutdown()) {
            this.applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // 服务下线操作
            this.unregister();
        }
        if (this.eurekaTransport != null) {
            this.eurekaTransport.shutdown();
        }
        this.heartbeatStalenessMonitor.shutdown();
        this.registryStalenessMonitor.shutdown();
        logger.info("Completed shut down of DiscoveryClient");
    }
}

进入同类方法unregister() 方法中:

void unregister() {
    if (this.eurekaTransport != null && this.eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = this.eurekaTransport.registrationClient.cancel(this.instanceInfo.getAppName(), this.instanceInfo.getId());
            logger.info("DiscoveryClient_{} - deregister  status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception var2) {
            logger.error("DiscoveryClient_{} - de-registration failed{}", new Object[]{this.appPathIdentifier, var2.getMessage(), var2});
        }
    }
}

cancel() 最终执行com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#cancel:

public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    EurekaHttpResponse var6;
    try {
        Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
        this.addExtraHeaders(resourceBuilder);
        response = (ClientResponse)resourceBuilder.delete(ClientResponse.class);
        var6 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
    return var6;
}
上一篇下一篇

猜你喜欢

热点阅读