Hadoop MR ETL离线项目

2019-04-03  本文已影响0人  喵星人ZC

一、需求及步骤解析

1、需求

利用MR对日志进行清洗后交由Hive统计分析

2、步骤解析

1、自己造一份日志,包含(cdn,region,level,time,ip,domain,url,traffic)共8个字段,其中time、ip、domain、traffic的取值随机变化,文件大小在50M到100M之间
2、编写MR程序对日志进行清洗
3、清洗完后的日志移动到Hive外表的location上
4、刷新Hive分区信息
5、查询每个domain的traffic的总和
6、利用Shell封装整个运行过程

二、利用日志生成器生成日志并上传至HDFS

日志生成器

package com.ruoze.hadoop.utils;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/**
 * Generates a synthetic, tab-separated CDN access log used as input for the ETL job.
 *
 * <p>Every line carries the columns cdn, region, level, time, ip, domain, url and traffic,
 * each followed by a tab (including the last one), and is terminated by a newline.
 * cdn, region, level and url are constants; time, ip, domain and traffic are randomized.
 */
public class GenerateLogUtils {

    /** Log file written (in append mode) relative to the working directory. */
    private static final String LOG_FILE_NAME = "access.log";

    /** Number of log lines produced per run. */
    private static final int LINE_COUNT = 1000000;

    /** Candidate values for the domain column. */
    private static final String[] DOMAINS = {
            "v1.go2yd.com",
            "v2.go2yd.com",
            "v3.go2yd.com",
            "v4.go2yd.com",
            "v5.go2yd.com",
    };

    /** Candidate values for the traffic column; the last entry is blank. */
    private static final String[] TRAFFICS = {
            "136662",
            "785966",
            "987422",
            "975578",
            "154851",
            ""
    };

    /** Inclusive [low, high] IPv4 ranges encoded as signed 32-bit integers. */
    private static final int[][] IP_RANGES = {
            {607649792, 608174079},     // 36.56.0.0-36.63.255.255
            {1038614528, 1039007743},   // 61.232.0.0-61.237.255.255
            {1783627776, 1784676351},   // 106.80.0.0-106.95.255.255
            {2035023872, 2035154943},   // 121.76.0.0-121.77.255.255
            {2078801920, 2079064063},   // 123.232.0.0-123.235.255.255
            {-1950089216, -1948778497}, // 139.196.0.0-139.215.255.255
            {-1425539072, -1425014785}, // 171.8.0.0-171.15.255.255
            {-1236271104, -1235419137}, // 182.80.0.0-182.92.255.255
            {-770113536, -768606209},   // 210.25.0.0-210.47.255.255
            {-569376768, -564133889},   // 222.16.0.0-222.95.255.255
    };

    /** Single shared random source instead of one new instance per generated value. */
    private static final Random RANDOM = new Random();

    public static void main(String[] args) {
        generateLog();
    }

    /**
     * Appends {@link #LINE_COUNT} random log lines to {@link #LOG_FILE_NAME}.
     *
     * <p>The writer is opened once for the whole run and closed by try-with-resources,
     * so it is released even when a write fails. I/O errors are reported on stderr.
     *
     * @return always the empty string
     */
    private static String generateLog() {
        // Loop-invariant work: parse the date bounds and build the formatter only once.
        long begin = parseDay("2019-01-01");
        long end = parseDay("2019-01-31");
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        File file = new File(LOG_FILE_NAME);
        // Append mode creates the file when it does not exist yet.
        try (FileWriter writer = new FileWriter(file, true)) {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < LINE_COUNT; i++) {
                Date date = new Date(random(begin, end));

                // nextInt(bound) excludes bound, so the full length is needed to reach the last domain.
                String domain = DOMAINS[RANDOM.nextInt(DOMAINS.length)];

                // NOTE(review): bound kept as in the original, so the trailing blank entry is never
                // selected. Switching to TRAFFICS.length would start emitting records with an empty
                // traffic column - confirm that downstream consumers tolerate that first.
                String traffic = TRAFFICS[RANDOM.nextInt(TRAFFICS.length - 1)];

                builder.setLength(0);
                builder
                        .append("baidu").append("\t")
                        .append("CN").append("\t")
                        .append("2").append("\t")
                        .append(timeFormat.format(date)).append("\t")
                        .append(getRandomIp()).append("\t")
                        .append(domain).append("\t")
                        .append("http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4").append("\t")
                        .append(traffic).append("\t")
                        .append("\n");
                writer.write(builder.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Parses a {@code yyyy-MM-dd} day into epoch milliseconds.
     *
     * @param day the day to parse
     * @return the parsed instant in milliseconds
     * @throws IllegalArgumentException if {@code day} does not match the pattern
     */
    private static long parseDay(String day) {
        try {
            return new SimpleDateFormat("yyyy-MM-dd").parse(day).getTime();
        } catch (java.text.ParseException e) {
            throw new IllegalArgumentException("Invalid date: " + day, e);
        }
    }

    /**
     * Picks a random value strictly between {@code begin} and {@code end}.
     *
     * @param begin exclusive lower bound
     * @param end   exclusive upper bound
     * @return a value {@code v} with {@code begin < v < end}
     * @throws IllegalArgumentException if no value lies strictly between the bounds
     */
    private static long random(long begin, long end) {
        if (end - begin < 2) {
            throw new IllegalArgumentException("No value strictly between " + begin + " and " + end);
        }
        long value;
        // Retry in a loop rather than recursing, so an unlucky streak cannot grow the stack.
        do {
            value = begin + (long) (RANDOM.nextDouble() * (end - begin));
        } while (value <= begin || value >= end);
        return value;
    }

    /**
     * Generates a random IPv4 address from one of the predefined ranges.
     *
     * @return the address in dotted-decimal notation
     */
    public static String getRandomIp() {
        int[] bounds = IP_RANGES[RANDOM.nextInt(IP_RANGES.length)];
        // +1 so that the documented upper bound of the range can be produced as well.
        int span = bounds[1] - bounds[0] + 1;
        return num2ip(bounds[0] + RANDOM.nextInt(span));
    }

    /**
     * Converts a 32-bit integer into its dotted-decimal IPv4 representation.
     *
     * @param ip the address as an int, most significant byte first
     * @return the address formatted as {@code a.b.c.d}
     */
    public static String num2ip(int ip) {
        // Masking after the shift yields the unsigned byte value even for negative inputs.
        return ((ip >> 24) & 0xff) + "."
                + ((ip >> 16) & 0xff) + "."
                + ((ip >> 8) & 0xff) + "."
                + (ip & 0xff);
    }

}

将access.log上传至HDFS路径

 hadoop fs -put access.log  /g6/hadoop/accesslog/20190402/

三、MR清洗

1、编写清洗日志的LogUtils类

package com.ruoze.hadoop.utils;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

/**
 * Cleans a single tab-separated access-log line.
 *
 * <p>Instances hold {@link SimpleDateFormat} objects, which are not thread-safe, so one
 * instance must not be shared between threads.
 */
public class LogUtils {
    /** Minimum number of tab-separated fields a line must contain to be parsed. */
    private static final int FIELD_COUNT = 8;

    // Format of the time column in the raw log.
    DateFormat sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.ENGLISH);
    // Format of the time column in the cleaned output.
    DateFormat targetFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * Parses one log line split on tabs and rewrites its time column.
     *
     * <p>The first eight fields (cdn, region, level, time, ip, domain, url, traffic) are kept;
     * the time field is converted from {@code yyyy-MM-dd HH:mm:ss} to {@code yyyyMMddHHmmss}.
     * Any further fields are dropped.
     *
     * @param log the raw log line
     * @return the cleaned line joined by tabs, or the empty string when the line is
     *         {@code null}, has fewer than eight fields, or carries an unparsable time
     */
    public String parse(String log) {
        if (log == null) {
            return "";
        }

        String[] splits = log.split("\t");
        // Guard before indexing: a truncated line must be rejected, not crash the caller.
        if (splits.length < FIELD_COUNT) {
            return "";
        }

        String time;
        try {
            time = targetFormat.format(sourceFormat.parse(splits[3]));
        } catch (ParseException e) {
            e.printStackTrace();
            return "";
        }

        StringBuilder builder = new StringBuilder();
        builder.append(splits[0]).append("\t")      // cdn
                .append(splits[1]).append("\t")     // region
                .append(splits[2]).append("\t")     // level
                .append(time).append("\t")
                .append(splits[4]).append("\t")     // ip
                .append(splits[5]).append("\t")     // domain
                .append(splits[6]).append("\t")     // url
                .append(splits[7]);                 // traffic
        return builder.toString();
    }
}

2、LogUtils的单元测试

package com.ruoze.hadoop.utils;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class LogUtilsTest {
    private LogUtils utils;

    @Test
    public void LogUtilsTest() {

        String log = "baidu\tCN\t2\t2019-01-10 16:02:54\t121.77.143.199\tv2.go2yd.com\thttp://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\t97557845";
        String result = utils.parse(log);
        System.out.println(result);
    }

    @Before
    public void setUp() {

        utils = new LogUtils();
    }

    @After
    public void trarDown() {
        utils = null;
    }
}

测试结果如图:


3、Mapper

package com.ruoze.hadoop.mapreduce;

import com.ruoze.hadoop.utils.LogUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that cleans raw access-log lines.
 *
 * <p>Each input line is validated, cleaned by {@link LogUtils#parse(String)} and written
 * with a {@link NullWritable} key. Lines that fail validation or cleaning are dropped.
 */
public class LogETLMapper  extends Mapper<LongWritable,Text,NullWritable,Text>{
    /** Number of tab-separated fields a valid log line must have. */
    private static final int FIELD_COUNT = 8;

    /** Reused for every record of this mapper instead of being re-created per line. */
    private final LogUtils utils = new LogUtils();

    /**
     * Cleans one log line and emits it when it is valid.
     *
     * @param key     the input key supplied by the framework (unused)
     * @param value   one raw log line
     * @param context the task context the cleaned line is written to
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        // Check the field count before touching the traffic column: indexing first would
        // throw ArrayIndexOutOfBoundsException on a truncated line and fail the task.
        if (fields.length != FIELD_COUNT || StringUtils.isBlank(fields[FIELD_COUNT - 1])) {
            return;
        }

        String result = utils.parse(line);
        if (StringUtils.isNotBlank(result)) {
            context.write(NullWritable.get(), new Text(result));
        }
    }
}

4、Job

package com.ruoze.hadoop.mapreduce;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and submits the log-cleaning MapReduce job.
 *
 * <p>Usage: {@code LogETLDriver <input> <output>}. An existing output path is deleted
 * recursively before the job is submitted. The process exit status is 0 when the job
 * succeeds and 1 on a usage error or a failed job, so calling scripts can react to it.
 */
public class LogETLDriver {
    public static void main(String[] args) throws Exception{
        if (args.length != 2) {
            System.err.println("please input 2 params: input output");
            // Non-zero status: a usage error must not look like success to the caller.
            System.exit(1);
        }

        String input = args[0];
        String output = args[1];

        //System.setProperty("hadoop.home.dir", "D:\\Hadoop\\hadoop-2.6.0-cdh5.7.0");

        Configuration configuration = new Configuration();

        // The job refuses to start when the output path already exists, so remove it first.
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogETLDriver.class);
        job.setMapperClass(LogETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Declare the final output types explicitly; they match what the mapper emits.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate the job result instead of always exiting with status 0.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

}

以上程序编写后打包上传至服务器:

[hadoop@hadoop000 lib]$ ll
total 12
-rw-r--r-- 1 hadoop hadoop 8754 Mar 29 22:38 hadoop-1.0.jar

在HDFS上创建MR程序的输出路径:

hadoop fs -mkdir -p /g6/hadoop/access/output/day=20190402

四、创建Hive外表

-- External table over the cleaned access logs, one partition per day.
-- Columns follow the field order of the cleaned, tab-separated log lines.
-- "if not exists" keeps the statement safe to re-run.
create external table if not exists g6_access (
cdn string,
region string,
level string,
time string,
ip string,
domain string,
url string,
traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/g6/hadoop/access/clear';

因为MR程序每次运行时会删除输出路径,所以Hive的location不要指向输出路径,等MR跑完后将数据移动到location下。

五、运行Hadoop MR程序进行测试

1、运行MR

hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/20190402/ /g6/hadoop/access/output/day=20190402

2、将输出结果移动到Location下

hadoop fs -mv /g6/hadoop/access/output/day=20190402 /g6/hadoop/access/clear

3、刷新Hive分区(不刷新Hive是查询不到数据的)

alter table g6_access add if not exists partition(day=20190402);

4、Hive统计分析(下面的示例用count(*)统计每个domain的记录数;若要得到每个domain的traffic总和,应使用:select domain,sum(traffic) from g6_access group by domain;)

hive (g6_hadoop)> select domain,count(*) from g6_access group by domain;
Query ID = hadoop_20190402232525_4b5c6115-d9a4-4dbd-8cbd-768f298decb4
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1554215624276_0002, Tracking URL = http://hadoop000:8088/proxy/application_1554215624276_0002/
Kill Command = /home/hadoop/soul/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job  -kill job_1554215624276_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-04-02 23:30:45,007 Stage-1 map = 0%,  reduce = 0%
2019-04-02 23:30:51,476 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.73 sec
2019-04-02 23:30:57,940 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.77 sec
MapReduce Total cumulative CPU time: 2 seconds 770 msec
Ended Job = job_1554215624276_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.77 sec   HDFS Read: 44772154 HDFS Write: 76 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 770 msec
OK
domain  _c1
v1.go2yd.com    74908
v2.go2yd.com    74795
v3.go2yd.com    75075
v4.go2yd.com    75222
Time taken: 21.612 seconds, Fetched: 4 row(s)

六、shell封装整个流程

g6_mr_etl.sh

#!/bin/bash
#
# g6_mr_etl.sh - runs the whole ETL flow for one day of access logs:
#   step1: clean the raw logs with the MapReduce job
#          (and make sure the Hive external table exists)
#   step2: move the cleaned data under the table location
#   step3: register the day's partition in the Hive metastore
#
# Usage: g6_mr_etl.sh <dateString>      e.g. g6_mr_etl.sh 20190402
# Every step is checked; the script stops with status 1 at the first failure.

source ~/.bash_profile

if [ $# -ne 1 ] ; then
    echo "Usage: g6_mr_etl.sh <dateString>"
    echo "E.g.: g6_mr_etl.sh 20190402"
    exit 1
fi

process_date=$1
# One database for both the create-table and the add-partition statement.
database=g6_hadoop
input_dir="/g6/hadoop/accesslog/${process_date}/"
output_dir="/g6/hadoop/access/output/day=${process_date}"
table_location="/g6/hadoop/access/clear"

echo -e "\033[36m###### step1:MR ETL ######\033[0m"
hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver "${input_dir}" "${output_dir}" || {
    echo "MR ETL failed for ${process_date}" >&2
    exit 1
}

hive -e "use ${database};
create external table if  not exists g6_access (
cdn string,
region string,
level string,
time string,
ip string,
domain string,
url string,
traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '${table_location}' ;" || {
    echo "Creating table g6_access failed" >&2
    exit 1
}

echo -e "\033[36m###### step2:Mv Data to DW ###### \033[0m"
hadoop fs -mv "${output_dir}" "${table_location}" || {
    echo "Moving ${output_dir} to ${table_location} failed" >&2
    exit 1
}

echo -e "\033[36m###### step3:Alter metadata ######\033[0m"
# The partition column is a string, so the value is quoted.
hive -e "use ${database}; alter table g6_access add if not exists partition(day='${process_date}');" || {
    echo "Adding partition day=${process_date} failed" >&2
    exit 1
}
上一篇 下一篇

猜你喜欢

热点阅读