[HDFS] Detecting and Merging Small Files in the File System

2019-10-24  LZhan
1 Determining whether small files exist
 public boolean isSmallFileExists(String location) throws IOException {
        Path path = new Path(location);
        if (fs.exists(path) && fs.isDirectory(path)) {
            List<Long> lenList = new LinkedList<>(); // file sizes

            RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);
            while (iterator.hasNext()) {
                LocatedFileStatus fileStatus = iterator.next();

                if (fileStatus.getPath().getName().startsWith("_")) continue; // skip marker files such as _SUCCESS

                lenList.add(fileStatus.getLen());
            }

            long totalLen = lenList.stream().mapToLong(l -> l).sum(); // total size of all files
            int fileNum = lenList.size(); // number of files
            int preferedFileNum = (int) (totalLen / (getBlockSize() * 0.6) + 1); // expected number of files

            return fileNum > preferedFileNum;
        }

        return false;
    }

 public long getBlockSize() {
        return fs.getConf().getLong("dfs.blocksize", 32 * 1024 * 1024);
    }
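
The two methods above reference an fs field that is not shown. Judging from the try-with-resources call to HDFS.get() later in the article, they likely belong to an HDFS wrapper class that holds the FileSystem handle. A minimal sketch of such a wrapper, assuming the default Hadoop configuration on the classpath (the article's actual wrapper may differ):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical wrapper: holds the FileSystem handle used by isSmallFileExists() and getBlockSize()
public class HDFS implements AutoCloseable {

    private final FileSystem fs;

    private HDFS(FileSystem fs) {
        this.fs = fs;
    }

    // Builds a FileSystem from the default Hadoop configuration (core-site.xml / hdfs-site.xml on the classpath)
    public static HDFS get() throws IOException {
        return new HDFS(FileSystem.get(new Configuration()));
    }

    @Override
    public void close() throws IOException {
        fs.close();
    }
}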

Analysis:
<1> Check that the path exists and that it is a directory rather than a single file.
<2> RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path)
lists everything directly under the path (by this point the directory should contain only files, no subdirectories).
<3> Iterate over the files, skipping marker files such as _SUCCESS (the _SUCCESS marker is written by Hadoop's FileOutputCommitter when a MapReduce or Spark job completes successfully), and record each file's size.
<4> Sum the sizes of all files, count the files, and compute the expected number of files:
expected file count = total size / (block size * 0.6) + 1
In other words, a file that fills at least 60% of an HDFS block is no longer considered small. A worked example follows this list.
<5> If the actual file count exceeds the expected file count, the directory contains small files; otherwise it does not.
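
As a worked example (assuming the common 128 MB HDFS block size rather than the 32 MB fallback in getBlockSize()): for a directory holding 1024 MB of data, the expected file count is (int) (1024 / (128 * 0.6) + 1) = (int) 14.33 = 14. Two hundred 5 MB files would therefore fail the check (200 > 14), while ten files of roughly 100 MB each would pass it.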

2 Merging small files
@Component
public class StorageOptimizationJob implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(StorageOptimizationJob.class);

    @Autowired
    private StorageService storageService;

//    @Scheduled(fixedDelay = 1000 * 60 * 5)
    @Override
    public void run() {
        if (prepare()) { // ready to run?
            logger.info("Ready, running optimization");
            optimize(); // optimize
            logger.info("Optimization finished, running cleanup");
            cleanup(); // clean up
            logger.info("Cleanup finished, exiting");
        } else {
            logger.info("Not ready, exiting");
        }
    }

    private boolean prepare() {
        boolean prepare;

        try (YARN yarn = YARN.get()) {
            prepare = yarn.isIdle(); // only proceed when the cluster has spare resources
        } catch (Exception e) {
            logger.error("Failed to query YARN", e);
            prepare = false;
        }

        return prepare;
    }

    private void optimize() {
        List<StorageJobDTO> jobs = this.storageService.queryAllStorageJobs();
        if (jobs == null || jobs.size() == 0) return;
        StorageJobDTO job = jobs.get((new Random()).nextInt(jobs.size())); // pick one job at random

        try (HiveMetaStore hiveMetaStore = HiveMetaStore.get(); HDFS hdfs = HDFS.get()) {
            if (hiveMetaStore.isTableExists(job.getDb(), job.getTable())) {
                if (hiveMetaStore.isSetPartition(job.getDb(), job.getTable())) {
                    // the table is partitioned
                    List<String> locations = hiveMetaStore.getPartitionLocation(job.getDb(), job.getTable());
                    for (String location: locations) {
                        if (hdfs.isSmallFileExists(location)) {
                            logger.info("{} contains small files, starting merge task", location);
                            new ConcatenateTask(location).run(); // run the merge task
                            log(job.getId(), StorageJobLogDO.STATE_SUCCESS, location + " contains small files, merge task started");
                            break; // merge at most one directory per cycle
                        }
                    }
                } else {
                    // the table is not partitioned
                    String location = hiveMetaStore.getTableLocation(job.getDb(), job.getTable());
                    if (hdfs.isSmallFileExists(location)) {
                        logger.info("{} contains small files, starting merge task", location);
                        new ConcatenateTask(location).run(); // run the merge task
                        log(job.getId(), StorageJobLogDO.STATE_SUCCESS, location + " contains small files, merge task started");
                    }
                }
            } else {
                logger.warn("Hive table {}.{} does not exist", job.getDb(), job.getTable());
                log(job.getId(), StorageJobLogDO.STATE_FAILED, "Hive table " + job.getDb() + "." + job.getTable() + " does not exist");
            }
        } catch (Exception e) {
            logger.error("Optimization failed", e);
            log(job.getId(), StorageJobLogDO.STATE_FAILED, e.getLocalizedMessage());
        }
    }

    private void cleanup() {}

    private void log(Long jobId, String state, String desc) {
        StorageJobLogDO log = new StorageJobLogDO(jobId);
        log.setState(state);
        log.setDesc(desc);
        this.storageService.insertStorageJobLog(log);
    }

}
class ConcatenateTask {

    private static final Logger logger = LoggerFactory.getLogger(ConcatenateTask.class);

    private String location;

    private CountDownLatch cd;

    ConcatenateTask(String location) {
        this.location = location;
        this.cd = new CountDownLatch(1);
    }

    public void run() throws IOException, InterruptedException {
        String appPath = System.getenv("SPARK_APP_PATH");
        if (appPath == null || appPath.trim().length() == 0) {
            logger.error("The SPARK_APP_PATH environment variable is not set");
            throw new RuntimeException("The SPARK_APP_PATH environment variable is not set");
        }

        logger.info("Launching Spark application");
        SparkAppHandle handle = new SparkLauncher()
                .setMaster("yarn")
                .setDeployMode("client")
                .setConf("spark.executor.instances", "4")
                .setConf("spark.executor.cores", "2")
                .setConf("spark.executor.memory", "4g")
                .setMainClass("com.dyingbleed.labrador.spark.Application")
                .setAppResource(appPath)
                .addAppArgs(location)
                .setAppName("labrador")
                .startApplication();

        handle.addListener(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                logger.info("Spark State: {}", handle.getState().name());
                if (handle.getState().isFinal()) cd.countDown();
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {}
        });

        cd.await(10, TimeUnit.MINUTES); // wait for the Spark job to finish, with a 10-minute timeout
    }
}

The core code of the Spark job that merges the small files:

object Application {

  private val logger = LoggerFactory.getLogger(classOf[Application])

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      throw new IllegalArgumentException("At least one argument (the target location) is required")
    }
    val location = args(0)

    val spark = SparkSession.builder()
      .appName("labrador")
      .getOrCreate()
    try {
      new Application(spark, location).run()
    } catch {
      case e: Exception => logger.error("Merge job failed", e)
    } finally {
      spark.close()
    }
  }

}

class Application(spark: SparkSession, location: String) extends Runnable {

  override def run(): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)

    fileFormat(fs, location) match {
      case Some(fileFormat) => {
        Application.logger.info("Detected file format {}", fileFormat)

        val tmp = s"/tmp/labrador/${new Random().nextInt(Int.MaxValue)}" // temporary output directory
        Application.logger.debug("Temporary directory {}", tmp)

        Application.logger.info("Merging small files")
        try {
          spark.read
            .format(fileFormat)
            .load(location)
            .repartition(getPreferedPartitionNum(fs, location))
            .write
            .format(fileFormat)
            .save(tmp)
        } catch {
          case e: Exception => {
            val path = new Path(tmp)
            if (fs.exists(path)) {
              fs.delete(path, true)
            }
            throw e
          }
        }

        Application.logger.info("Validating data")
        if (validate(fileFormat, location, tmp)) {
          Application.logger.info("Backing up old data")
          fs.rename(new Path(location), new Path(location + "_bak"))
          Application.logger.info("Moving new data into place")
          fs.rename(new Path(tmp), new Path(location))
          Application.logger.info("Deleting old data")
          fs.delete(new Path(location + "_bak"), true)
        } else {
          Application.logger.warn("Data mismatch after merge")
        }
      }
      case None => Application.logger.warn("Could not determine the file format under {}", location)
    }
  }

  private[this] def fileFormat(fs: FileSystem, location: String): Option[String] = {
    val files = fs.listFiles(new Path(location), false)

    while (files.hasNext) {
      val file = files.next()
      if (file.isFile) {
        val fileName = file.getPath.getName
        val lastIndex = fileName.lastIndexOf(".")
        if (lastIndex > 0 && lastIndex + 1 <= fileName.length) {
          val suffix = fileName.substring(lastIndex + 1, fileName.length)
          if (isValidFileFormat(suffix)) return Option(suffix)
        }
      }
    }

    None
  }

  private[this] def isValidFileFormat(fileFormat: String): Boolean = {
    fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("json")
  }

  private[this] def getPreferedPartitionNum(fs: FileSystem, location: String): Int = {
    val files = fs.listFiles(new Path(location), false)

    var totalLen = 0L // total size of all files under the location

    while (files.hasNext) {
      val file = files.next()
      if (file.isFile) totalLen += file.getLen
    }

    (totalLen / fs.getConf.getLong("dfs.blocksize", 32 * 1024 * 1024) + 1).toInt
  }

  private[this] def validate(fileFormat: String, sourceDir: String, targetDir: String): Boolean = {
    val sourceCount = spark.read.format(fileFormat).load(sourceDir).count()
    val targetCount = spark.read.format(fileFormat).load(targetDir).count()
    sourceCount == targetCount
  }

}

Analysis:
<1> The prepare method:
Before merging any small files, the job first checks whether YARN has enough spare resources (available cores and memory). It initializes the YARN client wrapper, loading the cluster configuration, and then calls isIdle() to decide whether current resource usage allows the merge to run. A hedged sketch of what such a check might look like is given at the end of this analysis.
<2> The optimize method:
a. Query all configured optimization jobs and pick one at random.
b. Check that the corresponding database and table exist in Hive.
c. If the table is partitioned, fetch every partition's storage location and check each one for small files; otherwise check the table's single storage location.
d. The merge task itself runs in two steps:
Step 1: determine the file format under the location (Parquet or JSON).
Step 2: read the data in that format and reduce the number of files with repartition, as shown below. The target partition count comes from getPreferedPartitionNum, i.e. total data size / dfs.blocksize + 1, so each output file ends up at roughly one HDFS block; repartition (rather than coalesce) triggers a full shuffle and therefore produces evenly sized output files.

spark.read
.format(fileFormat)
.load(location)
.repartition(getPreferedPartitionNum(fs, location))
.write
.format(fileFormat)
.save(tmp)

e. After the merge, validate that the old data and the new data (still sitting in the temporary directory) are consistent by comparing row counts. If they match, rename the old data directory to a _bak backup, move the new data into the original location, and then delete the _bak backup.
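
The YARN wrapper used by prepare() is not shown in the article. Below is a minimal sketch of what its isIdle() check might look like, assuming Hadoop's YarnClient API (2.8+ for Resource.getMemorySize()) and a hypothetical 50% utilization threshold; the real implementation and threshold may differ.

import java.util.List;

import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical sketch of the YARN wrapper; not the article's actual implementation
public class YARN implements AutoCloseable {

    private final YarnClient yarnClient;

    private YARN(YarnClient yarnClient) {
        this.yarnClient = yarnClient;
    }

    public static YARN get() {
        YarnClient client = YarnClient.createYarnClient();
        client.init(new YarnConfiguration()); // reads yarn-site.xml from the classpath
        client.start();
        return new YARN(client);
    }

    // Treat the cluster as idle when less than half of its memory and vcores are in use
    // (the 50% threshold is an assumption for illustration)
    public boolean isIdle() throws Exception {
        long usedMem = 0L, totalMem = 0L;
        long usedCores = 0L, totalCores = 0L;
        List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
        for (NodeReport node : nodes) {
            Resource used = node.getUsed();
            Resource capability = node.getCapability();
            if (used != null && capability != null) {
                usedMem += used.getMemorySize();
                totalMem += capability.getMemorySize();
                usedCores += used.getVirtualCores();
                totalCores += capability.getVirtualCores();
            }
        }
        return totalMem > 0 && totalCores > 0
                && usedMem * 2 < totalMem && usedCores * 2 < totalCores;
    }

    @Override
    public void close() {
        yarnClient.stop();
    }
}

With a wrapper like this, the try-with-resources block in prepare() closes the YarnClient automatically whether or not the check succeeds.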
