大数据Hadoop玩转大数据

一个利用mapreduce思想单词计数的实例

2017-03-16  本文已影响0人  DayDayUpppppp

这里写得是,如果利用mapreduce分布式的计算框架来写一个单词计数的demo。比如说,给出一个文件,然后,输出是统计文件里面所有的单词出现的次数。

map 函数
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
 * KEYIN 读到的一行数据的偏移量 lONG
 * KEYOUT 读到的一行数据的内容 STRING
 * keyout 输出一个单词,是一个string的类型
 * valueout int值
 * hadoop 有自己的一套序列化的机制,他的序列化比jdk的序列化更加精简,
 * 可以提高网络的传输效率
 * long --》 longwritable
 * string --》 text
 * integer --》 intwritable
 * null  --> nullwritable
 */
public class worldcount extends Mapper<LongWritable,Text,Text,IntWritable>{
    //重载Mapper类的map方法
    //key 其实对应   keyin
    // value 就是  keyout
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
        // 将拿到的这行数据按照空格切分
        String line=value.toString();
        String [] linewords = line.split(" ");
        for(String word:linewords){
            // 所以在context里面写的内容就是 key:word,value 是1
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

reduce 函数
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class worldcountreduce extends  Reducer <Text,IntWritable,Text,IntWritable> {
    // 一组相同的key,调用一次reduce
    //相当于调用一次 ,计算一个key对应的个数
    protected void reduce (Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
        
        //统计单词数
        int count=0;
        for(IntWritable value :values){
            count=count+value.get();
        }
        
        //将输出的结果放到context 里面
        context.write(key,new IntWritable(count));
    }
    
}

jobclient 函数
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
 *job 提交器 是yarn集群的一个客户端,他负责将mr程序需要的信息全部封装成一个配置文件里面
 *然后连同我们mr程序所在的一个jar包,一起提交给yarn,有yarn去启动mr程序中的mrappmaster 
 */
public class jobclient {
    public static void main(String []args) throws IOException, ReflectiveOperationException, InterruptedException{
        Configuration conf=new Configuration();
        //conf.set("yarn.resoucemanager.hostname", value);  
        Job job=Job.getInstance(conf);
        //job.setJar("~/code/WordCount.jar");
        //告知客户端的提交器 mr程序所在的jar包
        //这样就不必使用setjar 这样的方法了
        job.setJarByClass(jobclient.class);
        // 告知mrapp master ,map 和reduce 对应的实现类
        job.setMapperClass(worldcount.class);
        job.setReducerClass(worldcountreduce.class);
        //告知输入,和输出的数据结构的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //告知mrappmaster 我们启动的reduce tash的数量
        //启动maptask 的数量 是yarn 会自动的计算
        job.setNumReduceTasks(3);
        
        //指定一个目录而不是文件
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost/wordcount/"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost/wordcount/output/"));
        // job.submit()
        //这个要比job.submit 要好,因为这个client并不会在提交任务之后,就退出,而是创建一个线程去监控 map和reduce的运行
        boolean res=job.waitForCompletion(true);
        // 执行成功 状态吗 是0,执行失败 状态码是100
        // 通过echo $? 显示状态码
        System.out.println("wakakka ");
        System.exit(res?0:100);
    }
}

小结:

其实代码的业务逻辑很简单,并不是很复杂。但是刚刚开始学的时候,不会熟悉,如何在hadoop这样的环境下去运行代码,走了很多弯路。所以,我决定,写成一个每一步的截图的教程。

一个简明的Mapreduce 原理分析:
http://www.jianshu.com/p/6b6a42a0740c

Hadoop入门—基本原理简介:
http://www.jianshu.com/p/a8b08350960f


1.创建一个java项目

然后导入需要的jar包(都是关于hadoop的包),在dfs里面是使用了maven,可以很方便的就需要的包导入进来,但是也可以收到的去导入一下。

hadoop所有的jar包都在hadoop/share/目录下面
hdfs是关于文件系统的包,mapreduce是关于mpreduce计算框架的包


image.png

进入common目录下面,还有之目录lib下面的包


image.png

子目录lib下面的包


image.png

反正,如果为了不出上面错误,可以都导入进来。


image.png
  1. step2 然后就可以完成写代码的任务了


    image.png
    image.png
    image.png
image.png

step3 :然后将代码打包成一个jar 包


image.png image.png

执行jar包:(input文件已经上传到 dfs上面了)


image.png
image.png
image.png
image.png
image.png
查看输出的文件夹,查看输出结果,因为指定了reduce task ,所以可以看到结果是这样的,有三个输出文件:

mapreduce 的运行流程:
a. runjar 启动客户端
b. 启动mrappmaster (mr的 主控节点)
c. yarnchild 启动,maptask 节点
d. yarnchild 启动,reduce节点
e. 关闭yarnchild
f. 关闭runjar

执行程序的时候:
maptask的节点是程序自己顶的,无法设置。maptask的个数,取决于文件的个数和文件的大小。reducetask的节点是可以自己设定的。

上一篇下一篇

猜你喜欢

热点阅读