玩转大数据大数据Spark On K8S

Spark on k8s: OkHttp WebSocket 非

2019-09-25  本文已影响0人  Kent_Yao

问题描述

基于Spark 3.0-SNAPSHOT(unreleased),做Spark-Terasort相关测试,任务正常的话分如下图所示两个stage,

spark terasort job

第一个,stage 0,读取hdfs input目录数据,并进行shuffle write
第二个,stage 1,进行shuffle read,并向hdfs output目录的输出

其中一次测试由于hdfs存储的配额不足,导致stage 1失败,fail 整个spark job,如下图所示。

spark terasort job error

此时按照Spark on k8s正常的逻辑,会执行到SparkContext.stop, 各类线程该停停该关关,各executor进程应该收到exit的命令,然后做完这些,主线程退出,留给JVM收尾最后Driver 进程停止。当然Driver pod会留给k8s去进行垃圾回收。

然而在实际的情况下,却发现整个Spark作业依然占着k8s集群的资源,Driver pod状态一直处于running的状态。


image.png

在client侧自然也无法获得该作业的“实际状态”


image.png

在这种情况下,Spark on k8s作业就无法像类似Spark on yarn的作业,不依靠一些额外的监控手段才能感知app的运行状态。

测试的过程中,模拟了各种失败的场景,stage/job级别的异常基本上,都让app卡死了.

原因分析

分析Spark on k8s作业的异常,和其他调度器(yarn等)作业也基本一致,各进程的日志信息,jstack信息,gc信息等,还可以通过Spark UI 获取一些作业相关的信息。区别点在于Spark on k8s可能还需要看下各个Pod的状态等相关信息

  1. 首先查看Driver Pod状态,未见异常


    driver pod status
  2. 查看Driver/executor进程jstack


    driver jstack
executor jstack

果不其然,driver 进程中 DestoryJavaJVM被一个OkHttp WebSocket...非daemon线程给拦住了去路...

根据线程的名字可以猜到这个driver端启动的k8s api server 通信的client有关

翻翻Spark 源码

  1. Driver侧的 k8s client有定义关闭自己的逻辑
  2. Spark 在初始化OkHttpClient的时候把ping interval设置为0,
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
      .withApiVersion("v1")
      .withMasterUrl(master)
      .withWebsocketPingInterval(0)
      .withRequestTimeout(clientType.requestTimeout(sparkConf))
      .withConnectionTimeout(clientType.connectionTimeout(sparkConf))
      .withOption(oauthTokenValue) {
        (token, configBuilder) => configBuilder.withOauthToken(token)
      }.withOption(oauthTokenFile) {
        (file, configBuilder) =>
            configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
      }.withOption(caCertFile) {
        (file, configBuilder) => configBuilder.withCaCertFile(file)
      }.withOption(clientKeyFile) {
        (file, configBuilder) => configBuilder.withClientKeyFile(file)
      }.withOption(clientCertFile) {
        (file, configBuilder) => configBuilder.withClientCertFile(file)
      }.withOption(namespace) {
        (ns, configBuilder) => configBuilder.withNamespace(ns)
      }.build()

而OkHttpClient 中 pingIntervalMillis 为0时,这个线程并不会被调度。。

public void initReaderAndWriter(
      String name, long pingIntervalMillis, Streams streams) throws IOException {
    synchronized (this) {
      this.streams = streams;
      this.writer = new WebSocketWriter(streams.client, streams.sink, random);
      this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
      if (pingIntervalMillis != 0) {
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }
      if (!messageAndCloseQueue.isEmpty()) {
        runWriter(); // Send messages that were enqueued before we were connected.
      }
    }

    reader = new WebSocketReader(streams.client, streams.source, this);
  }

开始凌乱了。。。

既然定义了自己关闭的逻辑,没有正常的关闭,想必是有没法捕获的系统异常发生,比如OOM

但是第2点就没法解释了,jar包冲突,不兼容?尝试把okhttp的构件升级到kubernetes-client的依赖版本(3.12.0)依然没有用。。

解决办法

https://issues.apache.org/jira/browse/SPARK-27927
https://issues.apache.org/jira/browse/SPARK-27812
目前社区对这个问题有两个类似的issue跟踪,其中也不乏一些尝试,不乏一些原因的猜测,但貌似都没找到根源。

可以尝试的方法,

  1. 回退kubernete-client到3.0版本
  2. 加上 -XX:OnOutOfMemoryError="kill -9 %p"来跑,可以规避一些driver oom导致的问题
  3. 使用UncaughtExceptionHandler
  4. 调用System.exit
上一篇下一篇

猜你喜欢

热点阅读