Hadoop之MapReduce
2020-02-23 本文已影响0人
TZX_0710
MapReduce是Hadoop的一个分布式计算框架,用于编写批处理应用程序。编写好的程序可以提交到Hadoop集群上用于并行处理大规模的数据集。MapReduce专门用于处理key,value键值对处理。它将作业视为一组key,value,并生成一组key,value作为输出
MapReduce编程模型简述
- input:读取文件
- spliting:将文件进行拆分。得到K1行数和V1对应的文本类容。
- mapping: 并行将每一行按照空格进行拆分,拆分得到List(k2,v2),其中k2代表每一个单词,由于是做词频统计,所以V2的值为1代表出现一次
- shuffling:由于Mapping操作可能是在不同的机器上并行处理的。所以需要通过shuffling将相同key值的数据分发到同一个节点上合并。这样才能统计出最终的结果,此时得到K2为每一个单词。List<V2>为可迭代集合,v2就是Mapping中的V2。
- Reducing:这里的案例是统计单词出来的总结数,所以reducing对List<v2>进行归约求和操作,最终输出。
MapReduce编程模型中splitting和shuffing操作都是由框架实现的,`需要我们自己编程的只有mapping和reducing,这也就是MapReduce这个称呼的来源。
InputFormat & RecordReaders
InputFormat将输出文件拆分为多个InputSplit,并由RecordReaders将InputSplit转换为标准的Key,Value键值对,作为map的输出。这一步的意义在于只有先进行逻辑拆分并转为标准的键值对才能为map提供输入以便并行处理。
Combiner
combiner是map运算后的可选操作,它实际上是一个本地化的reduce操作,它主要是map计算出中间文件后做一个简单的合并重复key值的操作。
map在遇到一个hadoop的单词时就会记录1.但是这边文章里可能会出现N次,那么map输出文件冗余就会很多。因此在reduce计算前相同的key做一个合并操作,那么需要传输的数据量就很减少。
Partitioner
partitioner可以理解成分类器,将map的输出按照key值的不同分别分给对应的reducer,支持自定义实现。
wordCount项目案例
创建maven项目 引入pom文件
//因为采用的pom文件 有的jar冲突 所以我采用了maven helper插件移除了这些冲突的依赖项
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.7</version>
<exclusions>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
</exclusions>
</dependency>
在hadoop中讲解mapreducer有提到过 我们只需要实现map 和reducer
所以下面给出map和reducer的自定义实现
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
String[] words = value.toString().split("\t");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordReducer extends Reducer <Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable <IntWritable> values, Context context) throws IOException,
InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write( key, new IntWritable( count ) );
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class WordCount {
private static final String HDFS_URL="hdfs://192.168.80.153:8020";
private static final String HADOOP_USER_NAME="root";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
if (args.length<2){
System.out.println("Input and output paths are necessary!");
return;
}
// 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
Configuration configuration = new Configuration();
// 指明 HDFS 的地址
configuration.set("fs.defaultFS", HDFS_URL);
// 创建一个 Job
Job job = Job.getInstance(configuration);
job.setJarByClass( WordCount.class );
// 设置 Mapper 和 Reducer
job.setMapperClass( WordCountMapper.class);
job.setReducerClass( WordReducer.class);
// 设置 Mapper 输出 key 和 value 的类型
job.setMapOutputKeyClass( Text.class);
job.setMapOutputValueClass( IntWritable.class);
// 设置 Reducer 输出 key 和 value 的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
Path outputPath = new Path(args[1]);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 设置作业输入文件和输出文件的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputPath);
// 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
boolean result = job.waitForCompletion(true);
// 关闭之前创建的 fileSystem
fileSystem.close();
// 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
System.exit(result ? 0 : -1);
}
}
编写完成代码 采用maven 的package 功能打包成jar 提交到服务器
使用 命令进行运行
hadoop jar /usr/local//hdfs-0.0.1-SNAPSHOT.jar \
com.spring.hdfs.WordCount \
/wordcount/input.txt /wordcount/output/wordcount --wordcount/input.txt 是读取的文件地址 --workcount/output/workcount 是项目运行完成之后保存在hdfs中的地址
--关于HDFS一些操作指令
1. 创建一个文件夹 hdfs dfs -mkdir /myTask
2. 创建多个文件夹 hdfs dfs -mkdir -p /myTask1/input1
3. 上传文件 hdfs dfs -put /opt/wordcount.txt /myTask/input
4. 查看总目录下的文件和文件夹 hdfs dfs -ls /
5. 查看myTask下的文件和文件夹 hdfs dfs -ls /myTask
6. 查看myTask下的wordcount.txt的内容 hdfs dfs -cat /myTask/wordcount.txt
7. 删除总目录下的myTask2文件夹以及里面的文件和文件夹 hdfs dfs -rmr /myTask2
8. 删除myTask下的wordcount.txt hdfs dfs -rmr /myTask/wordcount.txt
9. 下载hdfs中myTask/input/wordcount.txt到本地opt文件夹中 hdfs dfs -get /myTask/input/wordcount.txt /opt