微服务注册及发现
2019-12-11 本文已影响0人
minute_5
微服务注册及发现
Netflix官方对于Eureka的架构图
eurekaArchitecture.png
这里简单解释一下这个图:
- 对于Eureka来说主要有两个角色:server, client
- client又分为两种
- service -- 提供服务
- client -- 消费服务
- 一个client既可以是消费者又可以是生产者
- 每个server之间组成集群 -- server之间进行相互注册
- 每个client向server注册,将自己的一些客户端信息发送Server,并通过心跳机制维持上线状态
- 图中的每一块区域(集群)都需要一个server提供注册服务
- client从server上拉取信息,缓存这些信息,然后根据这些信息去调用其他client进行消费
- 这里先说说 Eureka 的自我保护模式:
- 当一个新的Eureka Server出现时,它尝试从相邻节点获取所有实例注册表信息。如果获取信息出现问题,Eureka Serve会尝试其他的节点。如果服务器能够成功获取所有实例,则根据该信息设置应该接收的更新阈值。如果有任何时间,Eureka Serve接收到的续约低于为该值配置的百分比(默认为15分钟内低于85%),则服务器开启自我保护模式,即不再剔除注册列表的信息。这样做的好处就是,如果是Eureka Server自身的网络问题,导致Eureka Client的续约不上,Eureka Client的注册列表信息不再被删除,也就是Eureka Client还可以被其他服务消费。
源码解析
如何进行服务注册? -- register
Client
- 启动后拉去服务端信息
- 开启心跳机制
- 发送自身信息到server注册
private void initScheduledTasks() {
// 使用任务调度获取注册表信息
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 注册client到server
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// 开启心跳
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 注册client到server上
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
···省略代码
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
// 细讲讲
// InstanceInfoReplicator 实现 Runnable
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
// 注册方法
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
// 通过http请求向Server注册
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
// 服务端提供的注册接口
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
···省略代码···
// 这个register下面再说
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
server
- 启动后拉去服务端信息
- 开启心跳机制
- 发送自身信息到server注册
// BootStrapContext是eureka server的初始启动类,其中调用initEurekaServerContext
protected void initEurekaServerContext() throws Exception {
···省略···
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
...
} else {
// 服务注册
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
// 服务高可用
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
}
// PeerAwareInstanceRegistryImpl
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 通过父类AbstractInstanceRegistry进行注册
// 其将注册列表信息放到Map<String, Lease<InstanceInfo>> gMap中
// 修改instance状态
super.register(info, leaseDuration, isReplication);
// 同步到其他Eureka Server的其他Peers节点,遍历循环向所有的Peers节点注册
// 该方法通过执行一个任务向其他节点同步该注册信息
// 最终执行类PeerEurekaNodes的register()方法
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
// PeerEurekaNode
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
// 最终通过http注册
return replicationClient.register(info);
}
},
expiryTime
);
}
如何进行服务续约? -- reNew
Client
上面提到了开启心跳线程 HeartbeatThread
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
server
// 服务端刷新心跳接口
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 这里的renew就和上面服务端的register大同小异了
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
·······
}
服务信息获取、下线、剔除都和上面两种方式都类似了
参考
https://github.com/Netflix/eureka/wiki
https://cloud.spring.io/spring-cloud-netflix/reference/html/
https://www.fangzhipeng.com/springcloud/2017/08/11/eureka-resources.html