原创-spark sql 写入hive较慢原因分析

2019-04-29  本文已影响0人  无色的叶

问题现象

hbase表30000条数据,使用spark读取hbase数据,按照某一字段值进行分区,分区数在1000个,写入到hive分区表时,耗时较长,大概耗时在25分钟

 spark.sql("insert into legend.test_log_hive partition(name_par) select rowKey,name,age,mobile,addr,name as name_par from result")

根据执行日志分析,可确定耗时在load数据到hive阶段

spark load执行过程分析

spark.sql("insert into table") 对应spark类InsertIntoHiveTable中的run方法

case class InsertIntoHiveTable(
    table: CatalogTable,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean,
    outputColumnNames: Seq[String]) extends SaveAsHiveFile {

  /**
   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
   * `org.apache.hadoop.hive.serde2.SerDe` and the
   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
   */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    val externalCatalog = sparkSession.sharedState.externalCatalog
    val hadoopConf = sparkSession.sessionState.newHadoopConf()

    val hiveQlTable = HiveClientImpl.toHiveTable(table)
    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
    // instances within the closure, since Serializer is not serializable while TableDesc is.
    val tableDesc = new TableDesc(
      hiveQlTable.getInputFormatClass,
      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
      // HiveSequenceFileOutputFormat.
      hiveQlTable.getOutputFormatClass,
      hiveQlTable.getMetadata
    )
    val tableLocation = hiveQlTable.getDataLocation
    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)

    try {
    //主要方法
      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
    } finally {
      // Attempt to delete the staging directory and the inclusive files. If failed, the files are
      // expected to be dropped at the normal termination of VM since deleteOnExit is used.
      deleteExternalTmpPath(hadoopConf)
    }

    // un-cache this table.
    sparkSession.catalog.uncacheTable(table.identifier.quotedString)
    sparkSession.sessionState.catalog.refreshTable(table.identifier)

    CommandUtils.updateTableStats(sparkSession, table)

    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
    // however for now we return an empty list to simplify compatibility checks with hive, which
    // does not return anything for insert operations.
    // TODO: implement hive compatibility as rules.
    Seq.empty[Row]
  }

insert overwrite 执行分为三步,一个是select,一个是write,一个是load,前边两步没什么问题,主要是最后一步load,以loadPartition为例看下执行过程:

override def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      isOverwrite: Boolean,
      inheritTableSpecs: Boolean,
      isSrcLocal: Boolean): Unit = withClient {
    requireTableExists(db, table)

    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
    getTable(db, table).partitionColumnNames.foreach { colName =>
      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
      // and Hive will validate the column names in partition spec to make sure they are partition
      // columns. Here we Lowercase the column names before passing the partition spec to Hive
      // client, to satisfy Hive.
      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
    }

    client.loadPartition(
      loadPath,
      db,
      table,
      orderedPartitionSpec,
      isOverwrite,
      inheritTableSpecs,
      isSrcLocal)
  }

接着调用org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition

def loadPartition(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String],
      replace: Boolean,
      inheritTableSpecs: Boolean,
      isSrcLocal: Boolean): Unit = withHiveState {
    val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
    shim.loadPartition(
      client,
      new Path(loadPath), // TODO: Use URI
      s"$dbName.$tableName",
      partSpec,
      replace,
      inheritTableSpecs,
      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
      isSrcLocal = isSrcLocal)
  }

注意代码中shim.loadPartition,shim默认对应于版本1.2.1

private val shim = version match {
    case hive.v12 => new Shim_v0_12()
    case hive.v13 => new Shim_v0_13()
    case hive.v14 => new Shim_v0_14()
    case hive.v1_0 => new Shim_v1_0()
    case hive.v1_1 => new Shim_v1_1()
    case hive.v1_2 => new Shim_v1_2()
    case hive.v2_0 => new Shim_v2_0()
    case hive.v2_1 => new Shim_v2_1()
    case hive.v2_2 => new Shim_v2_2()
    case hive.v2_3 => new Shim_v2_3()
  }

则对应于对象new Shim_v1_2().loadPartition

override def loadPartition(
      hive: Hive,
      loadPath: Path,
      tableName: String,
      partSpec: JMap[String, String],
      replace: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean,
      isSrcLocal: Boolean): Unit = {
    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
      holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
      isSrcLocal: JBoolean, isAcid)
  }


private lazy val loadPartitionMethod =
    findMethod(
      classOf[Hive],
      "loadPartition",
      classOf[Path],
      classOf[String],
      classOf[JMap[String, String]],
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE)

反射调用hive的类org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(版本是1.2.1)
hive在执行loadPartition的时候,如果分区目录已经存在,会调用replaceFiles,replaceFiles会调用trashFilesUnderDir,trashFilesUnderDir里会逐个将文件放到回收站;不存在则直接创建相应目录及copy文件,其中trashFilesUnderDir方法是for循环一个一个进行删除,在文件数比较多时会比较慢

public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf) throws FileNotFoundException, IOException {
        FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
        boolean result = true;
        FileStatus[] arr$ = statuses;
        int len$ = statuses.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            FileStatus status = arr$[i$];
            result &= moveToTrash(fs, status.getPath(), conf);
        }

        return result;
    }

对比hive-3.1.1源码发现

protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
          boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
          boolean isNeedRecycle, boolean isManaged) throws HiveException {
    try {

      FileSystem destFs = destf.getFileSystem(conf);
      // check if srcf contains nested sub-directories
      FileStatus[] srcs;
      FileSystem srcFs;
      try {
        srcFs = srcf.getFileSystem(conf);
        srcs = srcFs.globStatus(srcf);
      } catch (IOException e) {
        throw new HiveException("Getting globStatus " + srcf.toString(), e);
      }
      if (srcs == null) {
        LOG.info("No sources specified to move: " + srcf);
        return;
      }

      if (oldPath != null) {
        deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isNeedRecycle);
      }

      // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
      // destf
      boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
      if(!destfExist) {
        throw new IOException("Directory " + destf.toString()
            + " does not exist and could not be created.");
      }

//接着调用deleteOldPathForReplace方法

 private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
      PathFilter pathFilter, boolean isNeedRecycle) throws HiveException {
    Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath
        + " and old path " + oldPath);
    boolean isOldPathUnderDestf = false;
    try {
      FileSystem oldFs = oldPath.getFileSystem(conf);
      FileSystem destFs = destPath.getFileSystem(conf);
      // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
      // existing content might result in incorrect (extra) data.
      // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
      // not the destf or its subdir?
      isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
      if (isOldPathUnderDestf) {
        cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge, isNeedRecycle);
      }
    } catch (IOException e) {
      if (isOldPathUnderDestf) {
        // if oldPath is a subdir of destf but it could not be cleaned
        throw new HiveException("Directory " + oldPath.toString()
            + " could not be cleaned up.", e);
      } else {
        //swallow the exception since it won't affect the final result
        LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
      }
    }

接着调用cleanUpOneDirectoryForReplace方法

private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
      PathFilter pathFilter, HiveConf conf, boolean purge, boolean isNeedRecycle) throws IOException, HiveException {
    if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
      recycleDirToCmPath(path, purge);
    }
    FileStatus[] statuses = fs.listStatus(path, pathFilter);
    if (statuses == null || statuses.length == 0) {
      return;
    }
    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
      String s = "Deleting files under " + path + " for replace: ";
      for (FileStatus file : statuses) {
        s += file.getPath().getName() + ", ";
      }
      Utilities.FILE_OP_LOGGER.trace(s);
    }

    if (!trashFiles(fs, statuses, conf, purge)) {
      throw new HiveException("Old path " + path + " has not been cleaned up.");
    }
  }

最后调用trashFiles

/**
   * Trashes or deletes all files under a directory. Leaves the directory as is.
   * @param fs FileSystem to use
   * @param statuses fileStatuses of files to be deleted
   * @param conf hive configuration
   * @return true if deletion successful
   * @throws IOException
   */
  public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
      final Configuration conf, final boolean purge)
      throws IOException {
    boolean result = true;

    if (statuses == null || statuses.length == 0) {
      return false;
    }
    final List<Future<Boolean>> futures = new LinkedList<>();
    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
    final SessionState parentSession = SessionState.get();
    for (final FileStatus status : statuses) {
      if (null == pool) {
        result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
      } else {
        futures.add(pool.submit(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            SessionState.setCurrentSessionState(parentSession);
            return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
          }
        }));
      }
    }
    if (null != pool) {
      pool.shutdown();
      for (Future<Boolean> future : futures) {
        try {
          result &= future.get();
        } catch (InterruptedException | ExecutionException e) {
          LOG.error("Failed to delete: ",e);
          pool.shutdownNow();
          throw new IOException(e);
        }
      }
    }
    return result;
  }

可看到该trash方法中使用了多线程对文件进行处理,其实再查看如果文件不存在执行copyFiles流程方法,也使用了线程池进行文件copy从而提高了文件拷贝速度

调用链路


spark sql load执行链路.png
上一篇下一篇

猜你喜欢

热点阅读