Python+MRT 并行处理MODIS数据
最近涉及到MODIS 植被指数、地表温度、蒸散等时序数据产品的处理,因此对MODIS数据批处理的方法进行了改进。主要是想充分利用工作站的性能,提升数据处理的效率。虽然用了两三天时间,但是”磨刀不误砍柴工“,还是值得的。
批处理工具MRT
MODIS数据批处理大多是依靠MRT工具,通过执行cmd命令的方式完成,需要提前手动生成相关产品处理的prm文件,可参考:
https://blog.csdn.net/jingchenlin1996/article/details/88548728
如果不同年份和月份的数据存放在不同的目录下,那么就涉及到了文件夹的遍历。我们选择了 python 的os.walk()函数,并通过os.popen()函数执行批处理的cmd命令。
#! usr/bin/python
#coding=utf-8
# 运行此文件前需要先运行MRT生成prm配置文件
import os
def MRTBacth(fileInpath,fileOutpath,prmFilepath):
for dirPath, dirNames, fileNames in os.walk(fileInpath):
if len(fileNames)> 0:
strCMD = 'cd .. && C: && cd C:/MRT/bin &&java -jar MRTBatch.jar -d ' + dirPath + \
' -p prmFilepath -o ' + fileOutpath + '&&MRTBatch.bat'
res = os.popen(strCMD)
print(res.read())
print(fileOutpath +': Finished')
MRTBacth('H:/DATA/MOD16A2/2001')
到这里都只是简单的单核计算。
并行计算
python并行计算可参考资料:
以及基础的并发、并行、阻塞等概念,参考:
Python+MRT 并行处理
遇见的问题
我们选择了multiprocessing的多进程包,在上面MRTBatch()函数的基础上增加下面的内容:
#导入multiprocessing包和functools包的partial模块
import multiprocessing as mp
from functools import partial
#windows环境下主程序应放在if __name__ =="__main__"下
if __name__ == "__main__":
fileInpath = 'H:/DATA/MOD16A2/'
fileOutpath='H:\DATA\RESULT\MOD16A2'
prmFile='H:/DATA/MOD16A2.prm'
#获取输入路径下的所有包含文件的文件路径
fileList=GetFile(fileInpath)
#创建10个进程的进程池,
pool = mp.Pool(10)
#因为MRTBactch()需要传入多参数,可通过partial()函数固定其他参数
func=partial(MRTBacth,fileOutpath=fileOutpath,prmFile=prmFile)
#通过pool.map()函数实现并行计算
res=pool.map(func,fileList)
print(res.get())
pool.close()
pool.join()
看似没什么问题,实际上计算的结果并不对。正常输出的tif文件数据量约180M,但此时程序算出来的数据量约1.7G,差不多10辈的差距。更意外的是只生成了一个TmpMosaic.hdf 的中间过程文件。
1.7G的输出结果,正常为180M通过查看日志文件,发现虽然进程池中的进程都执行了mrtmosaic.exe和resample.exe,但是这俩exe在不同进程中的输入输出路径都是一样的。
输入都是该路径下的22个hdf,输出也是同样路径的tmpMosaic.hdf后来发现当执行MRT批处理中“java -jar MRTBatch.jar -d ' + dirPath + -p prmFilepath -o ' + fileOutpath ”的命令时,MRT会在其安装目录下生成文件名为MRTBatch.bat的文件,文件内容如下(请忽略具体的输出输出路径):
mrtmosaic -s "1 0 0 0 0" -i H:\DATA\RESULT\MOD16A2/2001\353\MOD16A2.A2001353_mosaic.prm -o H:\DATA\RESULT\MOD16A2/2001\353\TmpMosaic.hdf
resample -p H:\DATA\RESULT\MOD16A2/2001\353\MOD16A2.A2001353_resample.prm
del H:\DATA\RESULT\MOD16A2/2001\353\TmpMosaic.hdf
原来MRTBatch.bat的文件记录的时后续调用mrtmosaic.exe和resample.exe所需的输入参数。这解释了为什么MRT批处理命令中要有”MRTBatch.bat“这一命令,这也解释了为什么不同进程的输出结果是相同的。因为每个进程都会在同一目录下生成MRTBatch.bat文件,文件会自动覆盖,最终只记录了最后一次的内容。
解决办法
改进的方法就是独立调用mrtmosaic.exe和resample.exe。通过查看MRT帮助文档,了解了两个exe的参数,对程序进行了调试和修改。最终的程序代码如下:
#! usr/bin/python
#coding=utf-8
# 运行此文件前需要先运行MRT生成prm配置文件
import os,shutil,time,re
import multiprocessing as mp
from functools import partial
#文件夹遍历,获取包含hdf文件的所有文件夹,并以字符串列表返回
def GetFile(fileInpath):
fileList=[]
for dirPath, dirNames, fileNames in os.walk(fileInpath):
if len(fileNames) > 0:
fileList.append(dirPath)
return fileList
#获取某文件夹下所有后缀为hdf的文件名,以字符串列表返回
def GetFileName(fileInpath,fileType):
fileNames = os.listdir(fileInpath)
fileNameArr = []
for fileName in fileNames:
if os.path.splitext(fileName)[1] == fileType:
fileNameArr.append(fileName)
return fileNameArr
#mrtmosaic.exe
def MrtmosaicPro(fileInpath,fileOutpath,SpectralSubset):
#build mosaic prm file,which records all the datafiles under the fileInpath
fileNames = GetFileName(fileInpath, '.hdf')
prmName = fileOutpath + '/' + 'tmpMosaic.prm'
for fileName in fileNames:
with open(prmName, 'a+') as f:
f.write(fileInpath+'/'+fileName + '\n')
#get the product type and DOY information
productType=fileNames[0].split('.')[0]
DOY=fileNames[0].split('.')[1]
fileOutName=fileOutpath +'/'+productType+'.'+DOY+'.'+'TmpMosaic.hdf'
#run mrtmosaic.exe
strCMD = 'cd .. && C: && cd C:/MRT/bin && mrtmosaic -i '+prmName+ ' -s "'+SpectralSubset+'" -o '+fileOutName
res = os.popen(strCMD)
print(res.read())
print(fileOutName +': Finished')
return fileOutName,prmName
#fileInpath:MOD16A2.A2010001.tmpmosaic.hdf
#fileOutpath:
#prmFilepath:
#SpectralSubset:'1 0 0 0 0'
def ResamplePro(fileInpath,fileOutpath,prmFilepath,SpectralSubset):
#copy the prm file into fileoutpath and update it
shutil.copy(prmFilepath,fileOutpath)
filedata=[]
newPrmFilepath=fileOutpath+'/'+os.path.basename(prmFilepath)
fileOutName=os.path.basename(fileInpath).split('.')[0]+'.'+os.path.basename(fileInpath).split('.')[1]+'.tif'
targetStr1='INPUT_FILENAME = '
newStr1='INPUT_FILENAME = '+fileInpath
targetStr2='SPECTRAL_SUBSET = '
newStr2 = 'SPECTRAL_SUBSET = (' + str(SpectralSubset.count('1'))+')'
targetStr3 = 'OUTPUT_FILENAME = '
newStr3 = 'OUTPUT_FILENAME = ' + fileOutpath+'/'+fileOutName
#read in the prmfile and replace the variables of input filename,SPECTRAL_SUBSET and OUTPUT_FILENAME
with open(newPrmFilepath, "r") as file:
#lines=file.read()
for line in file:
if re.match(targetStr1,line):
line=newStr1
if re.match(targetStr2,line):
line=newStr2
if re.match(targetStr3,line):
line=newStr3
filedata.append(line)
with open(newPrmFilepath,'a+') as file:
file.seek(0)
file.truncate()
for i in range(len(filedata)):
file.write(filedata[i]+'\n')
#run resample.exe
strCMD = 'cd .. && C: && cd C:/MRT/bin && resample -p ' +newPrmFilepath
res = os.popen(strCMD)
print(res.read())
print(fileOutName + ': Finished')
return newPrmFilepath
def MRTBacth(fileInpath,fileOutpath,prmFile,SpectralSubset):
#根据实际情况自己改
newFileOutpath=fileOutpath+fileInpath[15:24]
if os.path.exists(newFileOutpath) == False:
os.makedirs(newFileOutpath)
result= MrtmosaicPro(fileInpath, newFileOutpath, SpectralSubset)
#use the output from mrtmosaic.exe as the inputfile to resample.exe
newPrmFilepath=ResamplePro(result[0], newFileOutpath, prmFile, SpectralSubset)
#delete prm file and tmpmosaic.hdf
strDelCMD='del ' + result[0] + '&& del ' + result[1]+'&& del '+newPrmFilepath
res = os.popen(strDelCMD)
if __name__ == "__main__":
startTime = time.time()
fileInpath = 'H:/DATA/MOD16A2/'
fileOutpath='H:\DATA\RESULT\MOD16A2'
prmFile='H:/DATA/MOD16A2.prm'
SpectralSubset="1 0 0 0 0"
fileList=GetFile(fileInpath)
#MRTBacth(fileList[0],fileOutpath,prmFile,SpectralSubset)
# cores number
cores = mp.cpu_count()
pool = mp.Pool(int(cores * 2 / 3))
func=partial(MRTBacth,fileOutpath=fileOutpath,prmFile=prmFile,SpectralSubset=SpectralSubset)
res=pool.map(func,fileList)
print(res.get())
pool.close()
pool.join()
endTime = time.time()
print("used time is ", endTime - startTime)
总结
以前使用MRT没有注意到程序运行的一些细节,通过这次实验,了解了MRT内部运行的机制。机器的内存和显卡资源还未利用起来,后续将对程序进一步优化和改进。最后放一张cpu的图:
image.png