[HDFS] Detecting and Merging Small Files in the File System
1 Determining whether small files exist
public boolean isSmallFileExists(String location) throws IOException {
    Path path = new Path(location);
    // fs is the FileSystem instance held by the enclosing HDFS wrapper class
    if (fs.exists(path) && fs.isDirectory(path)) {
        List<Long> lenList = new LinkedList<>(); // file sizes
        RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);
        while (iterator.hasNext()) {
            LocatedFileStatus fileStatus = iterator.next();
            if (fileStatus.getPath().getName().startsWith("_")) continue; // skip marker files such as _SUCCESS
            lenList.add(fileStatus.getLen());
        }
        long totalLen = lenList.stream().mapToLong(l -> l).sum(); // total size
        int fileNum = lenList.size(); // number of files
        int preferedFileNum = (int) (totalLen / (getBlockSize() * 0.6) + 1); // expected number of files
        return fileNum > preferedFileNum;
    }
    return false;
}

public long getBlockSize() {
    return fs.getConf().getLong("dfs.blocksize", 32 * 1024 * 1024); // fall back to 32 MB when dfs.blocksize is not set
}
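The snippet does not show how fs is obtained; judging from the optimizer below, it is a FileSystem instance held inside the project's HDFS wrapper (the one returned by HDFS.get()). A minimal, self-contained sketch of how such an instance is typically created, assuming the standard Hadoop client configuration is on the classpath:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FileSystemSetupExample {

    public static void main(String[] args) throws IOException {
        // Reads core-site.xml / hdfs-site.xml from the classpath; this is the kind of
        // FileSystem the `fs` field used by isSmallFileExists would wrap (an assumption,
        // since the wrapper class itself is not shown in this post).
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            System.out.println("Default file system: " + fs.getUri());
            System.out.println("dfs.blocksize: " + fs.getConf().getLong("dfs.blocksize", 32 * 1024 * 1024));
        }
    }
}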
Analysis:
<1> Check that the path exists and that it is a directory rather than a single file.
<2> RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path)
    lists everything directly under the path. (At this level there should be no subdirectories left, only files.)
<3> Iterate over the files, skipping the _SUCCESS marker (it is written by Hadoop's FileOutputCommitter, and therefore by MapReduce and Spark jobs that use it, when a job completes successfully), and record each file's size.
<4> Compute the total size of all files in the directory, count the files, and derive the expected file count:
    expected file count = total size / (block size * 0.6) + 1
    The underlying assumption is that a file occupying at least 60% of a file-system block is no longer considered small. (A worked example follows this list.)
<5> If the actual file count exceeds the expected count, the directory contains small files; otherwise it does not.
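For example, with the 32 MB fallback block size used above, a directory holding 50 files that total 200 MB gives an expected file count of 200 / (32 * 0.6) + 1 ≈ 11 (after truncation to int); since 50 > 11, the directory is flagged as containing small files. These figures are purely illustrative.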
2 Merging small files
@Component
public class StorageOptimizationJob implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(StorageOptimizationJob.class);

    @Autowired
    private StorageService storageService;

    // @Scheduled(fixedDelay = 1000 * 60 * 5)
    @Override
    public void run() {
        if (prepare()) { // ready to run?
            logger.info("Preparation done, starting optimization");
            optimize(); // merge small files
            logger.info("Optimization finished, starting cleanup");
            cleanup(); // clean up
            logger.info("Cleanup finished, exiting");
        } else {
            logger.info("Not ready, exiting");
        }
    }
    private boolean prepare() {
        boolean prepared = false;
        try (YARN yarn = YARN.get()) {
            prepared = yarn.isIdle(); // only run when YARN has enough spare resources
        } catch (Exception e) {
            logger.error("Failed to query YARN", e);
        }
        return prepared;
    }
    private void optimize() {
        List<StorageJobDTO> jobs = this.storageService.queryAllStorageJobs();
        if (jobs == null || jobs.isEmpty()) return;
        StorageJobDTO job = jobs.get(new Random().nextInt(jobs.size())); // pick one job at random
        try (HiveMetaStore hiveMetaStore = HiveMetaStore.get(); HDFS hdfs = HDFS.get()) {
            if (hiveMetaStore.isTableExists(job.getDb(), job.getTable())) {
                if (hiveMetaStore.isSetPartition(job.getDb(), job.getTable())) {
                    // partitioned table
                    List<String> locations = hiveMetaStore.getPartitionLocation(job.getDb(), job.getTable());
                    for (String location : locations) {
                        if (hdfs.isSmallFileExists(location)) {
                            logger.info("{} contains small files, running merge task", location);
                            new ConcatenateTask(location).run(); // launch the merge task
                            log(job.getId(), StorageJobLogDO.STATE_SUCCESS, location + " contains small files, merge task executed");
                            break; // merge only one directory per cycle
                        }
                    }
                } else {
                    // unpartitioned table
                    String location = hiveMetaStore.getTableLocation(job.getDb(), job.getTable());
                    if (hdfs.isSmallFileExists(location)) {
                        logger.info("{} contains small files, running merge task", location);
                        new ConcatenateTask(location).run(); // launch the merge task
                        log(job.getId(), StorageJobLogDO.STATE_SUCCESS, location + " contains small files, merge task executed");
                    }
                }
            } else {
                logger.warn("Hive table {}.{} does not exist", job.getDb(), job.getTable());
                log(job.getId(), StorageJobLogDO.STATE_FAILED, "Hive table " + job.getDb() + "." + job.getTable() + " does not exist");
            }
        } catch (Exception e) {
            logger.error("Optimization failed", e);
            log(job.getId(), StorageJobLogDO.STATE_FAILED, e.getLocalizedMessage());
        }
    }

    private void cleanup() {} // nothing to clean up yet

    private void log(Long jobId, String state, String desc) {
        StorageJobLogDO log = new StorageJobLogDO(jobId);
        log.setState(state);
        log.setDesc(desc);
        this.storageService.insertStorageJobLog(log);
    }
}
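The commented-out @Scheduled annotation suggests the job is meant to run every five minutes through Spring's scheduler. For that annotation to take effect, scheduling has to be enabled somewhere in the application; a minimal sketch, where the configuration class name is illustrative and not taken from the original project:

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

// Hypothetical configuration class: @EnableScheduling turns on processing of @Scheduled
// annotations such as the fixedDelay = 5 minutes commented out on StorageOptimizationJob.run().
@Configuration
@EnableScheduling
public class SchedulingConfig {
}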
class ConcatenateTask {

    private static final Logger logger = LoggerFactory.getLogger(ConcatenateTask.class);

    private String location;
    private CountDownLatch cd;

    ConcatenateTask(String location) {
        this.location = location;
        this.cd = new CountDownLatch(1);
    }

    public void run() throws IOException, InterruptedException {
        String appPath = System.getenv("SPARK_APP_PATH");
        if (appPath == null || appPath.trim().length() == 0) {
            logger.error("SPARK_APP_PATH environment variable is not set");
            throw new RuntimeException("SPARK_APP_PATH environment variable is not set");
        }
        logger.info("Launching Spark application");
        SparkAppHandle handle = new SparkLauncher()
                .setMaster("yarn")
                .setDeployMode("client")
                .setConf("spark.executor.instances", "4")
                .setConf("spark.executor.cores", "2")
                .setConf("spark.executor.memory", "4g")
                .setMainClass("com.dyingbleed.labrador.spark.Application")
                .setAppResource(appPath)
                .addAppArgs(location)
                .setAppName("labrador")
                .startApplication();
        handle.addListener(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                logger.info("Spark State: {}", handle.getState().name());
                if (handle.getState().isFinal()) cd.countDown();
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {}
        });
        cd.await(10, TimeUnit.MINUTES); // wait up to 10 minutes for the application to reach a final state
    }
}
Core code: the Spark job that performs the merge:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

import scala.util.Random

object Application {

  private val logger = LoggerFactory.getLogger(classOf[Application])

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      throw new IllegalArgumentException("At least one argument (the target location) is required")
    }
    val location = args(0)
    val spark = SparkSession.builder()
      .appName("labrador")
      .getOrCreate()
    try {
      new Application(spark, location).run()
    } catch {
      case e: Exception => logger.error("Execution failed", e)
    } finally {
      spark.close()
    }
  }

}
class Application(spark: SparkSession, location: String) extends Runnable {

  override def run(): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    fileFormat(fs, location) match {
      case Some(fileFormat) => {
        Application.logger.info("Detected file format {}", fileFormat)
        val tmp = s"/tmp/labrador/${new Random().nextInt(Int.MaxValue)}" // temporary directory
        Application.logger.debug("Temporary directory {}", tmp)
        Application.logger.info("Merging small files")
        try {
          spark.read
            .format(fileFormat)
            .load(location)
            .repartition(getPreferedPartitionNum(fs, location))
            .write
            .format(fileFormat)
            .save(tmp)
        } catch {
          case e: Exception => {
            // clean up the temporary directory on failure
            val path = new Path(tmp)
            if (fs.exists(path)) {
              fs.delete(path, true)
            }
            throw e
          }
        }
        Application.logger.info("Validating data")
        if (validate(fileFormat, location, tmp)) {
          Application.logger.info("Backing up old data")
          fs.rename(new Path(location), new Path(location + "_bak"))
          Application.logger.info("Moving new data into place")
          fs.rename(new Path(tmp), new Path(location))
          Application.logger.info("Deleting old data")
          fs.delete(new Path(location + "_bak"), true)
        } else {
          Application.logger.warn("Merged data does not match the original")
        }
      }
      case None => Application.logger.warn("Could not determine the file format under {}", location)
    }
  }

  private[this] def fileFormat(fs: FileSystem, location: String): Option[String] = {
    val files = fs.listFiles(new Path(location), false)
    while (files.hasNext) {
      val file = files.next()
      if (file.isFile) {
        val fileName = file.getPath.getName
        val lastIndex = fileName.lastIndexOf(".")
        if (lastIndex > 0 && lastIndex + 1 < fileName.length) {
          val suffix = fileName.substring(lastIndex + 1, fileName.length)
          if (isValidFileFormat(suffix)) return Option(suffix)
        }
      }
    }
    None
  }

  private[this] def isValidFileFormat(fileFormat: String): Boolean = {
    fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("json")
  }

  private[this] def getPreferedPartitionNum(fs: FileSystem, location: String): Int = {
    val files = fs.listFiles(new Path(location), false)
    var totalLen = 0L // total size
    while (files.hasNext) {
      val file = files.next()
      if (file.isFile) totalLen += file.getLen
    }
    (totalLen / fs.getConf.getLong("dfs.blocksize", 32 * 1024 * 1024) + 1).toInt
  }

  private[this] def validate(fileFormat: String, sourceDir: String, targetDir: String): Boolean = {
    val sourceCount = spark.read.format(fileFormat).load(sourceDir).count()
    val targetCount = spark.read.format(fileFormat).load(targetDir).count()
    sourceCount == targetCount
  }

}
Analysis:
<1> The prepare method:
Before merging any small files, check whether YARN has enough spare resources (currently available cores and memory). The job first calls into YARN, i.e. initializes it (new YARN() plus loading the configuration files), and the isIdle() method then decides whether the available resources meet the requirements. (A sketch of such a check follows.)
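The YARN wrapper itself is not shown in this post. For reference, a rough sketch of what an isIdle() check could look like using Hadoop's YarnClient; the 50% free-resource threshold is purely illustrative and not taken from the original code:

import java.util.List;

import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnIdleCheckExample {

    // Treat the cluster as "idle" when at least half of its memory and cores are free.
    // This is a guess at what the project's YARN.isIdle() might do, not its actual code.
    public static boolean isIdle() throws Exception {
        try (YarnClient yarn = YarnClient.createYarnClient()) {
            yarn.init(new YarnConfiguration());
            yarn.start();
            long totalMem = 0, usedMem = 0;
            long totalCores = 0, usedCores = 0;
            List<NodeReport> nodes = yarn.getNodeReports(NodeState.RUNNING);
            for (NodeReport node : nodes) {
                totalMem += node.getCapability().getMemorySize();
                totalCores += node.getCapability().getVirtualCores();
                Resource used = node.getUsed();
                if (used != null) {
                    usedMem += used.getMemorySize();
                    usedCores += used.getVirtualCores();
                }
            }
            return usedMem * 2 <= totalMem && usedCores * 2 <= totalCores;
        }
    }
}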
<2> The optimize method:
a. Query all optimization jobs and pick one at random.
b. Check whether the corresponding database and table exist in Hive.
c. If the table is partitioned, fetch the storage location of every partition and check each one for small files (only the first directory found with small files is merged per cycle).
d. Inside the merge task:
Step 1: determine the file format, i.e. whether the data is stored as parquet or json.
Step 2: read the data in that format and reduce the number of files with repartition (an alternative using coalesce is sketched after this list):
spark.read
  .format(fileFormat)
  .load(location)
  .repartition(getPreferedPartitionNum(fs, location))
  .write
  .format(fileFormat)
  .save(tmp)
e. After the merge, verify that the old data and the new data (still sitting in the temporary directory) agree. If they do, rename the old directory to a _bak backup, move the new data to the original location, and finally delete the _bak directory.
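One note on step d: repartition(n) always performs a full shuffle, which spreads the data evenly across the new files, whereas coalesce(n) can reduce the file count without a shuffle (at the cost of potentially uneven file sizes). A minimal sketch of that variant, with an illustrative path, format and partition count rather than values from the original code:

import org.apache.spark.sql.SparkSession;

public class CoalesceMergeExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("coalesce-merge-example").getOrCreate();
        try {
            // Hypothetical directories, for illustration only.
            String location = "/user/hive/warehouse/demo.db/events/dt=2020-01-01";
            String tmp = "/tmp/labrador/coalesce-example";
            spark.read()
                    .format("parquet")
                    .load(location)
                    .coalesce(4) // reduce to 4 output files without a full shuffle
                    .write()
                    .format("parquet")
                    .save(tmp);
            // Validation and the rename swap would follow, as in Application.run above.
        } finally {
            spark.close();
        }
    }
}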