Flink on Zeppelin问题四则(并没有优雅的解决方案

2021-07-30  本文已影响0人  LittleMagic

前言

最近我们正式调研Zeppelin作为Flink SQL开发套件的可能性,于是clone了最新的Zeppelin v0.10-SNAPSHOT源码,自行编译并部署到了预发布环境的新Flink集群中。Flink版本为1.13.0,Hadoop版本为CDH 6.3.2自带的3.0.0。经过两天的探索,发现了一些问题,在百忙之中抽出点时间简要记录一下并不成功的troubleshooting过程。

Flink Interpreter不加载

安装好Zeppelin并配置好Flink Interpreter的各项参数之后(采用生产环境推荐的Flink on YARN + Interpreter on YARN + Isolated Per Note模式),编写Note无法执行,提示找不到FlinkInterpreter类,如下图所示。

排查:

但是,Interpreter进程的classpath中并没有zeppelin/interpreter/flink/*,自然无法加载Interpreter了。为什么会这样?来到负责启动Interpreter的bin/interpreter.sh文件,第125行:

INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
if [[ "${INTERPRETER_ID}" != "flink" ]]; then
  # don't add interpreter jar for flink, FlinkInterpreterLauncher will choose the right interpreter jar based
  # on scala version of current FLINK_HOME.
  addJarInDirForIntp "${INTERPRETER_DIR}"
fi

可见这里对Flink做了一个特殊的处理。根据注释的描述,FlinkInterpreterLauncher会根据用户的Flink版本选择对应Scala版本的JAR包。查看该类的源码,确实如此(有一个chooseFlinkAppJar()方法,略去)。然而继续向上追踪FlinkInterpreterLauncher的调用链,发现它并没有在任何与YARN有关的方法中被使用,也就是说上面选择JAR包的动作根本没发生。

由于我们仍然仅使用基于Scala 2.11的Flink,故可以将目录中的2.12包删掉,并修改interpreter.sh注释掉if语句,问题临时解决。更好的解决方法是将上述的选择JAR包逻辑写入YARN Launcher内,但侵入性较大,留待今后操作。

YARN Application模式无效

根据文档描述,YARN Application模式与普通的YARN模式相比会更节省资源,因为JobManager和Interpreter跑在一个Container内,如下图所示。

我们确认与Hadoop相关的各项参数、环境变量都设置好之后,将Note的flink.execution.mode参数改为yarn-application,运行之,报出如下异常。

对比一下上节贴出的Interpreter临时目录结构,容易发现这里的路径是错的。来到FlinkScalaInterpreter类,将flinkHomeflinkConfDirhiveConfDir做如下的修改。

mode = ExecutionMode.withName(
  properties.getProperty("flink.execution.mode", "LOCAL")
    .replace("-", "_")
    .toUpperCase)
if (mode == ExecutionMode.YARN_APPLICATION) {
  if (flinkVersion.isFlink110) {
    throw new Exception("yarn-application mode is only supported after Flink 1.11")
  }
  // use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR
  val workingDirectory = new File(".").getAbsolutePath
  flinkHome = workingDirectory + "/flink"
  flinkConfDir = workingDirectory + "/flink/conf"
  hiveConfDir = workingDirectory + "/hive_conf"
}

重新编译打包并替换掉原来的Interpreter包,再次执行,又报出如下异常,提示Application ID为空。

话休絮烦,直接贴出对应的源码:

val (effectiveConfig, cluster) = fetchConnectionInfo(config, configuration, flinkShims)
this.configuration = effectiveConfig
cluster match {
  case Some(clusterClient) =>
    // local mode or yarn
    if (mode == ExecutionMode.LOCAL) {
      LOGGER.info("Starting FlinkCluster in local mode")
      this.jmWebUrl = clusterClient.getWebInterfaceURL
      this.displayedJMWebUrl = this.jmWebUrl
    } else if (mode == ExecutionMode.YARN) {
      LOGGER.info("Starting FlinkCluster in yarn mode")
      this.jmWebUrl = clusterClient.getWebInterfaceURL
      val yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
      this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
    } else {
      throw new Exception("Starting FlinkCluster in invalid mode: " + mode)
    }
  case None =>
    // yarn-application mode
    if (mode == ExecutionMode.YARN_APPLICATION) {
      // get yarnAppId from env `_APP_ID`
      val yarnAppId = System.getenv("_APP_ID")
      LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
      this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
      this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
    } else {
      LOGGER.info("Use FlinkCluster in remote mode")
      this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
      this.displayedJMWebUrl = getDisplayedJMWebUrl("")
    }
}

@Internal
def fetchConnectionInfo(
    config: Config,
    flinkConfig: Configuration,
    flinkShims: FlinkShims): (Configuration, Option[ClusterClient[_]]) = {
  config.executionMode match {
    case ExecutionMode.LOCAL => createLocalClusterAndConfig(flinkConfig)
    case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig)
    case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims)
    case ExecutionMode.YARN_APPLICATION => (flinkConfig, None)
    case ExecutionMode.UNDEFINED => // Wrong input
      throw new IllegalArgumentException("please specify execution mode:\n" +
        "[local | remote <host> <port> | yarn | yarn-application ]")
  }
}

上面的代码有些令人迷惑:为什么YARN Application模式下没有做任何操作,只是返回了一个空的ClusterClient?另外,_APP_ID是Flink ApplicationMaster启动时设置的环境变量,这样操作一定可以拿得到么?

当然这个问题比较复杂,笔者也尚未认真研究过YARN Application模式相关的源码,需要时间来处理。但可以肯定至少在我们的环境下,需要做较大的改动才能让它正常使用。在完全解决之前,仍然采用传统YARN模式也无伤大雅。

配置Note只读权限后无法切换视图

为了保证安全,我们强制新建的Note都为私有(即Reader、Writer、Runner、Owner初始值都是用户自己),然后按需对相关同学开放权限。

一般情况下,所有人都可以读Note。但是只将Reader权限放开后,除Owner之外的人看到的都是白板。这是因为Note对只读权限者变成了report视图,只能看到结果,不展示SQL源码,如下图所示。

但是,如果尝试切换成default视图,就会提示需要Writer权限才可以:

这就有些匪夷所思了。将Zeppelin日志等级设为DEBUG,重复切换视图操作,可以发现在NotebookServer的事件循环里产生了NOTE_UPDATE事件。

DEBUG [2021-07-28 19:03:52,097] ({qtp306612792-317} NotebookServer.java[onMessage]:255) - RECEIVE: NOTE_UPDATE, RECEIVE PRINCIPAL: bigdata_dev, RECEIVE TICKET: f9118802-14cd-40fc-8e60-caeb0267aac2, RECEIVE ROLES: ["role1"], RECEIVE DATA: {id=2GE65N3RS, name=WorkflowAliBinlog, config={isZeppelinNotebookCronEnable=false, looknfeel=default, personalizedMode=false}}
 WARN [2021-07-28 19:03:52,098] ({qtp306612792-317} SimpleServiceCallback.java[onFailure]:50) - HTTP 403 Forbidden

这是因为Note的视图风格直接存储在.zpln文件内(叫做looknfeel),所以修改它就相当于修改Note了 = =

将NOTE_UPDATE的权限赋给Reader显然不现实,考虑到我们几乎不会用到simple和report视图,将simple视图作为只读的情况比较合适。

但是,来到zeppelin-web项目下之后,发现代码只读性、代码编辑器的可见性和视图之间的耦合过紧,改了数十处HTML和JS代码之后仍然未能达到想要的效果。经过试验,只读用户还是可以看到非HEAD commit的代码的,切换版本凑合也能用,此事暂时搁置。

Zeppelin日志被"Saving note"信息淹没

我们采用的Notebook Repo是GitNotebookRepo(本地)+FileSystemNotebookRepo(远程HDFS)的组合。启动了几个Flink SQL任务之后,在Zeppelin日志中看到如下格式的信息刷屏。

INFO [2021-07-30 19:32:10,160] ({pool-11-thread-16} VFSNotebookRepo.java[save]:144) - Saving note 2GDVAVC4W to etl-mq/analytics_access_log_app_2GDVAVC4W.zpln

这是因为在每个Note对应作业的JobManager中,都会启动一个名为FlinkJobProgressPoller的线程,以zeppelin.flink.job.check_interval的间隔(默认1秒,我们改成了5秒)检查并更新任务的状态。如上一节所述,这些信息也都保存在.zpln文件内,所以会导致频繁写文件。并且这个线程做的事情非常多,代码如下所示。

@Override
public void run() {
  while (!Thread.currentThread().isInterrupted() && running.get()) {
    JsonNode rootNode = null;
    try {
      synchronized (running) {
        running.wait(checkInterval);
      }
      rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
              .asJson().getBody();
      JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
      int totalTasks = 0;
      int finishedTasks = 0;
      for (int i = 0; i < vertices.length(); ++i) {
        JSONObject vertex = vertices.getJSONObject(i);
        totalTasks += vertex.getInt("parallelism");
        finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
      }
      LOGGER.debug("Total tasks:" + totalTasks);
      LOGGER.debug("Finished tasks:" + finishedTasks);
      if (finishedTasks != 0) {
        this.progress = finishedTasks * 100 / totalTasks;
        LOGGER.debug("Progress: " + this.progress);
      }
      String jobState = rootNode.getObject().getString("state");
      if (jobState.equalsIgnoreCase("finished")) {
        break;
      }

      long duration = rootNode.getObject().getLong("duration") / 1000;
      if (isStreamingInsertInto) {
        if (isFirstPoll) {
          StringBuilder builder = new StringBuilder("%angular ");
          builder.append("<h1>Duration: {{duration}} </h1>");
          builder.append("\n%text ");
          context.out.clear(false);
          context.out.write(builder.toString());
          context.out.flush();
          isFirstPoll = false;
        }
        context.getAngularObjectRegistry().add("duration",
                toRichTimeDuration(duration),
                context.getNoteId(),
                context.getParagraphId());
      }

      // fetch checkpoints info and save the latest checkpoint into paragraph's config.
      rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString() + "/checkpoints")
              .asJson().getBody();
      if (rootNode.getObject().has("latest")) {
        JSONObject latestObject = rootNode.getObject().getJSONObject("latest");
        if (latestObject.has("completed") && latestObject.get("completed") instanceof JSONObject) {
          JSONObject completedObject = latestObject.getJSONObject("completed");
          if (completedObject.has("external_path")) {
            String checkpointPath = completedObject.getString("external_path");
            LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
            if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)
              Map<String, String> config = new HashMap<>();
              config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
              context.getIntpEventClient().updateParagraphConfig(
                      context.getNoteId(), context.getParagraphId(), config);
              latestCheckpointPath = checkpointPath;
            }
          }
        }
      }
    } catch (Exception e) {
      LOGGER.error("Fail to poll flink job progress via rest api", e);
    }
  }
}

考虑到直接对它下手的复杂度,目前只能暂时在log4j配置中屏蔽掉VFSNotebookRepo的INFO日志输出。随着今后任务增多,会继续评估Zeppelin Server和磁盘的压力,并尽可能寻找优化的方法。

The End

其实还有个Maven Repo解析与添加Nexus私服认证方面的问题,但这个更复杂,并且与Flink无关,就不废话了。

民那周末快乐~

上一篇下一篇

猜你喜欢

热点阅读