k8s环境下的spring boot应用的优雅启停

2022-11-29  本文已影响0人  天草二十六_简村人

一、背景

服务之前都是部署在ECS的虚拟机中,最近都转移部署到k8s容器,在发布的过程中,发现请求容易访问到不健康的节点。所以,这里需要梳理下我们的访问链路,找到问题,明确我们的目标。

问题是Pod是running状态,并不代表服务已健康,如果这时候就让请求访问进来,就会出现上面的问题。我们需告知pod健康的依据是什么,得等到pod是就绪状态了,才接受外部请求。

本文主要讲述k8s下,该怎么做到优雅停机和启动pod,末尾会总结下还遗留的未解决的问题。

二、目标

三、总体设计

3.1、服务调用关系

这里的网关,也可以是APISIX等,作为ingress的角色。

image.png

3.2、滚动发布流程

先启新版本的Pod,确保健康后,再停掉旧版本的Pod,以此类推。当然,发布过程中,会有新旧版本同时运行,所以称之为滚动更新。如果新版本的pod启动失败,则不会继续往后走,确保节点的正常服务能力。

image.png

四、Pod和容器

4.1、Pod的生命周期

pod状态.jpg

4.2、容器的状态

容器状态.jpg

4.3、Pod的探针Probe

Kubernetes为检查应用状态定义了三种探针,它们分别对应容器不同的状态:Startup(启动探针)、Liveness(存活探针)、Readiness(就绪探针)。

这三种探针是递进的关系:应用程序先启动,加载完配置文件等基本的初始化数据就进入了Startup状态,之后如果没有什么异常就是Liveness存活状态,但可能有一些准备工作没有完成,还不一定能对外提供服务,只有到最后的Readiness状态才是一个容器最健康可用的状态。

image.png

探测状态可以使用Shell、TCP Socket、HTTP Get三种方式,还可以调整探测的频率和超时时间等参数。

总结:我们在yml中使用探针的示例:

containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
          image: >-
            xxx/smarterclass:1.2.1
          imagePullPolicy: Always
          # Graceful shutdown: stop the process with a plain kill (SIGTERM) rather than kill -9.
          # do_stop.sh is a project-specific wrapper script.
          # NOTE(review): the hook sleeps 45s before calling do_stop.sh, but the default
          # terminationGracePeriodSeconds is 30s. Make sure the Pod spec (not shown here)
          # sets terminationGracePeriodSeconds well above 45, otherwise the container is
          # killed before do_stop.sh gets to run.
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - '-c'
                  - >-
                    wget http://127.0.0.1:54199/offline 2>/dev/null;sleep 45 &&
                    /opt/xxx/smarterclass/bin/do_stop.sh
          name: air-smarterclass
          ports:
            - containerPort: 9025
              protocol: TCP

          # Readiness: the Pod only receives traffic once /mgm/health answers successfully.
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /mgm/health
              port: 9025
              scheme: HTTP
            initialDelaySeconds: 1
            periodSeconds: 5
            successThreshold: 1
            timeoutSeconds: 3

          # Startup: succeeds once the pid file exists.
          startupProbe:
            # Number of consecutive failures before the probe is considered failed (default 3).
            failureThreshold: 3
            exec:
              command: ["cat", "/opt/xxx/smarterclass/pid"]
            # Interval between probe executions (default 10 seconds).
            periodSeconds: 3
            successThreshold: 1
            timeoutSeconds: 1

          # Liveness: the container is restarted if port 9025 stops accepting TCP connections.
          livenessProbe:
            failureThreshold: 3
            tcpSocket:
              port: 9025
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 1

五、应用的优雅停机

    import org.apache.catalina.connector.Connector;

    // Tomcat connector captured elsewhere (not shown in this snippet); volatile because it is
    // presumably written and read on different threads — confirm against the full class.
    private volatile Connector connector;

    if (this.connector != null) {
        // Pause the Tomcat connector first.
        // NOTE(review): expected to stop new requests from being accepted — confirm against
        // the Tomcat Connector.pause() documentation for the version in use.
        this.connector.pause();

        Executor tomcatExecutor = this.connector.getProtocolHandler().getExecutor();
        if (tomcatExecutor instanceof ExecutorService) {
            // Register Tomcat's worker pool so that it is shut down together with the other
            // executors tracked by XxxExecutorServiceShutdownHooks.
            XxxExecutorServiceShutdownHooks.add((ExecutorService) tomcatExecutor);
        }
    }

    // Service-discovery registration handle for this instance.
    private DiscoveryRegistration registration;

    // Deregister from service discovery.
    this.registration.deregister();

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.context.ApplicationContext;

import java.util.Collection;
import java.util.Map;

/**
 * Stops every RabbitMQ listener container that is currently running.
 *
 * <p>The listener registry is looked up in the supplied context under
 * {@code RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME}.
 *
 * @param context application context used to resolve the listener endpoint registry
 */
public void shutdown(ApplicationContext context) {
        log.info("begin shutdown RabbitMQ");

        RabbitListenerEndpointRegistry registry = context.getBean(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                RabbitListenerEndpointRegistry.class);

        for (MessageListenerContainer container : registry.getListenerContainers()) {
            // Containers that are not running need no action.
            if (!container.isRunning()) {
                continue;
            }
            container.stop();
        }

        log.info("finish shutdown RabbitMQ");
    }
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/**
 * Static registry of {@link ExecutorService} instances that should be shut down gracefully.
 *
 * <p>Thread-safety: every access to the registry is guarded by this class's monitor, so
 * {@link #add(ExecutorService)} may be called concurrently with
 * {@link #shutdownGraceful(long)}.
 */
public class XxxExecutorServiceShutdownHooks {
    /** Registered executors; guarded by {@code XxxExecutorServiceShutdownHooks.class}. */
    private static final HashSet<ExecutorService> executorServiceHashSet = new HashSet<>(10);

    /**
     * Registers an executor for graceful shutdown.
     *
     * @param hook the executor to register; {@code null} is ignored
     */
    public static synchronized void add(ExecutorService hook) {
        if (hook != null) {
            executorServiceHashSet.add(hook);
        }
    }

    /**
     * Shuts down all registered executors and blocks until the returned future completes.
     *
     * @param shutdownAwaitTermination wait budget handed unchanged to
     *        {@code ThreadPoolUtils.shutdownGraceful}; its unit is defined by that helper
     *        (not visible here)
     */
    public static void shutdownGraceful(long shutdownAwaitTermination) {
        // Snapshot under the same lock add() uses: copying a HashSet while another thread
        // mutates it is a data race and can throw ConcurrentModificationException.
        List<ExecutorService> executorServices;
        synchronized (XxxExecutorServiceShutdownHooks.class) {
            executorServices = new ArrayList<>(executorServiceHashSet);
        }

        // The potentially long wait happens outside the lock so add() is never blocked by it.
        CompletableFuture<Void> future = ThreadPoolUtils.shutdownGraceful(executorServices, shutdownAwaitTermination);
        if (future != null) {
            future.join();
        }
    }
}

六、遗留问题

// Delay before the first task runs: 1 second
    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;

// Refresh every 30 seconds
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

/**
 * Starts the periodic refresh task if this updater is not already active.
 *
 * <p>The task is scheduled with a fixed delay of {@code refreshIntervalMs} after an initial
 * delay of {@code initialDelayMs}. If the updater is already active, the call only logs.
 *
 * @param updateAction the action run on every refresh cycle; exceptions it throws are
 *        logged and do not cancel the schedule
 */
public synchronized void start(final UpdateAction updateAction) {
        // Only the caller that flips the flag from false to true schedules the task.
        if (!isActive.compareAndSet(false, true)) {
            logger.info("Already active, no-op");
            return;
        }

        final Runnable refreshTask = new Runnable() {
            @Override
            public void run() {
                if (isActive.get()) {
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                    return;
                }
                // No longer active: the task cancels its own schedule.
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                refreshTask, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }
// Static nested class.
// Its static initializer creates the shared refresh executor and registers a JVM shutdown
// hook that shuts the executor down.
private static class LazyHolder {
        // Name of the dynamic property that controls the executor's core pool size.
        private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
        // NOTE(review): the second argument (2) is presumably the default value — confirm
        // against the DynamicIntProperty documentation.
        private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
        // Thread registered as the JVM shutdown hook below.
        private static Thread _shutdownThread;

        static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;

        static {
            int coreSize = poolSizeProp.get();
            // Daemon threads named "PollingServerListUpdater-<n>".
            ThreadFactory factory = (new ThreadFactoryBuilder())
                    .setNameFormat("PollingServerListUpdater-%d")
                    .setDaemon(true)
                    .build();
            _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
            // Re-apply the core pool size whenever the property's callback fires.
            poolSizeProp.addCallback(new Runnable() {
                @Override
                public void run() {
                    _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
                }

            });
            _shutdownThread = new Thread(new Runnable() {
                public void run() {
                    logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
                    shutdownExecutorPool();
                }
            });
// Register _shutdownThread as a JVM shutdown hook
            Runtime.getRuntime().addShutdownHook(_shutdownThread);
        }

        // Shuts down the refresh executor and then unregisters the shutdown hook.
        private static void shutdownExecutorPool() {
            if (_serverListRefreshExecutor != null) {
                _serverListRefreshExecutor.shutdown();

// The executor is already shut down, so the _shutdownThread hook is removed here
// (the article's note: to avoid a loop)
                if (_shutdownThread != null) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(_shutdownThread);
                    } catch (IllegalStateException ise) { // NOPMD
                        // this can happen if we're in the middle of a real
                        // shutdown,
                        // and that's 'ok'
                    }
                }

            }
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读