Hadoop Learning Notes: A Small MapReduce Example
2018-01-13
溯水心生
This example counts how many times each distinct word appears in a text file.
I. Start ZooKeeper
[root@hadoop05 bin]# pwd
/usr/local/zookeeper/bin
[root@hadoop05 bin]# ls
README.txt zkCleanup.sh zkCli.cmd zkCli.sh zkEnv.cmd zkEnv.sh zkServer.cmd zkServer.sh zkStart-all.sh zkStop-all.sh zookeeper.out
ZooKeeper is started and stopped across the cluster with a pair of batch scripts (both assume passwordless SSH from this node to hadoop01–hadoop03).
1. The start script (zkStart-all.sh):
#!/bin/bash
echo "start zkserver..."
for i in 1 2 3
do
ssh hadoop0$i "source /etc/profile;/usr/local/zookeeper/bin/zkServer.sh start"
done
echo "zkServer started!"
2. The stop script (zkStop-all.sh):
#!/bin/bash
echo "stop zkserver..."
for i in 1 2 3
do
ssh hadoop0$i "source /etc/profile;/usr/local/zookeeper/bin/zkServer.sh stop"
done
echo "zkServer stoped!"
[root@hadoop05 bin]# ./zkStart-all.sh
start zkserver...
Warning: Permanently added 'hadoop01,192.168.43.20' (ECDSA) to the list of known hosts.
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Warning: Permanently added 'hadoop02,192.168.43.21' (ECDSA) to the list of known hosts.
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Warning: Permanently added 'hadoop03,192.168.43.22' (ECDSA) to the list of known hosts.
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
zkServer started!
II. Start the Hadoop Cluster
1. Start HDFS and YARN (start-all.sh is deprecated in Hadoop 2.x in favor of start-dfs.sh plus start-yarn.sh, but it still works):
[root@hadoop01 ~]# su hadoop
[hadoop@hadoop01 root]$ cd /usr/local/hadoop/sbin
[hadoop@hadoop01 sbin]$ ./start-all.sh
2. Start the Hadoop JobHistoryServer (executed on node hadoop02; its web UI listens on port 19888 by default):
[hadoop@hadoop02 sbin]$ cd /usr/local/hadoop/sbin/
[hadoop@hadoop02 sbin]$ ./mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /home/hadoop/apps/hadoop-2.7.4/logs/mapred-hadoop-historyserver-hadoop02.out
III. Run a Simple MapReduce Example on Hadoop
1. Create a new Maven project and declare the Hadoop dependencies in pom.xml. (Note that the cluster here runs Hadoop 2.7.4 while the pom pins 2.7.0; within the 2.7.x line this works, but matching the cluster version exactly is the safer choice.) The pom is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yongliang.hadoop</groupId>
    <artifactId>MapReducePro</artifactId>
    <version>2.0</version>
    <packaging>jar</packaging>
    <name>WordCount</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.0</version>
        </dependency>
    </dependencies>
</project>
2. Project layout
(screenshot: WordCount project structure)
3. Write the WordCountApp word-count code
The full code is as follows:
package com.yongliang.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * WordCount
 * @author Zhangyongliang
 */
public class WordCountApp {
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // take one line of input and convert the serialized data to a String
            String line = value.toString();
            // split the line on the tab delimiter
            String[] words = line.split("\t");
            // for each word, emit <word, 1>
            for (String word : words) {
                // the output must be written as Hadoop Writable types
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // reduce is called once per key, with all of that key's values grouped together (k: v1, v2, v3)
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // counter for this key
            int count = 0;
            // accumulate the number of occurrences of this key
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String jobName = args[0];
        String inputPath = args[1];
        String outputPath = args[2];
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // set the job name
        job.setJobName(jobName);
        // locate the jar via the driver class
        job.setJarByClass(WordCountApp.class);
        // set the Mapper and Reducer classes used by the job
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // set the output key/value types of the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // set the output key/value types of the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
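A side note before packaging: when this job is submitted in step 7, Hadoop logs "Hadoop command-line option parsing not performed. Implement the Tool interface...". Generic options such as -D key=value are only parsed automatically when the driver runs through ToolRunner. Below is a minimal sketch of that variant, reusing the Mapper and Reducer above; WordCountTool is a hypothetical name, not part of the original project:

package com.yongliang.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hypothetical ToolRunner-based driver; reuses WordCountApp's Mapper/Reducer.
 */
public class WordCountTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the Configuration that ToolRunner has already
        // populated with any -D key=value generic options
        Job job = Job.getInstance(getConf(), args[0]);
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountApp.WordCountMapper.class);
        job.setReducerClass(WordCountApp.WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips generic options before handing args to run()
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}

It is launched the same way as in step 7, just with WordCountTool as the main class, and any -D options go before the job-name argument.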
4. Package the project as a JAR
(screenshot: packaging result)
Create a jobs directory under the hadoop user's home directory and upload the JAR there:
[hadoop@hadoop01 jobs]$ pwd
/home/hadoop/jobs
[hadoop@hadoop01 jobs]$ ls
MapReducePro-2.0.jar wc.txt
Create a text file wc.txt and fill it with some content for the MapReduce word count:
[hadoop@hadoop01 jobs]$ cat wc.txt
hello world
hello hadoop
hello bigdata
hello world
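One caveat: the mapper splits each line on a single tab character, so the words in wc.txt must be tab-separated for the per-word counts to come out right (the Map output records=8 counter in step 7 confirms that they are: 4 lines × 2 words). If a test file used spaces instead, a more forgiving one-line change in the map method (a hypothetical variant, not what this run uses) would be:

// split on any run of whitespace (tabs or spaces) instead of a single tab
String[] words = line.split("\\s+");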
5. Check the HDFS input and output directories, delete any left over from earlier runs, and recreate the input directory:
[hadoop@hadoop01 jobs]$ hadoop fs -ls /
Found 3 items
drwxr-xr-x - hadoop supergroup 0 2017-12-23 20:20 /data
drwxrwx--- - hadoop supergroup 0 2017-12-23 19:00 /tmp
drwxr-xr-x - hadoop supergroup 0 2017-12-23 20:20 /wordcount
[hadoop@hadoop01 jobs]$ hadoop fs -rm -r /wordcount/input
18/01/13 16:21:20 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /wordcount/input
[hadoop@hadoop01 jobs]$ hadoop fs -rm -r /wordcount/output
18/01/13 16:21:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /wordcount/output
[hadoop@hadoop01 jobs]$ hadoop fs -ls /wordcount
[hadoop@hadoop01 jobs]$ hadoop fs -mkdir /wordcount/input
6. Upload the text to be counted to the HDFS input path:
[hadoop@hadoop01 jobs]$ hadoop fs -put wc.txt /wordcount/input
[hadoop@hadoop01 jobs]$ hadoop fs -ls /wordcount/input
Found 1 items
-rw-r--r-- 3 hadoop supergroup 51 2018-01-13 16:22 /wordcount/input/wc.txt
7. Run the Hadoop word-count example
A note on the command line: the invocation takes the form
hadoop jar <jar file> <fully-qualified main class> <job name (arbitrary, becomes args[0])> <input path> <output path>
[hadoop@hadoop01 jobs]$ pwd
/home/hadoop/jobs
[hadoop@hadoop01 jobs]$ hadoop jar MapReducePro-2.0.jar com.yongliang.hadoop.WordCountApp wordcountapp /wordcount/input /wordcount/output
18/01/13 16:26:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/01/13 16:26:26 INFO input.FileInputFormat: Total input paths to process : 1
18/01/13 16:26:26 INFO mapreduce.JobSubmitter: number of splits:1
18/01/13 16:26:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1515828657444_0001
18/01/13 16:26:27 INFO impl.YarnClientImpl: Submitted application application_1515828657444_0001
18/01/13 16:26:27 INFO mapreduce.Job: The url to track the job: http://hadoop01:8088/proxy/application_1515828657444_0001/
18/01/13 16:26:27 INFO mapreduce.Job: Running job: job_1515828657444_0001
18/01/13 16:26:40 INFO mapreduce.Job: Job job_1515828657444_0001 running in uber mode : false
18/01/13 16:26:40 INFO mapreduce.Job: map 0% reduce 0%
18/01/13 16:26:51 INFO mapreduce.Job: map 100% reduce 0%
18/01/13 16:26:58 INFO mapreduce.Job: map 100% reduce 100%
18/01/13 16:26:59 INFO mapreduce.Job: Job job_1515828657444_0001 completed successfully
18/01/13 16:26:59 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=105
FILE: Number of bytes written=247385
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=148
HDFS: Number of bytes written=35
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7444
Total time spent by all reduces in occupied slots (ms)=4434
Total time spent by all map tasks (ms)=7444
Total time spent by all reduce tasks (ms)=4434
Total vcore-milliseconds taken by all map tasks=7444
Total vcore-milliseconds taken by all reduce tasks=4434
Total megabyte-milliseconds taken by all map tasks=7622656
Total megabyte-milliseconds taken by all reduce tasks=4540416
Map-Reduce Framework
Map input records=4
Map output records=8
Map output bytes=83
Map output materialized bytes=105
Input split bytes=97
Combine input records=0
Combine output records=0
Reduce input groups=4
Reduce shuffle bytes=105
Reduce input records=8
Reduce output records=4
Spilled Records=16
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=190
CPU time spent (ms)=2730
Physical memory (bytes) snapshot=303472640
Virtual memory (bytes) snapshot=4157550592
Total committed heap usage (bytes)=157814784
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=51
File Output Format Counters
Bytes Written=35
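One detail worth noticing in the counters: Combine input records=0, meaning no combiner ran and all 8 map output records were shuffled to the reducer. Because summing counts is associative and commutative, the existing reducer can double as a combiner; a one-line addition to the driver (not part of the original code) would enable it:

// pre-aggregate map output locally before the shuffle
job.setCombinerClass(WordCountReducer.class);

With a single small split the saving here is negligible, but on large inputs it substantially cuts the bytes shuffled to the reducers.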
After the job completes successfully, the application panel (http://hadoop01:8088) shows:
(screenshot: the successful WordCount job in the application panel)
8. View the MapReduce word-count results
[hadoop@hadoop01 jobs]$ hadoop fs -ls /wordcount/output
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2018-01-13 16:26 /wordcount/output/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 35 2018-01-13 16:26 /wordcount/output/part-r-00000
[hadoop@hadoop01 jobs]$ hadoop fs -cat /wordcount/output/part-r-00000
bigdata 1
hadoop 1
hello 4
world 2
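If you prefer to read the result programmatically rather than with hadoop fs -cat, a small client sketch (hypothetical; it assumes a core-site.xml with this cluster's fs.defaultFS is on the classpath) can use the HDFS FileSystem API:

package com.yongliang.hadoop;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadResult {
    public static void main(String[] args) throws Exception {
        // picks up core-site.xml / hdfs-site.xml from the classpath
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path part = new Path("/wordcount/output/part-r-00000");
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(fs.open(part), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // e.g. "bigdata<TAB>1"
            }
        }
    }
}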
That's it: the simple word count is done! Note that the results come out sorted by key, a byproduct of the sort in the shuffle phase.