MapReduce统计各个城市遥感影像上植被像素与非植被像素比例
2020-09-23 本文已影响0人
hehehehe
Mapper
mapper 读取一幅影像中的植被和植被像素存在PlantWritable 中,key为城市
package cn.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import cn.bean.GetKeyOfCity;
import cn.bean.Plant;
import cn.gdal.GdalOp;
import com.neunn.ne.hdfs.HdfsOperator;
public class MapperPlant extends Mapper<LongWritable , Text, Text, PlantWritable>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String hdfsPath = value.toString();
//"/home/kjxx/block/GF1/PMS1/20150904/0001020088/GF1_PMS1_130323331003_20150904_0001020088.TIFF"
String fileName = hdfsPath.substring(hdfsPath.length()-46, hdfsPath.length());
String localPath = "/apptar";
//将在hdfs读取到的路径,下载到本地
HdfsOperator.getFileFromHdfs(hdfsPath, localPath);
//
GdalOp gdalOp = new GdalOp();
String svmPath = "";
String classifySaveHdfsPath = "";
Plant plant = gdalOp.gdalOp(localPath+"/"+fileName, svmPath, classifySaveHdfsPath);
//传到Reducer中的Value值
PlantWritable plantWritable = new PlantWritable(plant.getPlant(), plant.getNoPlant());
//传到Reducer中的Key值
String cityName = GetKeyOfCity.getKeyOfImage(fileName);
Text Reducekey =new Text(cityName);
//写操作
context.write(Reducekey, plantWritable);
}
}
ReducerPlant 对城市key进行聚合
package cn.mr;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ReducerPlant extends Reducer<Text, PlantWritable, Text, Text> {
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<PlantWritable> values,
Reducer<Text, PlantWritable, Text, Text>.Context context)
throws IOException, InterruptedException {
int plantCount = 0;
int noPlantCount = 0;
double percentPlant = 0;
for (PlantWritable val : values) {
plantCount += Integer.parseInt(val.plant);
noPlantCount += Integer.parseInt(val.noPlant);
}
percentPlant = ((float)(plantCount))/(plantCount + noPlantCount);
result.set("植被数目:"+ plantCount + "总数目:" + (plantCount + noPlantCount) + "覆盖率:" + percentPlant);
context.write(key, result);
}
}
PlantWritable
package cn.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PlantWritable implements WritableComparable<Object> {
public String plant;
public String noPlant;
public PlantWritable(String plant, String noPlant){
this.noPlant = noPlant;
this.plant = plant;
}
@Override
public void readFields(DataInput in) throws IOException {
this.plant = in.readUTF();
this.noPlant = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.plant);
out.writeUTF(this.noPlant);
}
public String toString() {
return "植被像元: " + this.plant + " 非植被像元: " + this.noPlant;
}
@Override
public int compareTo(Object o) {
PlantWritable other = (PlantWritable) o;
int n = this.plant.compareTo(other.plant);
if(n != 0){
return n;
}
n = this.noPlant.compareTo(other.noPlant);
return n;
}
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof PlantWritable) {
PlantWritable other = (PlantWritable) obj;
return this.strEquals(this.plant, other.plant)
&& this.strEquals(this.noPlant, other.noPlant);
}
return false;
}
/**
* 重写 hashCode() 方法很重要,Hadoop 的 Partitioners 会用到这个方法
*/
public int hashCode() {
return 13 * (this.plant == null ? 0 : this.plant.hashCode())
+ 67 * (this.noPlant == null ? 0 : this.noPlant.hashCode());
}
public boolean strEquals(String a, String b) {
return (a == null && b == null) || (a != null && a.equals(b));
}
}
RunnerPlant
package cn.mr;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
public class RunnerPlant {
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
JobConf conf = new JobConf();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(RunnerPlant.class);
job.setMapperClass(MapperPlant.class);
job.setReducerClass(ReducerPlant.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths( conf, "");
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
gdalOp
package cn.gdal;
import java.io.IOException;
import libsvm.svm;
import libsvm.svm_model;
import libsvm.svm_node;
import org.gdal.gdal.Dataset;
import org.gdal.gdal.Driver;
import org.gdal.gdal.gdal;
import org.gdal.gdalconst.gdalconstConstants;
import org.gdal.ogr.ogr;
import cn.bean.Plant;
import com.neunn.ne.hdfs.HdfsOperator;
public class GdalOp {
/**
* 将影像进行分类,保存分类后的影像到HDFS中,并返回植被像元数和非植被像元
* 数组成的类
* @param fileName_tif 要处理影像路径
* @param svmPath 模型路径
* @param classifySaveHdfsPath 经分类后的影像保存到HDFS上路径
* @return plant 植被像元数,非植被像元数
* @throws IOException
*/
public Plant gdalOp(String fileName_tif, String svmPath, String classifySaveHdfsPath) throws IOException{
gdal.AllRegister();
ogr.RegisterAll();//记得添加驱动注册
// 为了支持中文路径,请添加下面这句代码
gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8","NO");
// 为了使属性表字段支持中文,请添加下面这句
gdal.SetConfigOption("SHAPE_ENCODING","");
//第一步: 将Tiff读取到数组
System.out.println("1)将Tiff读取到数组");
//String fileName_tif = "E:\\JAVA_neunn\\gdalTest\\source\\GF2_PMS2_132120030030_20150212_0000647892.TIFF";
// gdal.AllRegister();
Dataset pInputDataset = gdal.Open(fileName_tif, gdalconstConstants.GA_ReadOnly);
if (pInputDataset == null)
{
System.err.println("GDALOpen failed - " + gdal.GetLastErrorNo());
System.err.println(gdal.GetLastErrorMsg());
System.exit(1);
}
int xSize=pInputDataset.GetRasterXSize();
int ySize=pInputDataset.GetRasterYSize();
int bandCount=pInputDataset.GetRasterCount();
System.out.println("xSize "+xSize + " "+"ySize "+ ySize+" "+"bandCount "+bandCount);
int [] pInputBuf=new int[xSize*ySize*bandCount];
int [] pBandMap=new int[bandCount];
for(int i=0;i<bandCount;i++){
pBandMap[i]=i+1;
}
double []gt={0,0,0,0,0,0};
pInputDataset.GetGeoTransform(gt);
String strProjection=pInputDataset.GetProjectionRef();
pInputDataset.ReadRaster(0, 0, xSize, ySize, xSize, ySize, gdalconstConstants.GDT_Int32, pInputBuf, pBandMap);
// pInputDataset.delete();
System.out.println(" 读取完成");
// 可选
//gdal.GDALDestroyDriverManager();
//第二步: 加载分类模型
System.out.println("2)读取SVM模型");
svm_model pModel = svm.svm_load_model(svmPath);
svm_node []sn = new svm_node[bandCount+1];
for( int i = 0; i < bandCount; i++)
{ sn[i]=new svm_node();
sn[i].index = i;
}
sn[bandCount]=new svm_node();
sn[bandCount].index = -1; // 结束符,代替波段数
System.out.println(" 读取完成");
//第三步:应用分类模型:
System.out.println("3)植被检测");
//创建分类结果数据集
int nPixelNum = xSize*ySize;
//double []pDst = new double[nPixelNum];
int []pDst = new int[nPixelNum];
//int []pDst = new int[4000000];
//植被检测
int totalPixelNum = 0;
int plantPixelNum = 0;
for( int i = 0; i < nPixelNum; i++ )
//for( int i = 0; i < 4000000; i++ )
{
//计算无效点
int inValidNum = 0;
for( int j = 0; j < bandCount; j++ )
{
sn[j].value = (double)pInputBuf[j*nPixelNum+i];
//if ( sn[j].value < MIN_VAL)
//{
// inValidNum++;
//}
}
if ( inValidNum != bandCount )
{
totalPixelNum++;
//svm_predict( pModel, sn );
pDst[i]=(int)svm.svm_predict(pModel,sn);
//pDst[i] = (BYTE)svm_predict( pModel, sn );
if ( pDst[i] == 1 )
{
plantPixelNum++;
}
}
else
{
pDst[i] = 0;
}
}
//计算植被含量百分比
if( totalPixelNum > 0 )
{
double cloudContent = plantPixelNum*1.0/totalPixelNum;
System.out.println("植被含量百分比 "+cloudContent);
}
Plant plant = new Plant();
plant.setPlant(Integer.toString(plantPixelNum));
plant.setNoPlant(Integer.toString(totalPixelNum - plantPixelNum));
//第四步:保存生成的分类文件
System.out.println("4)保存生成的分类文件 ");
//生成矢量文件;
Driver drivers=pInputDataset.GetDriver();
String strDriverName = "ESRIShapefile";
org.gdal.ogr.Driver oDriver =ogr.GetDriverByName(strDriverName);
String classifySaveLocalPath = "E:\\JAVA_neunn\\gdalTest\\source\\mask.tiff";
Dataset pMaskDataSet =drivers.Create( classifySaveLocalPath, xSize, ySize,1);
pMaskDataSet.SetGeoTransform(gt);
pMaskDataSet.SetProjection(strProjection);
int [] pBandMap2=new int[1];
for(int i=0;i<1;i++){
pBandMap2[i]=i+1;
}
pMaskDataSet.WriteRaster(0, 0, xSize, ySize, xSize, ySize, gdalconstConstants.GDT_UInt32, pDst, pBandMap2);
System.out.println("分类文件 完成 "+classifySaveLocalPath);
//上传到HDFS上
HdfsOperator.putFileToHdfs(classifySaveLocalPath, classifySaveHdfsPath);
return plant;
}
}