Eureka Source Code Analysis (12): Network Communication

2018-11-12  skyguard

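This article looks at Eureka's network communication, which covers two paths:
Eureka-Client requests to Eureka-Server
Requests from one Eureka-Server to the other Eureka-Servers within a Eureka-Server cluster
EurekaJerseyClient is the Jersey client interface that the EurekaHttpClient implementations build on. It is a Jersey Client backed by Apache HttpClient4. Jersey itself is not covered here.
EurekaJerseyClientImpl is the implementation class of EurekaJerseyClient. Its constructor looks like this: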

public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
                              ClientConfig clientConfig) {
    try {
        jerseyClientConfig = clientConfig;
        // Create the ApacheHttpClient4 instance
        apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig);

        // Set the connection parameters (connect timeout and socket read timeout)
        HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams();
        HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
        HttpConnectionParams.setSoTimeout(params, readTimeout);

        // Create the ApacheHttpClientConnectionCleaner, which is given the idle timeout
        this.apacheHttpClientConnectionCleaner = new ApacheHttpClientConnectionCleaner(apacheHttpClient, connectionIdleTimeout);
    } catch (Throwable e) {
        throw new RuntimeException("Cannot create Jersey client", e);
    }
}
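As a rough analogue of the constructor above, the following sketch applies a connect timeout and a response timeout with the JDK's own java.net.http client (Java 11+) rather than Jersey and Apache HttpClient4. The class TimeoutSketch and its methods are hypothetical names for illustration. Note that HttpRequest.timeout bounds the wait for the response as a whole, so it only approximates what setSoTimeout does per socket read.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper for illustration; not part of Eureka.
final class TimeoutSketch {
    private TimeoutSketch() {}

    // The connect timeout is a client-wide setting, similar in spirit to
    // HttpConnectionParams.setConnectionTimeout in the listing above.
    static HttpClient newClient(int connectionTimeoutMs) {
        return HttpClient.newBuilder()
                .connectTimeout(Duration.ofMillis(connectionTimeoutMs))
                .build();
    }

    // The response timeout is set per request in java.net.http.
    static HttpRequest newGet(String url, int readTimeoutMs) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofMillis(readTimeoutMs))
                .GET()
                .build();
    }
}
```

Building the client and the request does not open any connection, so the settings can be inspected without a running server.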

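EurekaJerseyClientBuilder is a nested class of EurekaJerseyClientImpl used to create EurekaJerseyClientImpl instances through its build method.
EurekaHttpClient is the HTTP client for accessing Eureka-Server. It defines the Eureka-Server API call methods.
EurekaHttpResponse is the response object for a request.
TransportClientFactory is the factory interface for creating EurekaHttpClient instances.
AbstractJerseyEurekaHttpClient is the abstract class that implements EurekaHttpClient and contains the actual Eureka-Server API calls. Taking register as an example: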

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    // Build the request path
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        // Add the extra request headers
        addExtraHeaders(resourceBuilder);
        // Send the request to Eureka-Server
        response = resourceBuilder
                .header("Accept-Encoding", "gzip") // accept GZIP-compressed responses
                .type(MediaType.APPLICATION_JSON_TYPE) // request body format: JSON
                .accept(MediaType.APPLICATION_JSON) // response format: JSON
                .post(ClientResponse.class, info); // request body
        // Build the EurekaHttpResponse
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
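To make the shape of the register request concrete, here is a minimal sketch that builds an equivalent POST with the JDK's java.net.http API (Java 11+) instead of Jersey. RegisterRequestSketch, its build method, and the JSON string parameter are assumptions made for illustration; the real client serializes an InstanceInfo object through Jersey.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Eureka.
final class RegisterRequestSketch {
    private RegisterRequestSketch() {}

    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        // Same path layout as register(): "apps/" followed by the application name.
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        URI uri = URI.create(base + "apps/" + appName);
        return HttpRequest.newBuilder(uri)
                .header("Accept-Encoding", "gzip")          // accept GZIP-compressed responses
                .header("Content-Type", "application/json") // request body format
                .header("Accept", "application/json")       // response format
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson, StandardCharsets.UTF_8))
                .build();
    }
}
```

The request is only built here, not sent, which keeps the sketch independent of a running Eureka-Server.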

JerseyApplicationClient implements the communication from Eureka-Client to Eureka-Server.
JerseyEurekaHttpClientFactory is the factory class that creates JerseyApplicationClient instances.
JerseyEurekaHttpClientFactoryBuilder is a nested class of JerseyEurekaHttpClientFactory used to create JerseyEurekaHttpClientFactory instances.
JerseyReplicationClient implements the communication from one Eureka-Server to the other Eureka-Servers within a cluster.
It implements the addExtraHeaders method of AbstractJerseyEurekaHttpClient to add the custom header x-netflix-discovery-replication=true:

protected void addExtraHeaders(Builder webResource) {
    webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}
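The addExtraHeaders hook is an instance of the template method pattern: the base class builds the request and lets each subclass contribute its own headers. The sketch below shows the idea with a plain Map standing in for Jersey's Builder. All class names are hypothetical, and the subclass that adds nothing is a simplification rather than a description of JerseyApplicationClient.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical base class: assembles common headers, then calls the hook.
abstract class HeaderHookSketch {
    // Subclasses contribute their own headers, mirroring addExtraHeaders(Builder).
    protected abstract void addExtraHeaders(Map<String, String> headers);

    final Map<String, String> buildHeaders() {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Accept", "application/json");
        addExtraHeaders(headers);
        return headers;
    }
}

// Simplified subclass that adds no extra headers.
final class PlainHeaders extends HeaderHookSketch {
    @Override
    protected void addExtraHeaders(Map<String, String> headers) {
        // nothing to add
    }
}

// Marks the request as replication traffic, as the listing above does.
final class ReplicationHeaders extends HeaderHookSketch {
    @Override
    protected void addExtraHeaders(Map<String, String> headers) {
        headers.put("x-netflix-discovery-replication", "true");
    }
}
```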

It also implements the HttpReplicationClient interface, including the submitBatchUpdates method:

public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
    ClientResponse response = null;
    try {
        response = jerseyApacheClient.resource(serviceUrl)
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .post(ClientResponse.class, replicationList);
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

MetricsCollectingEurekaHttpClient is an EurekaHttpClient that collects monitoring metrics, working with Netflix Servo to gather them. Its execute method:

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    // Look up the metrics for this request type
    EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
    Stopwatch stopwatch = requestMetrics.latencyTimer.start();
    try {
        // Execute the request
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
        // Increment the counter for the response status
        requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
        return httpResponse;
    } catch (Exception e) {
        requestMetrics.connectionErrors.increment();
        exceptionsMetric.count(e);
        throw e;
    } finally {
        stopwatch.stop();
    }
}
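The same decorator idea can be sketched with JDK classes only: time every call, count successful responses by status, and count exceptions separately. MetricsSketch and its fields are hypothetical names, and it counts by exact status code, whereas the listing above groups statuses through mappedStatus and reports through Servo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntSupplier;

// Hypothetical metrics wrapper for illustration; not part of Eureka.
final class MetricsSketch {
    final Map<Integer, LongAdder> countersByStatus = new ConcurrentHashMap<>();
    final LongAdder connectionErrors = new LongAdder();
    final LongAdder totalLatencyNanos = new LongAdder();

    // Runs the request and returns its status code. Latency is always recorded;
    // a response increments its status counter, an exception increments the error counter.
    int execute(IntSupplier request) {
        long start = System.nanoTime();
        try {
            int status = request.getAsInt();
            countersByStatus.computeIfAbsent(status, s -> new LongAdder()).increment();
            return status;
        } catch (RuntimeException e) {
            connectionErrors.increment();
            throw e;
        } finally {
            totalLatencyNanos.add(System.nanoTime() - start);
        }
    }
}
```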

RedirectingEurekaHttpClient is an EurekaHttpClient that locates a Eureka-Server which does not respond with a 302 redirect. Its execute method:

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    EurekaHttpClient currentEurekaClient = delegateRef.get();
    if (currentEurekaClient == null) { // no non-302 Eureka-Server has been found yet
        AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
        try {
            EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
            // Shut down the previous delegate and keep the client whose request succeeded without a 302
            TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
            return response;
        } catch (Exception e) {
            logger.error("Request execution error", e);
            TransportUtils.shutdown(currentEurekaClientRef.get());
            throw e;
        }
    } else { // a non-302 Eureka-Server has already been found
        try {
            return requestExecutor.execute(currentEurekaClient);
        } catch (Exception e) {
            logger.error("Request execution error", e);
            delegateRef.compareAndSet(currentEurekaClient, null);
            currentEurekaClient.shutdown();
            throw e;
        }
    }
}
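The listing above does not show executeOnNewServer. As a sketch of what "finding a non-302 server" can look like, the code below follows the redirect target of each 302 response until a server answers directly or a limit is reached. Everything here is an assumption for illustration: the RedirectSketch class, the Response holder, the send function, and the maxRedirects parameter are hypothetical and not taken from Eureka.

```java
import java.util.function.Function;

// Hypothetical redirect follower for illustration; not part of Eureka.
final class RedirectSketch {
    private RedirectSketch() {}

    // Minimal stand-in for an HTTP response: status code plus optional redirect target.
    static final class Response {
        final int status;
        final String location;

        Response(int status, String location) {
            this.status = status;
            this.location = location;
        }
    }

    // Returns the first URL that answers with something other than 302,
    // following at most maxRedirects redirects.
    static String resolve(String startUrl, Function<String, Response> send, int maxRedirects) {
        String url = startUrl;
        for (int i = 0; i <= maxRedirects; i++) {
            Response response = send.apply(url);
            if (response.status != 302) {
                return url;
            }
            if (response.location == null) {
                throw new IllegalStateException("302 without a redirect target from " + url);
            }
            url = response.location;
        }
        throw new IllegalStateException("Too many redirects starting at " + startUrl);
    }
}
```

Once such a server is found, caching it (as delegateRef does above) avoids repeating the redirect chain on every request.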

RetryableEurekaHttpClient is an EurekaHttpClient that retries a request against multiple Eureka-Servers. Its execute method:

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;

        // There is no current delegate EurekaHttpClient
        if (currentHttpClient == null) {
            // Get the list of candidate Eureka-Server endpoints
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }

            // All candidate Eureka-Server endpoints have been tried
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }

            // Create an EurekaHttpClient for the next candidate
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            // Execute the request
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // If the response is acceptable, keep this client as the delegate and return
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // The request failed; if the delegate is still currentHttpClient, clear it
        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);

        // The request failed; add currentEndpoint to the quarantine set
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
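The retry loop can be reduced to a small, self-contained sketch: walk the candidate list in order, stop at the first acceptable response, and quarantine every endpoint that failed. RetrySketch is a hypothetical class, endpoints are plain strings, and the rule "any status below 500 is acceptable" is an assumption standing in for serverStatusEvaluator. The cached delegate from the listing above is left out.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical retry helper for illustration; not part of Eureka.
final class RetrySketch {
    final Set<String> quarantine = new LinkedHashSet<>();

    // Tries candidates in order for at most numberOfRetries attempts and
    // returns the endpoint that produced an acceptable response.
    String execute(List<String> candidates, ToIntFunction<String> send, int numberOfRetries) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("cluster server list is empty");
        }
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            if (endpointIdx >= candidates.size()) {
                throw new IllegalStateException("Cannot execute request on any known server");
            }
            String endpoint = candidates.get(endpointIdx++);
            try {
                int status = send.applyAsInt(endpoint);
                if (status < 500) { // assumption: 5xx means "retry on another server"
                    return endpoint;
                }
            } catch (RuntimeException e) {
                // Connection error: fall through and try the next candidate.
            }
            quarantine.add(endpoint);
        }
        throw new IllegalStateException("Retry limit reached; giving up on completing the request");
    }
}
```

Keeping the failed endpoints in a set lets later candidate selection skip servers that were recently unreachable.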

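SessionedEurekaHttpClient is an EurekaHttpClient with session support. It rebuilds the session periodically so that a Eureka-Client does not stay connected to one particular Eureka-Server forever. This in turn keeps Eureka-Client connections balanced across Eureka-Servers when the cluster membership changes. Its execute method: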

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    long now = System.currentTimeMillis();
    long delay = now - lastReconnectTimeStamp;

    // The current session has expired: shut down the current delegate EurekaHttpClient
    if (delay >= currentSessionDurationMs) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = now;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }

    // Get the delegate EurekaHttpClient, creating a new one if none exists
    EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
    if (eurekaHttpClient == null) {
        eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    return requestExecutor.execute(eurekaHttpClient);
}
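The session rotation can be sketched independently of Eureka with an injectable clock, which makes the expiry easy to exercise. SessionSketch is a hypothetical class. It uses a fixed session duration, whereas the listing above randomizes it through randomizeSessionDuration (not shown in this article), and it only drops the old client where the real code shuts it down.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical session holder for illustration; not part of Eureka.
final class SessionSketch<C> {
    private final Supplier<C> factory;
    private final LongSupplier clock;
    private final long sessionDurationMs;
    private long lastReconnect;
    private C current;

    SessionSketch(Supplier<C> factory, LongSupplier clock, long sessionDurationMs) {
        this.factory = factory;
        this.clock = clock;
        this.sessionDurationMs = sessionDurationMs;
        this.lastReconnect = clock.getAsLong();
    }

    // Returns the client for the current session, replacing it once the session has expired.
    synchronized C client() {
        long now = clock.getAsLong();
        if (now - lastReconnect >= sessionDurationMs) {
            lastReconnect = now;
            current = null; // the real code also shuts the old client down
        }
        if (current == null) {
            current = factory.get();
        }
        return current;
    }
}
```

Because a new client is created after each expiry, the next request is free to land on a different server, which is what spreads clients across the cluster over time.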

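On the Eureka-Server side, calling the static method createReplicationClient of JerseyReplicationClient creates the client used for communication from one Eureka-Server to the other Eureka-Servers within a cluster.
On the Eureka-Client side, there are two separate clients: one for registering application instances (registrationClient) and one for querying registry information (queryClient). Both are created when DiscoveryClient is initialized: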

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                        AbstractDiscoveryClientOptionalArgs args) {

    Collection<?> additionalFilters = args == null
            ? Collections.emptyList()
            : args.additionalFilters;

    EurekaJerseyClient providedJerseyClient = args == null
            ? null
            : args.eurekaJerseyClient;
    
    TransportClientFactories argsTransportClientFactories = null;
    if (args != null && args.getTransportClientFactories() != null) {
        argsTransportClientFactories = args.getTransportClientFactories();
    }
    
    // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
    @SuppressWarnings("rawtypes")
    TransportClientFactories transportClientFactories = argsTransportClientFactories == null
            ? new Jersey1TransportClientFactories()
            : argsTransportClientFactories;

    // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
    // noinspection unchecked
    eurekaTransport.transportClientFactory = providedJerseyClient == null
            ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
            : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

    // Initialize the applications source used by the applications resolver. TODO[0028]: write cluster and read cluster
    ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
        @Override
        public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
            long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
            long delay = getLastSuccessfulRegistryFetchTimePeriod();
            if (delay > thresholdInMs) {
                logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                        thresholdInMs, delay);
                return null;
            } else {
                return localRegionApps.get();
            }
        }
    };

    // Create the endpoint resolver
    eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
            clientConfig,
            transportConfig,
            eurekaTransport.transportClientFactory,
            applicationInfoManager.getInfo(),
            applicationsSource
    );

    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }

    // new method (resolve from primary servers for read)
    // Configure new transport layer (candidate for injecting in the future)
    if (clientConfig.shouldFetchRegistry()) {
        EurekaHttpClientFactory newQueryClientFactory = null;
        EurekaHttpClient newQueryClient = null;
        try {
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    clientConfig,
                    transportConfig,
                    applicationInfoManager.getInfo(),
                    applicationsSource
            );
            newQueryClient = newQueryClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.queryClientFactory = newQueryClientFactory;
        eurekaTransport.queryClient = newQueryClient;
    }
}
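The staleness check inside ApplicationsSource is worth isolating: the threshold arrives in an arbitrary TimeUnit, is converted to milliseconds, and the cached registry is handed out only if its age does not exceed that threshold. The sketch below reproduces just that rule. StalenessSketch and freshOrNull are hypothetical names, and the cached value is generic rather than an Applications object.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of Eureka.
final class StalenessSketch {
    private StalenessSketch() {}

    // Returns the cached value if it is no older than the threshold, otherwise null.
    static <T> T freshOrNull(T cached, long ageMs, int stalenessThreshold, TimeUnit timeUnit) {
        long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
        return ageMs > thresholdInMs ? null : cached;
    }
}
```

With a threshold of 5 minutes, an age of exactly 300000 ms is still accepted because the comparison is strict.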

That concludes this look at Eureka's network communication.
