遥感卫星

Python+MRT 并行处理MODIS数据

2019-12-05  本文已影响0人  CapDragonfly

最近涉及到MODIS 植被指数、地表温度、蒸散等时序数据产品的处理,因此对MODIS数据批处理的方法进行了改进。主要是想充分利用工作站的性能,提升数据处理的效率。虽然用了两三天时间,但是”磨刀不误砍柴工“,还是值得的。

批处理工具MRT

MODIS数据批处理大多是依靠MRT工具,通过执行cmd命令的方式完成,需要提前手动生成相关产品处理的prm文件,可参考:

https://blog.csdn.net/jingchenlin1996/article/details/88548728

如果不同年份和月份的数据存放在不同的目录下,那么就涉及到了文件夹的遍历。我们选择了 python 的os.walk()函数,并通过os.popen()函数执行批处理的cmd命令。

#! usr/bin/python
#coding=utf-8
# 运行此文件前需要先运行MRT生成prm配置文件

import os
def MRTBacth(fileInpath,fileOutpath,prmFilepath):
    for dirPath, dirNames, fileNames in os.walk(fileInpath):
        if len(fileNames)> 0:
            strCMD = 'cd .. && C: && cd C:/MRT/bin &&java -jar MRTBatch.jar -d ' + dirPath + \
                   ' -p prmFilepath -o ' + fileOutpath + '&&MRTBatch.bat'
            res = os.popen(strCMD)
            print(res.read())
            print(fileOutpath +':       Finished')
MRTBacth('H:/DATA/MOD16A2/2001')

到这里都只是简单的单核计算。

并行计算

python并行计算可参考资料:

https://www.cnblogs.com/fugeny/p/9898971.html

以及基础的并发、并行、阻塞等概念,参考:

https://www.cnblogs.com/zhangyafei/p/9606765.html

Python+MRT 并行处理

遇见的问题

我们选择了multiprocessing的多进程包,在上面MRTBatch()函数的基础上增加下面的内容:

#导入multiprocessing包和functools包的partial模块
import multiprocessing as mp
from functools import partial
#windows环境下主程序应放在if __name__ =="__main__"下
if __name__ == "__main__":
    fileInpath = 'H:/DATA/MOD16A2/'
    fileOutpath='H:\DATA\RESULT\MOD16A2'
    prmFile='H:/DATA/MOD16A2.prm'
    #获取输入路径下的所有包含文件的文件路径
    fileList=GetFile(fileInpath)
    #创建10个进程的进程池,
    pool = mp.Pool(10)
    #因为MRTBactch()需要传入多参数,可通过partial()函数固定其他参数
    func=partial(MRTBacth,fileOutpath=fileOutpath,prmFile=prmFile)
    #通过pool.map()函数实现并行计算
    res=pool.map(func,fileList)
    print(res.get())
    pool.close()
    pool.join()

看似没什么问题,实际上计算的结果并不对。正常输出的tif文件数据量约180M,但此时程序算出来的数据量约1.7G,差不多10辈的差距。更意外的是只生成了一个TmpMosaic.hdf 的中间过程文件。

1.7G的输出结果,正常为180M

通过查看日志文件,发现虽然进程池中的进程都执行了mrtmosaic.exe和resample.exe,但是这俩exe在不同进程中的输入输出路径都是一样的。

输入都是该路径下的22个hdf,输出也是同样路径的tmpMosaic.hdf

后来发现当执行MRT批处理中“java -jar MRTBatch.jar -d ' + dirPath + -p prmFilepath -o ' + fileOutpath ”的命令时,MRT会在其安装目录下生成文件名为MRTBatch.bat的文件,文件内容如下(请忽略具体的输出输出路径):

mrtmosaic -s "1 0 0 0 0" -i H:\DATA\RESULT\MOD16A2/2001\353\MOD16A2.A2001353_mosaic.prm -o H:\DATA\RESULT\MOD16A2/2001\353\TmpMosaic.hdf
resample -p H:\DATA\RESULT\MOD16A2/2001\353\MOD16A2.A2001353_resample.prm
del H:\DATA\RESULT\MOD16A2/2001\353\TmpMosaic.hdf

原来MRTBatch.bat的文件记录的时后续调用mrtmosaic.exe和resample.exe所需的输入参数。这解释了为什么MRT批处理命令中要有”MRTBatch.bat“这一命令,这也解释了为什么不同进程的输出结果是相同的。因为每个进程都会在同一目录下生成MRTBatch.bat文件,文件会自动覆盖,最终只记录了最后一次的内容。

解决办法

改进的方法就是独立调用mrtmosaic.exe和resample.exe。通过查看MRT帮助文档,了解了两个exe的参数,对程序进行了调试和修改。最终的程序代码如下:

#! usr/bin/python
#coding=utf-8
# 运行此文件前需要先运行MRT生成prm配置文件

import os,shutil,time,re
import multiprocessing as mp
from functools import partial

#文件夹遍历,获取包含hdf文件的所有文件夹,并以字符串列表返回
def GetFile(fileInpath):
   fileList=[]
   for dirPath, dirNames, fileNames in os.walk(fileInpath):
       if len(fileNames) > 0:
           fileList.append(dirPath)
   return fileList
#获取某文件夹下所有后缀为hdf的文件名,以字符串列表返回
def GetFileName(fileInpath,fileType):
   fileNames = os.listdir(fileInpath)
   fileNameArr = []
   for fileName in fileNames:
       if os.path.splitext(fileName)[1] == fileType:
           fileNameArr.append(fileName)
   return fileNameArr

#mrtmosaic.exe
def MrtmosaicPro(fileInpath,fileOutpath,SpectralSubset):
   #build mosaic prm file,which records all the datafiles under the fileInpath
   fileNames = GetFileName(fileInpath, '.hdf')
   prmName = fileOutpath + '/' + 'tmpMosaic.prm'
   for fileName in fileNames:
       with open(prmName, 'a+') as f:
           f.write(fileInpath+'/'+fileName + '\n')

   #get the product type and  DOY information
   productType=fileNames[0].split('.')[0]
   DOY=fileNames[0].split('.')[1]
   fileOutName=fileOutpath +'/'+productType+'.'+DOY+'.'+'TmpMosaic.hdf'
   #run mrtmosaic.exe
   strCMD = 'cd .. && C: && cd C:/MRT/bin && mrtmosaic -i '+prmName+ ' -s "'+SpectralSubset+'" -o '+fileOutName
   res = os.popen(strCMD)
   print(res.read())
   print(fileOutName +':       Finished')
   return fileOutName,prmName

#fileInpath:MOD16A2.A2010001.tmpmosaic.hdf
#fileOutpath:
#prmFilepath:
#SpectralSubset:'1 0 0 0 0'
def ResamplePro(fileInpath,fileOutpath,prmFilepath,SpectralSubset):
   #copy the prm file into fileoutpath and update it
   shutil.copy(prmFilepath,fileOutpath)
   filedata=[]
   newPrmFilepath=fileOutpath+'/'+os.path.basename(prmFilepath)
   fileOutName=os.path.basename(fileInpath).split('.')[0]+'.'+os.path.basename(fileInpath).split('.')[1]+'.tif'
   targetStr1='INPUT_FILENAME = '
   newStr1='INPUT_FILENAME = '+fileInpath
   targetStr2='SPECTRAL_SUBSET = '
   newStr2 = 'SPECTRAL_SUBSET = (' + str(SpectralSubset.count('1'))+')'
   targetStr3 = 'OUTPUT_FILENAME = '
   newStr3 = 'OUTPUT_FILENAME = ' + fileOutpath+'/'+fileOutName

   #read in the prmfile and replace the variables of input filename,SPECTRAL_SUBSET and OUTPUT_FILENAME
   with open(newPrmFilepath, "r") as file:
       #lines=file.read()
       for line in file:
           if re.match(targetStr1,line):
               line=newStr1
           if re.match(targetStr2,line):
               line=newStr2
           if re.match(targetStr3,line):
               line=newStr3
           filedata.append(line)

   with open(newPrmFilepath,'a+') as file:
       file.seek(0)
       file.truncate()
       for i in range(len(filedata)):
           file.write(filedata[i]+'\n')
   #run resample.exe
   strCMD = 'cd .. && C: && cd C:/MRT/bin && resample -p ' +newPrmFilepath
   res = os.popen(strCMD)
   print(res.read())
   print(fileOutName + ':       Finished')
   return newPrmFilepath

def MRTBacth(fileInpath,fileOutpath,prmFile,SpectralSubset):
   #根据实际情况自己改
   newFileOutpath=fileOutpath+fileInpath[15:24]
   if os.path.exists(newFileOutpath) == False:
       os.makedirs(newFileOutpath)

   result= MrtmosaicPro(fileInpath, newFileOutpath, SpectralSubset)
   #use the output from mrtmosaic.exe as the inputfile to resample.exe
   newPrmFilepath=ResamplePro(result[0], newFileOutpath, prmFile, SpectralSubset)
   #delete prm file and tmpmosaic.hdf
   strDelCMD='del ' + result[0] + '&& del ' + result[1]+'&& del '+newPrmFilepath
   res = os.popen(strDelCMD)

if __name__ == "__main__":
   startTime = time.time()
   fileInpath = 'H:/DATA/MOD16A2/'
   fileOutpath='H:\DATA\RESULT\MOD16A2'
   prmFile='H:/DATA/MOD16A2.prm'
   SpectralSubset="1 0 0 0 0"
   fileList=GetFile(fileInpath)

   #MRTBacth(fileList[0],fileOutpath,prmFile,SpectralSubset)
   # cores number
   cores = mp.cpu_count()
   pool = mp.Pool(int(cores * 2 / 3))

   func=partial(MRTBacth,fileOutpath=fileOutpath,prmFile=prmFile,SpectralSubset=SpectralSubset)
   res=pool.map(func,fileList)
   print(res.get())
   pool.close()
   pool.join()
   endTime = time.time()
   print("used time is ", endTime - startTime)

总结

以前使用MRT没有注意到程序运行的一些细节,通过这次实验,了解了MRT内部运行的机制。机器的内存和显卡资源还未利用起来,后续将对程序进一步优化和改进。最后放一张cpu的图:


image.png
上一篇下一篇

猜你喜欢

热点阅读