hadoop基础Hadoop玩转大数据

使用mapreduce来分析网站的log日志

2017-03-24  本文已影响463人  DayDayUpppppp

所有网站的服务器上都会保留访问的log日志。这些log日志记录的其他机器访问服务器的ip,时间,http协议,状态码等信息。

比如这样:


image.png

大型网站的服务器往往会产生海量的log日志,用hadoop来分析log日志,也是一个很好的练手的机会。

下面写一个例子,通过分析服务器的log日志,统计访问服务器的ip地址和访问的次数。

map函数
public class worldcount extends Mapper<LongWritable,Text,Text,IntWritable>{
    //重载Mapper类的map方法
    // 这里的key是读取文件的行号,value是对应行号的文本
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
        //将这一行转化为string
        String line=value.toString();
        //以空格切分
        String [] linewords = line.split(" ");
        //获得ip
        String ip=linewords[0];
        
        // 所以在context里面写的内容就是 key:ip ,value 是1
        context.write(new Text(ip), new IntWritable(1));
    }
}

<br />

reduce 函数
public class worldcountreduce extends  Reducer <Text,IntWritable,Text,IntWritable> {
    // 一组相同的key,调用一次reduce
    //相当于调用一次 ,计算一个key对应的个数
    protected void reduce (Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
        //统计单词数
        int count=0;
        for(IntWritable value :values){
            count=count+value.get();
        }   
        //将输出的结果放到context 里面
        context.write(key,new IntWritable(count));
    }
}

<br />

Main 函数
public class jobclient {
    public static void main(String []args) throws IOException, ReflectiveOperationException, InterruptedException{
        Configuration conf=new Configuration();
        //conf.set("yarn.resoucemanager.hostname", value);  
        Job job=Job.getInstance(conf);
        //job.setJar("~/code/WordCount.jar");
        //告知客户端的提交器 mr程序所在的jar包
        //这样就不必使用setjar 这样的方法了
        job.setJarByClass(jobclient.class);
        // 告知mrapp master ,map 和reduce 对应的实现类
        job.setMapperClass(worldcount.class);
        job.setReducerClass(worldcountreduce.class);
        //告知输入,和输出的数据结构的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //告知mrappmaster 我们启动的reduce tash的数量
        //启动maptask 的数量 是yarn 会自动的计算
        job.setNumReduceTasks(3);
        
        //指定一个目录而不是文件
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/kpi/"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost:9000/kpi/output/"));
        // job.submit()
        //这个要比job.submit 要好,因为这个client并不会在提交任务之后,就退出,而是创建一个线程去监控 map和reduce的运行
        boolean res=job.waitForCompletion(true);
        // 执行成功 状态吗 是0,执行失败 状态码是100
        // 通过echo $? 显示状态码
        System.out.println("wakakka ");
        System.exit(res?0:100);
    }
}

这里的输入是一个目录,可以把输入的文件放到这个目录里面就好。比如这里,我把access.log.10文件放在kpi目录下面。


2017-03-24 22-56-13屏幕截图.png

然后将代码打包为一个jar包,使用hadoo命令执行这个jar包。(执行这条命令必须在jar包所在的目录下面执行)


2017-03-24 22-57-33屏幕截图.png

程序运行的结果是在输出的目录里面:

2017-03-25 09-38-29屏幕截图.png

查看一个文件,就是程序的运行结果:

2017-03-25 09-41-36屏幕截图.png

关于代码的一些小结:

  1. hadoop经常启动失败,或者出现访问失败的情况。
  2. 在写代码之前,一定要把所有的关于hadoop,mapreduce的包导入。
  3. 将代码打包为一个jar包。

github地址:
https://github.com/zhaozhengcoder/hadoop/tree/master/mapreduce_kpi

上一篇下一篇

猜你喜欢

热点阅读