Eureka Source Code Analysis (12): Network Communication
This part looks at Eureka's network communication, which falls into two categories:
Requests from Eureka-Client to Eureka-Server
Requests between Eureka-Server nodes inside a Eureka-Server cluster
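EurekaJerseyClient: the Jersey client interface that the EurekaHttpClient implementations build on. It is essentially a Jersey Client backed by Apache HttpClient 4. Jersey itself is not covered here.
EurekaJerseyClientImpl: the implementation class of EurekaJerseyClient. Its constructor looks like this: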
public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
                              ClientConfig clientConfig) {
    try {
        jerseyClientConfig = clientConfig;
        // Create the ApacheHttpClient4
        apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig);
        // Set the connection parameters (connect timeout and socket read timeout)
        HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams();
        HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
        HttpConnectionParams.setSoTimeout(params, readTimeout);
        // Create the ApacheHttpClientConnectionCleaner, which cleans up idle connections
        this.apacheHttpClientConnectionCleaner = new ApacheHttpClientConnectionCleaner(apacheHttpClient, connectionIdleTimeout);
    } catch (Throwable e) {
        throw new RuntimeException("Cannot create Jersey client", e);
    }
}
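EurekaJerseyClientBuilder: an inner class of EurekaJerseyClientImpl used to create EurekaJerseyClientImpl instances; calling its build method returns a new EurekaJerseyClientImpl.
EurekaHttpClient: the HTTP client for accessing Eureka-Server. It declares the concrete Eureka-Server API methods.
EurekaHttpResponse: the response object for a request.
TransportClientFactory: the factory interface for creating EurekaHttpClient instances.
AbstractJerseyEurekaHttpClient: an abstract class implementing EurekaHttpClient. This is where the Eureka-Server API calls are actually implemented. Taking register as an example: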
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    // Build the request path
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        // Add extra request headers
        addExtraHeaders(resourceBuilder);
        // Send the request to Eureka-Server
        response = resourceBuilder
                .header("Accept-Encoding", "gzip") // accept GZIP-compressed responses
                .type(MediaType.APPLICATION_JSON_TYPE) // request body format: JSON
                .accept(MediaType.APPLICATION_JSON) // response format: JSON
                .post(ClientResponse.class, info); // request body
        // Build the EurekaHttpResponse
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
JerseyApplicationClient: implements the network communication from Eureka-Client to Eureka-Server.
JerseyEurekaHttpClientFactory: the factory class that creates JerseyApplicationClient instances.
JerseyEurekaHttpClientFactoryBuilder: an inner class of JerseyEurekaHttpClientFactory used to create JerseyEurekaHttpClientFactory instances.
JerseyReplicationClient: implements the network communication between Eureka-Server nodes inside a Eureka-Server cluster.
It implements the addExtraHeaders method of AbstractJerseyEurekaHttpClient to add the custom header x-netflix-discovery-replication=true:
protected void addExtraHeaders(Builder webResource) {
    webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}
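The hook above is a template-method pattern: the base class builds the request and lets subclasses contribute headers. The following is a minimal sketch of that idea using only the JDK. BaseClient, PlainClient and ReplicationClient are hypothetical stand-ins, not Eureka classes; only the header name and value come from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExtraHeadersSketch {
    /** Hypothetical stand-in for the abstract base client. */
    abstract static class BaseClient {
        /** Builds the headers for a request, giving subclasses a chance to add their own. */
        Map<String, String> buildHeaders() {
            Map<String, String> headers = new LinkedHashMap<>();
            addExtraHeaders(headers);               // subclass hook
            headers.put("Accept-Encoding", "gzip"); // common header set by the base class
            return headers;
        }

        protected abstract void addExtraHeaders(Map<String, String> headers);
    }

    /** Hypothetical client that contributes no extra headers. */
    static class PlainClient extends BaseClient {
        @Override
        protected void addExtraHeaders(Map<String, String> headers) {
            // nothing to add
        }
    }

    /** Hypothetical replication client that marks its requests as replication traffic. */
    static class ReplicationClient extends BaseClient {
        static final String HEADER_REPLICATION = "x-netflix-discovery-replication";

        @Override
        protected void addExtraHeaders(Map<String, String> headers) {
            headers.put(HEADER_REPLICATION, "true");
        }
    }
}
```

With this shape, the receiving side only needs to check for the replication header to tell peer-to-peer traffic from ordinary client requests.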
It also implements the HttpReplicationClient interface, including the submitBatchUpdates method:
public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
    ClientResponse response = null;
    try {
        response = jerseyApacheClient.resource(serviceUrl)
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .post(ClientResponse.class, replicationList);
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
MetricsCollectingEurekaHttpClient: an EurekaHttpClient that collects monitoring metrics, working together with Netflix Servo. The implementation:
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    // Look up the metrics for this request type
    EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
    Stopwatch stopwatch = requestMetrics.latencyTimer.start();
    try {
        // Execute the request on the delegate
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
        // Increment the counter for the response status
        requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
        return httpResponse;
    } catch (Exception e) {
        requestMetrics.connectionErrors.increment();
        exceptionsMetric.count(e);
        throw e;
    } finally {
        stopwatch.stop();
    }
}
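The method above is a decorator: it wraps the delegate call with a timer, a per-status counter and an error counter. Below is a minimal JDK-only sketch of that pattern. It replaces Servo with plain AtomicLong counters and counts raw status codes instead of the mapped status used by mappedStatus, so all names here are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;

class MetricsDecoratorSketch {
    final Map<Integer, AtomicLong> countersByStatus = new ConcurrentHashMap<>();
    final AtomicLong connectionErrors = new AtomicLong();
    final AtomicLong totalLatencyNanos = new AtomicLong();

    /** Runs the request (which returns an HTTP status code) and records metrics around it. */
    int execute(IntSupplier request) {
        long start = System.nanoTime();
        try {
            int status = request.getAsInt();
            // One counter per status code, created on first use
            countersByStatus.computeIfAbsent(status, s -> new AtomicLong()).incrementAndGet();
            return status;
        } catch (RuntimeException e) {
            // A thrown exception is treated as a connection-level error
            connectionErrors.incrementAndGet();
            throw e;
        } finally {
            // Latency is recorded for successes and failures alike
            totalLatencyNanos.addAndGet(System.nanoTime() - start);
        }
    }
}
```

As in the original, the exception is rethrown after being counted, so the decorator stays transparent to callers further up the chain.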
RedirectingEurekaHttpClient: an EurekaHttpClient that looks for a Eureka-Server that does not answer with a 302 redirect, then keeps using it. The implementation:
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    EurekaHttpClient currentEurekaClient = delegateRef.get();
    if (currentEurekaClient == null) { // no non-302 Eureka-Server has been found yet
        AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
        try {
            EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
            // Shut down the previous delegate and install the EurekaHttpClient that just completed a non-302 request
            TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
            return response;
        } catch (Exception e) {
            logger.error("Request execution error", e);
            TransportUtils.shutdown(currentEurekaClientRef.get());
            throw e;
        }
    } else { // a non-302 Eureka-Server has already been found
        try {
            return requestExecutor.execute(currentEurekaClient);
        } catch (Exception e) {
            logger.error("Request execution error", e);
            delegateRef.compareAndSet(currentEurekaClient, null);
            currentEurekaClient.shutdown();
            throw e;
        }
    }
}
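The listing above delegates the actual redirect handling to executeOnNewServer, which is not shown. As a rough illustration of what "finding a non-302 server" can look like, here is a JDK-only sketch that follows Location targets up to a fixed limit. The Response type, the resolve method and the limit of 10 are assumptions for this example and are not taken from the code above.

```java
import java.util.function.Function;

class RedirectSketch {
    /** Assumed upper bound on followed redirects, to avoid redirect loops. */
    static final int MAX_FOLLOWED_REDIRECTS = 10;

    /** Minimal response model: a status code plus an optional Location target. */
    static final class Response {
        final int status;
        final String location;

        Response(int status, String location) {
            this.status = status;
            this.location = location;
        }
    }

    /** Follows 302 responses from startUrl and returns the first URL that answers with another status. */
    static String resolve(String startUrl, Function<String, Response> send) {
        String url = startUrl;
        for (int followed = 0; followed < MAX_FOLLOWED_REDIRECTS; followed++) {
            Response response = send.apply(url);
            if (response.status != 302) {
                return url; // this server handled the request itself
            }
            if (response.location == null) {
                throw new IllegalStateException("302 without Location from " + url);
            }
            url = response.location; // try the redirect target next
        }
        throw new IllegalStateException("Followed " + MAX_FOLLOWED_REDIRECTS + " redirects; giving up");
    }
}
```

Once a server has been resolved this way, caching it (as delegateRef does above) avoids paying the redirect cost on every request.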
RetryableEurekaHttpClient: an EurekaHttpClient that can retry a request against multiple Eureka-Servers. The implementation:
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        // There is no current delegate EurekaHttpClient
        if (currentHttpClient == null) {
            // Get the list of candidate Eureka-Server endpoints
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            // All candidate Eureka-Server endpoints have been tried
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }
            // Create an EurekaHttpClient for the next candidate
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }
        try {
            // Execute the request
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // If the response is acceptable, keep this client as the delegate and return
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
        }
        // The request failed: if currentHttpClient is the current delegate, clear the delegate
        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);
        // The request failed: add currentEndpoint to the quarantine set
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
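The control flow above (stick to a working server, otherwise walk the candidate list and quarantine failures) can be condensed into a small JDK-only sketch. Endpoints are modeled as plain strings, and "status below 500" stands in for serverStatusEvaluator; both are assumptions for this example, as are the class and field names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

class RetrySketch {
    final Set<String> quarantine = new HashSet<>();
    String stickyEndpoint; // last endpoint that answered acceptably, or null
    private final List<String> endpoints;
    private final int numberOfRetries;

    RetrySketch(List<String> endpoints, int numberOfRetries) {
        this.endpoints = new ArrayList<>(endpoints);
        this.numberOfRetries = numberOfRetries;
    }

    /** Sends the request (endpoint -> status code), retrying on other endpoints after failures. */
    int execute(ToIntFunction<String> request) {
        int idx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            String current = stickyEndpoint;
            boolean fromCandidates = false;
            if (current == null) {
                if (endpoints.isEmpty()) {
                    throw new IllegalStateException("server list is empty");
                }
                if (idx >= endpoints.size()) {
                    throw new IllegalStateException("cannot execute request on any known server");
                }
                current = endpoints.get(idx++);
                fromCandidates = true;
            }
            try {
                int status = request.applyAsInt(current);
                if (status < 500) {           // assumed acceptance rule
                    stickyEndpoint = current; // reuse this endpoint next time
                    return status;
                }
            } catch (RuntimeException e) {
                // connection-level failure: fall through and try another endpoint
            }
            stickyEndpoint = null;
            if (fromCandidates) {
                quarantine.add(current); // only freshly picked candidates are quarantined, as above
            }
        }
        throw new IllegalStateException("retry limit reached");
    }
}
```

For example, with endpoints a, b, c where only c is healthy, a single call with three retries ends up quarantining a and b and sticking to c for the following requests.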
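SessionedEurekaHttpClient: a session-aware EurekaHttpClient. It periodically rebuilds the session so that a Eureka-Client does not stay connected to one particular Eureka-Server forever. In turn, this keeps Eureka-Client connections load-balanced across the Eureka-Servers when the cluster membership changes. The implementation: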
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    long now = System.currentTimeMillis();
    long delay = now - lastReconnectTimeStamp;
    // The current session has expired: shut down the current delegate EurekaHttpClient
    if (delay >= currentSessionDurationMs) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = now;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }
    // Get the delegate EurekaHttpClient, creating a new one if none exists
    EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
    if (eurekaHttpClient == null) {
        eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    return requestExecutor.execute(eurekaHttpClient);
}
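The body of randomizeSessionDuration is not shown above. One plausible way to randomize a session length is to add jitter around the configured value so that many clients do not all reconnect at the same moment. The sketch below assumes a jitter of plus or minus 50% and takes the clock as a parameter to keep it testable; the class, its fields and the jitter range are assumptions, not the original code.

```java
import java.util.Random;

class SessionSketch {
    private final long sessionDurationMs;
    private final Random random;
    long currentSessionDurationMs;
    long lastReconnectTimeStamp;
    int reconnects;

    SessionSketch(long sessionDurationMs, Random random, long now) {
        this.sessionDurationMs = sessionDurationMs;
        this.random = random;
        this.lastReconnectTimeStamp = now;
        this.currentSessionDurationMs = randomize(sessionDurationMs);
    }

    /** Assumed jitter: a value in [0.5 * base, 1.5 * base). */
    long randomize(long base) {
        long delta = (long) (base * (random.nextDouble() - 0.5));
        return base + delta;
    }

    /** Returns true when the request at time now ended the old session and started a new one. */
    boolean onRequest(long now) {
        long delay = now - lastReconnectTimeStamp;
        if (delay >= currentSessionDurationMs) {
            lastReconnectTimeStamp = now;
            currentSessionDurationMs = randomize(sessionDurationMs);
            reconnects++;
            return true;
        }
        return false;
    }
}
```

Picking a fresh random duration at every reconnect, as the original does with currentSessionDurationMs, keeps clients from drifting back into lockstep over time.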
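On the Eureka-Server side, calling the static method createReplicationClient of JerseyReplicationClient creates the network client that a Eureka-Server uses to talk to the other Eureka-Servers in the cluster.
On the Eureka-Client side, there are two separate network clients: one for registering application instances (registrationClient) and one for querying registry information (queryClient). Both are created when DiscoveryClient is initialized: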
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                        AbstractDiscoveryClientOptionalArgs args) {
    Collection<?> additionalFilters = args == null
            ? Collections.emptyList()
            : args.additionalFilters;
    EurekaJerseyClient providedJerseyClient = args == null
            ? null
            : args.eurekaJerseyClient;
    TransportClientFactories argsTransportClientFactories = null;
    if (args != null && args.getTransportClientFactories() != null) {
        argsTransportClientFactories = args.getTransportClientFactories();
    }
    // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
    @SuppressWarnings("rawtypes")
    TransportClientFactories transportClientFactories = argsTransportClientFactories == null
            ? new Jersey1TransportClientFactories()
            : argsTransportClientFactories;
    // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
    // noinspection unchecked
    eurekaTransport.transportClientFactory = providedJerseyClient == null
            ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
            : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
    // Initialize the application-instance data source for the applications resolver (TODO[0028]: write cluster and read cluster)
    ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
        @Override
        public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
            long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
            long delay = getLastSuccessfulRegistryFetchTimePeriod();
            if (delay > thresholdInMs) {
                logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                        thresholdInMs, delay);
                return null;
            } else {
                return localRegionApps.get();
            }
        }
    };
    // Create the endpoint resolver
    eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
            clientConfig,
            transportConfig,
            eurekaTransport.transportClientFactory,
            applicationInfoManager.getInfo(),
            applicationsSource
    );
    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }
    // new method (resolve from primary servers for read)
    // Configure new transport layer (candidate for injecting in the future)
    if (clientConfig.shouldFetchRegistry()) {
        EurekaHttpClientFactory newQueryClientFactory = null;
        EurekaHttpClient newQueryClient = null;
        try {
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    clientConfig,
                    transportConfig,
                    applicationInfoManager.getInfo(),
                    applicationsSource
            );
            newQueryClient = newQueryClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.queryClientFactory = newQueryClientFactory;
        eurekaTransport.queryClient = newQueryClient;
    }
}
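The anonymous ApplicationsSource in the listing above only hands out the locally cached registry when the last successful fetch is recent enough. That staleness check can be isolated into a few lines of plain Java. In this sketch, StalenessSketch and getIfFresh are hypothetical names, and the cached value and the elapsed time are passed in as parameters rather than read from DiscoveryClient.

```java
import java.util.concurrent.TimeUnit;

class StalenessSketch {
    /**
     * Returns the cached value, or null when the last successful fetch
     * happened longer ago than the given staleness threshold.
     */
    static <T> T getIfFresh(T cached, long msSinceLastFetch, int stalenessThreshold, TimeUnit unit) {
        // Normalize the threshold to milliseconds before comparing
        long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, unit);
        return msSinceLastFetch > thresholdInMs ? null : cached;
    }
}
```

Returning null rather than stale data lets the caller fall back to another way of resolving server endpoints.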
That concludes this overview of Eureka's network communication.