Spring cloud euraka

2020-07-24  本文已影响0人  奋斗的韭菜汪

常用的服务注册中心:euraka、Nacos、Zookeeper、Etcd、Consul
比较注册中心差异从这些点出发:
1、服务动态感知实现方式(push/pull),2、存储(是否支持持久化),3、高可用机制(集群特性:是否支持选主、一致性问题),4cap特性(cp、ap)
,5api的提供形式(http协议、netty通信)
延伸:cap解释
c(强一致性)
a(可用性)
p(分区容器性)(要满足这一点必须是集群节点/跨区域高可用)
为什么只能满足cp或者ap
在满足p的情况下,例如集群节点,比如一个请求,为了实现强一致性,则必须让请求等待,等待情况下,则不满足可用性,反之同理说明ap。
Euraka:非持久化存储基于内存,(ap)最终一致性、集群节点角色是一样的
Nacos:一致性raft算法(redis-sentinael/nacos选举)long-poll
Zookeeper:zab协议 push
Consul:一致性raft算法(redis-sentinael/nacos选举)、long-poll
Etcd:一致性raft算法(redis-sentinael/nacos选举)long-poll
服务注册中心的作用:1、服务上下线动态感知(根据心跳的结果剔除服务) 2、服务调用维护方便
消费者服务获取euraka服务列表的方式:euraka注册中心push主动推送,pull定时轮询拉取(存在延迟)
延伸:long-pull(长轮询):客户端发送请求到服务端,服务端hold住请求,等待,直到需要响应时才返回响应

Eureka Server
Eureka Client
Eureka自我保护机制
心跳失败的比例15分钟之内,高于15%的节点,Eureka任务这个服务出现了故障,直接剔除这个问题服务;
这样做的好处:避免网络抖动或者网络不稳定的情况下,误删了服务
Eureka自我保护机制的实现原理
AbstractInstanceRegistry
protected volatile int numberOfRenewsPerMinThreshold; //每一分钟最小的续约数量
protected volatile int expectedNumberOfClientsSendingRenews; //预期每分钟收到的续约的客户端数量

配置eureka心跳失败服务不可用评判标准(默认为0.85)

eureka.server.renewal-percent-threshold: 0.5

源码分析的核心功能
1、Eureka Server如何接收请求?
server端必须实现监听,可以是http+容器(tomcat,jobss)的实现方式,也可以是socket(NIO,Netty)的方式
Eureka Server服务访问的类ApplicationsResource/ApplicationResource(类似于controller角色),使用的jersey框架
Jersey是一个RESTFUL请求服务JAVA框架
2、Eureka Client如何注册?
核心方法EurekaServiceRegistry->register
spring的SmartLifecycle接口,实现这个接口重写start方法(可以在spring容器启动后执行start方法内容)
DisCoveryClient.register -> AbstractJerseyEurekaHttpClient.register

    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

客户端发送的请求最终会到eureka服务端 ApplicationResource.addInstance内,新增注册实例
3、Eureka Server如何存储服务地址?
请求到达服务端ApplicationResource.addInstance内,会调用

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
       //先注册到一个服务节点
        super.register(info, leaseDuration, isReplication);
        //再复制到其他节点完成完成注册
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

调用父类的register方法中可以看到服务实例信息InstanceInfo会缓存在ConcurrentHashMap中
Eureka三级缓存(实现读写分离,解决竞争提高性能)


image.png

服务端节点间地址复制(通过调用node节点的register完成复制)

    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        } finally {
            CurrentRequestVersion.remove();
        }
    }

4、Eureka Client如何查询地址?
(1)DiscoveryClient构造方法中会触发
(2)每三十秒执行一次的定时任务DiscoveryClient中cacheRefreshTask
cacheRefreshTask中的衰减任务实现:第一次是30s执行,若失败,第二次60s重试,若再失败180秒重试

        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

TimedSupervisorTask

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            //延时时间重计算
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }

            if (!scheduler.isShutdown()) {
               //执行定时任务
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读