微服务注册及发现

2019-12-11  本文已影响0人  minute_5

微服务注册及发现

Netflix官方对于Eureka的架构图


eurekaArchitecture.png

这里简单解释一下这个图:

  1. 对于Eureka来说主要有两个角色:server, client
  2. client又分为两种
    1. service -- 提供服务
    2. client -- 消费服务
    3. 一个client既可以是消费者又可以是生产者
  3. 每个server之间组成集群 -- server之间进行相互注册
  4. 每个client向server注册,将自己的一些客户端信息发送给server,并通过心跳机制维持上线状态
  5. 图中的每一块区域(集群)都需要一个server提供注册服务
  6. client从server上拉取信息,缓存这些信息,然后根据这些信息去调用其他client进行消费

源码解析

如何进行服务注册? -- register

Client

  1. 启动后拉取服务端信息
  2. 开启心跳机制
  3. 发送自身信息到server注册
DiscoveryClient关联类图
    /**
     * Wires up the client's background work on the shared {@code scheduler}:
     * the registry cache refresh task, the heartbeat task, and the
     * instance-info replicator. The cache refresh is guarded by
     * {@code clientConfig.shouldFetchRegistry()}; the heartbeat and the
     * replicator are guarded by {@code clientConfig.shouldRegisterWithEureka()}.
     * Part of the original method is elided in this excerpt.
     */
    private void initScheduledTasks() {
        // Use a scheduled task to fetch the registry information
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // The first run is delayed by one fetch interval; the supervisor task
            // wraps CacheRefreshThread (presumably re-arming itself - confirm in TimedSupervisorTask).
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        
        // Register this client with the server
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Start the heartbeat
            // Heartbeat timer
            // The first heartbeat is delayed by one renewal interval.
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            // This is what registers the client with the server
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            ···省略代码
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    
    // A closer look at the replicator:
    // InstanceInfoReplicator implements Runnable, and this is its run() method.
    // One pass refreshes the instance info, registers it when
    // isDirtyWithTime() returned a timestamp, then schedules the next pass.
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            final Long dirtySince = instanceInfo.isDirtyWithTime();
            if (dirtySince != null) {
                discoveryClient.register();
                // Clear the dirty flag using the timestamp captured before registering.
                instanceInfo.unsetIsDirty(dirtySince);
            }
        } catch (Throwable failure) {
            logger.warn("There was a problem with the instance info replicator", failure);
        } finally {
            // Always schedule the next pass, whether or not this one failed.
            Future upcoming = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(upcoming);
        }
    }

    // Registration method.
    // Sends this instance's info to the server through the registration client
    // and returns true only when the response status equals NO_CONTENT (204).
    // Any exception from the call is logged and rethrown to the caller.
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        final EurekaHttpResponse<Void> registrationResponse;
        try {
            // Register with the server via an HTTP request
            registrationResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception registrationFailure) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, registrationFailure.getMessage(), registrationFailure);
            throw registrationFailure;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, registrationResponse.getStatusCode());
        }
        final boolean accepted = registrationResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        return accepted;
    }
    // Registration endpoint exposed by the server.
    // Handles POST requests with a JSON or XML body, passes the instance to
    // registry.register, and answers with HTTP 204.
    // Part of the original method body is elided in this excerpt.
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {    
        ···省略代码···
        // registry.register is covered further down in the article.
        // "true".equals(...) is null-safe: a missing replication header counts as false.
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

server

  1. 启动后拉取服务端信息
  2. 开启心跳机制
  3. 发送自身信息到server注册
    // BootStrapContext is the initial startup class of the eureka server; it calls initEurekaServerContext.
    // NOTE(review): the upstream class may actually be named EurekaBootStrap - confirm against the Eureka source.
    // This excerpt shows only the registry construction and the peer-node setup;
    // the rest of the method is elided.
    protected void initEurekaServerContext() throws Exception {
    ···省略···
    PeerAwareInstanceRegistry registry;
            if (isAws(applicationInfoManager.getInfo())) {
            ...
            } else {
                // Service registration: the registry used when not running on AWS
                registry = new PeerAwareInstanceRegistryImpl(
                        eurekaServerConfig,
                        eurekaClient.getEurekaClientConfig(),
                        serverCodecs,
                        eurekaClient
                );
            }

            // Service high availability: build the set of peer Eureka nodes for this registry
            PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                    registry,
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    applicationInfoManager
            );
    }

    // PeerAwareInstanceRegistryImpl
    // Registers the instance locally with a lease duration, then replicates
    // the registration to the peer nodes.
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // Use the duration carried by the instance only when it is present and positive;
        // otherwise fall back to the default lease duration.
        final boolean hasOwnDuration =
                info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
        final int leaseSeconds = hasOwnDuration
                ? info.getLeaseInfo().getDurationInSecs()
                : Lease.DEFAULT_DURATION_IN_SECS;

        // Register through the parent class AbstractInstanceRegistry.
        // According to the article's author, the parent stores the registration in a
        // Map<String, Lease<InstanceInfo>> (gMap) and updates the instance status.
        super.register(info, leaseSeconds, isReplication);

        // Synchronise to the other Eureka Server peers: per the article's author this
        // loops over all peer nodes and submits a task per node to replicate the
        // registration, presumably ending in PeerEurekaNode.register() (the author's
        // note names PeerEurekaNodes - confirm).
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

    // PeerEurekaNode
    // Queues a replication task on batchingDispatcher that registers the given
    // instance through replicationClient. The task is submitted with an expiry
    // time of "now + getLeaseRenewalOf(info)".
    public void register(final InstanceInfo info) throws Exception {
        // Expiry timestamp handed to the dispatcher along with the task
        // (presumably milliseconds, since it is added to currentTimeMillis() - confirm).
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        // Ultimately registers over HTTP
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

如何进行服务续约? -- renew

Client

上面提到了开启心跳线程 HeartbeatThread

    /**
     * Heartbeat task: calls {@code renew()} and, when it reports success,
     * records the current time as the last successful heartbeat.
     */
    private class HeartbeatThread implements Runnable {

        @Override
        public void run() {
            final boolean renewed = renew();
            if (!renewed) {
                return;
            }
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }

    /**
     * Renews the lease with the eureka service by sending a heartbeat.
     *
     * <p>When the server answers NOT_FOUND, the re-register counter is
     * incremented, the instance info is marked dirty, and the instance is
     * registered again; the dirty flag is cleared only if that succeeds.
     *
     * @return {@code true} if the heartbeat returned OK, or if the
     *         re-registration triggered by NOT_FOUND succeeded;
     *         {@code false} otherwise, including when any error was thrown
     */
    boolean renew() {
        try {
            final EurekaHttpResponse<InstanceInfo> heartbeatResponse =
                    eurekaTransport.registrationClient.sendHeartBeat(
                            instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, heartbeatResponse.getStatusCode());

            // Common case: the server knows this instance, so success is simply "status is OK".
            if (heartbeatResponse.getStatusCode() != Status.NOT_FOUND.getStatusCode()) {
                return heartbeatResponse.getStatusCode() == Status.OK.getStatusCode();
            }

            // NOT_FOUND: the server has no record of this instance, so register again.
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            final long dirtySince = instanceInfo.setIsDirtyWithTime();
            final boolean reRegistered = register();
            if (reRegistered) {
                instanceInfo.unsetIsDirty(dirtySince);
            }
            return reRegistered;
        } catch (Throwable heartbeatFailure) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, heartbeatFailure);
            return false;
        }
    }

server

    // Server-side endpoint that refreshes a heartbeat (lease renewal), handling PUT requests.
    // The rest of the method body is elided in this excerpt.
    // NOTE(review): app and id are not declared in this excerpt - presumably fields of the
    // enclosing resource class; confirm against the Eureka source.
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        // Null-safe comparison: a missing replication header counts as false.
        boolean isFromReplicaNode = "true".equals(isReplication);
        // The renew here works much like the server-side register shown earlier in the article
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        ·······
    }

服务信息获取、下线、剔除和上面两种方式都类似了

参考

https://github.com/Netflix/eureka/wiki

https://cloud.spring.io/spring-cloud-netflix/reference/html/

https://www.fangzhipeng.com/springcloud/2017/08/11/eureka-resources.html

上一篇下一篇

猜你喜欢

热点阅读