单词统计
目录 |
---|
前言 |
单词统计 |
统计手机用户流量日志 |
即将开始... |
有一个文本文件,被分成了4份,分别放到了4台服务器中存储
Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather
现在要统计出每个单词的出现次数。
1.单词统计的业务过程拆解
1.1.拆分单词
- map节点1
输入:“the weather is good”
输出:(the,1),(weather,1),(is,1),(good,1)
- map节点2
输入:“today is good”
输出:(today,1),(is,1),(good,1)
- map节点3
输入:“good weather is good”
输出:(good,1),(weather,1),(is,1),(good,1)
- map节点4
输入:“today has good weather”
输出:(today,1),(has,1),(good,1),(weather,1)
1.2.排序
- map节点1
- map节点2
- map节点3
- map节点4
1.3.合并
- map节点1
- map节点2
- map节点3
- map节点4
1.4.汇总统计
每个map节点都完成以后,就要进入reduce阶段了。
例如使用了3个reduce节点,需要对上面4个map节点的结果进行重新组合,比如按照26个字母分成3段,分配给3个reduce节点。
Reduce节点进行统计,计算出最终结果。
这就是最基本的MapReduce处理流程。
2.编程思路
了解了MapReduce的工作过程,我们思考一下用代码实现时需要做哪些工作?
-
在4个服务器中启动4个map任务
-
每个map任务读取目标文件,每读一行就拆分一下单词,并记下来此单词出现了一次
-
目标文件的每一行都处理完成后,需要把单词进行排序
-
在3个服务器上启动reduce任务
-
每个reduce获取一部分map的处理结果
-
reduce任务进行汇总统计,输出最终的结果数据
但不用担心,MapReduce是一个非常优秀的编程模型,已经把绝大多数的工作做完了,我们只需要关心2个部分:
-
map处理逻辑——对传进来的一行数据如何处理?输出什么信息?
-
reduce处理逻辑——对传进来的map处理结果如何处理?输出什么信息?
编写好这两个核心业务逻辑之后,只需要几行简单的代码把map和reduce装配成一个job,然后提交给Hadoop集群就可以了。
至于其它的复杂细节,例如如何启动map任务和reduce任务、如何读取文件、如对map结果排序、如何把map结果数据分配给reduce、reduce如何把最终结果保存到文件等等,MapReduce框架都帮我们做好了,而且还支持很多自定义扩展配置,例如如何读文件、如何组织map或者reduce的输出结果等等,以上种种细节也会在后面的章节深入探索。
3.wordcount hadoop实战
进行如下三个步骤:
- 编写java源代码,打包成jar文件
- 加载四个txt文件到hdfs文件系统中
- 使用hadoop运行我们编写的java程序,去解析位于hdfs中的txt文件
1.编写java源代码,打包成jar文件
代码结构:
WordCountMapper.java
package hadoop;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* word count map程序
*
* @author liaochuanhu
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
public class WordCountMapper extends Mapper {
@Override
protected void map(Object key, Object value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
WordCountReducer.java
package hadoop;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* word count reduce程序
*
* @author liaochuanhu
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
public class WordCountReducer extends Reducer {
@Override
protected void reduce(Object key, Iterable values, Context context)
throws IOException, InterruptedException {
Integer count = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
count += iterator.next().get();
}
context.write(key, new IntWritable(count));
}
}
WordCountMapReduce.java
package hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* word count main方法
*
* @author liaochuanhu
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
public class WordCountMapReduce {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");
job.setJarByClass(WordCountMapReduce.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
if (!result) {
System.out.println("word count task fail!");
}
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hadoop</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<name>hadoop</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
编译生成的jar包
2.加载四个txt文件到hdfs文件系统中
分别执行命令:
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put *.txt /wordcount/input
3.使用hadoop运行我们编写的java程序,去解析位于hdfs中的txt文件
执行jar文件:
hadoop jar hadoop-1.0-SNAPSHOT.jar hadoop/WordCountMapReduce /wordcount/input /wordcount/output
执行完成后查看结果:
hdfs dfs -cat /wordcount/output/*
在HDFS web界面也能查看输出文件和输入文件:
4.过程解析
下面看一下从job提交到执行完成这个过程是怎样。
4.1客户端提交任务
Client提交任务时会先到HDFS中查看目标文件的大小,了解要获取的数据的规模,然后形成任务分配的规划,例如:
a.txt 0-128M交给一个task,128-256M 交给一个task,b.txt 0-128M交给一个task,128-256M交给一个task ...,形成规划文件job.split。
然后把规划文件job.split、jar、配置文件xml提交给yarn(Hadoop集群资源管理器,负责为任务分配合适的服务器资源)
image4.2 启动appmaster
注:appmaster是本次job的主管,负责maptask和reducetask的启动、监控、协调管理工作。
yarn找一个合适的服务器来启动appmaster,并把job.split、jar、xml交给它。
image4.3 启动maptask
Appmaster启动后,根据固化文件job.split中的分片信息启动maptask,一个分片对应一个maptask。
分配maptask时,会尽量让maptask在目标数据所在的datanode上执行。
image4.4 执行maptask
Maptask会一行行地读目标文件,交给我们写的map程序,读一行就调一次map方法,map调用context.write把处理结果写出去,保存到本机的一个结果文件,这个文件中的内容是分区且有序的。
分区的作用就是定义哪些key在一组,一个分区对应一个reducer。
image4.5 启动reducetask
Maptask都运行完成后,appmaster再启动reducetask,maptask的结果中有几个分区就启动几个reducetask。
image4.6 执行reducetask
reducetask去读取maptask的结果文件中自己对应的那个分区数据,例如reducetask_01去读第一个分区中的数据。
reducetask把读到的数据按key组织好,传给reduce方法进行处理,处理结果写到指定的输出路径。