离线实战-网络日志监控分析(三):ETL的第一步:清洗工作1

2020-06-29  本文已影响0人  做个合格的大厂程序员

模块开发----数据入库(ETL)

ETL(Extract-Transform-Load)用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。其工作的实质就是从各个数据源提取数据,对数据进行转换,并最终加载填充数据到数据仓库维度建模后的表中。只有当这些维度/事实表被填充好,ETL工作才算完成。

数据清洗操作

我们首先拿到的数据应该是服务器的原始日志文件,那么我们第一轮应该做的就是清洗数据。让其别为ODS层的数据,如下图所示:

数据入库

那么第一步目标明确了之后我们就可以开始使用mapReduce开始对原始数据开始进行处理。我们拿到的原始数据可能是长这样:

194.237.142.21 - - [01/Nov/2018:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [01/Nov/2018:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [01/Nov/2018:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [01/Nov/2018:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [01/Nov/2018:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [01/Nov/2018:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [01/Nov/2018:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [01/Nov/2018:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [01/Nov/2018:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [01/Nov/2018:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [01/Nov/2018:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [01/Nov/2018:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
221.130.41.168 - - [01/Nov/2018:06:50:37 +0000] "GET /feed/ HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
.......

因为有很多我就不一一列举了,这是一个明显的log日志文件。是无序的没有规律的。我们得让这个log变得有格式化才行。我们可以理清一个思路,是关于如何把原始数据清洗成我们需要对应的三种表,一个是pageView表,这个表示关于展示用户在网站的哪个页面逗留,逗留了多长时间,之类的数据,visit表示展示用户从哪个网站进入,从哪个网站离开等信息。我们大致可以梳理一下流程:

清洗格式化数据

这里我们可以没有reduce的聚合过程,只是将数据格式化成标准流数据。比如时间格式处理。数据是否合法等等。。

数据预处理编程思路实现

点击流模型

这里我们就需要将标准化的数据转化为我们所需要的宽表来进行数据的分区建模。大致思路为:

Xnip2020-06-29_14-09-30

ok,在完成了思路的整理之后我们就可以开始着手代码书写MR程序了。

新建项目进行清洗

第一步应该是将服务器的数据标准化输出

首先我们新建一个maven项目。在maven中添加依赖:这些步骤我们之前在MR教程中都已经介绍过:

<dependencies>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit -->
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <!-- 如果使用的是hadoop1,那么这里的classfier就指定为hadoop1即可 -->
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <!--    <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.itcast.hadoop.db.DBToHdfs2</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

其次我们添加一个主类,用于进行程序的运行

import cn.leon.cleanBean.CleanBean;
import cn.leon.cleanMapper.CleanMapper;
import javafx.scene.text.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CleanMain  extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {

        Job job = new Job(super.getConf(),"CleanMission");

        FileInputFormat.addInputPath(job,new Path("file:///C:\\Users\\Administrator\\Desktop\\input"));
        FileOutputFormat.setOutputPath(job,new Path("file:///C:\\Users\\Administrator\\Desktop\\output\\CleanData"));

        job.setMapperClass(CleanMapper.class);


        job.setMapOutputValueClass(CleanBean.class);
        job.setMapOutputKeyClass(Text.class);

        //输出为text类型,不用管key
        job.setOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setNumReduceTasks(0);

        boolean result = job.waitForCompletion(true);

        return result?0:1;
    }

    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration,new CleanMain(),args);
        System.exit(run);

    }
}

再建立Mapper阶段

package cn.leon.cleanMapper;


import cn.leon.cleanBean.CleanBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class CleanMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

    Set<String> pages = new HashSet<>();
    Text newKey = new Text();


    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        //过滤一些杂质,比如资源文件之类的
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String resource = value.toString();
        CleanBean cleanBean = CleanParser.parser(resource);

        if (cleanBean != null){
            CleanParser.filtStaticResource(cleanBean,pages);
            newKey.set(cleanBean.toString());
            context.write(newKey,NullWritable.get());
        }
    }
}

这里我们用了一些工具类,并且单独抽取出来了。简单如下:

package cn.leon.cleanMapper;

import cn.leon.cleanBean.CleanBean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class CleanParser {

    public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    public static CleanBean parser(String line){
        CleanBean cleanBean = new CleanBean();

        String[] details = line.split(" ");

        //如果每行内容大于11个继续执行
        if (details.length > 11){
            //取ip地址
            cleanBean.setRemote_addr(details[0]);
            //取用户信息
            cleanBean.setRemote_user(details[1]);

            //取时间
            String time_local = formatDate(details[3].substring(1));
            if (time_local ==null || time_local.equals("")){
                time_local = "-invaild_time-";
            }

            cleanBean.setTime_local(time_local);


            //取request方式
            cleanBean.setRequest(details[6]);
            //取状态
            cleanBean.setStatus(details[8]);

            //取发送的字节数
            cleanBean.setBody_bytes_sent(details[9]);
            //用来记录从那个页面链接访问过来的
            cleanBean.setHttp_referer(details[10]);


            //记录客户浏览器的相关信息
            if (details.length > 12){
                StringBuilder stringBuilder = new StringBuilder();

                for (int i=11;i < details.length;i++){
                    stringBuilder.append(details[I]);
                }
                cleanBean.setHttp_user_agent(stringBuilder.toString());
            }else{
                cleanBean.setHttp_user_agent(details[11]);
            }

            //如果请求状态码大于400值,就认为是请求出错了,请求出错的数据直接认为是无效数据
            if (Integer.parseInt(cleanBean.getStatus()) >= 400) {// 大于400,HTTP错误
                cleanBean.setValid(false);
            }

            //如果获取时间没拿到,那么也是认为是无效的数据
            if("-invalid_time-".equals(cleanBean.getTime_local())){
                cleanBean.setValid(false);
            }
        }else{
            cleanBean =  null;
        }

        return cleanBean;
    }


    public static void filtStaticResource(CleanBean bean, Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        }
    }

    //格式化时间方法
    public static String formatDate(String time_local) {
        try {
            return df2.format(df1.parse(time_local));
        } catch (ParseException e) {
            return null;
        }
    }
}

CleanBean对象

package cn.leon.pageViewBean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CleanBean implements Writable {

    private boolean valid = true;// 判断数据是否合法
    private String remote_addr;// 记录客户端的ip地址
    private String remote_user;// 记录客户端用户名称,忽略属性"-"
    private String time_local;// 记录访问时间与时区
    private String request;// 记录请求的url与http协议
    private String status;// 记录请求状态;成功是200
    private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
    private String http_referer;// 用来记录从那个页面链接访问过来的
    private String http_user_agent;// 记录客户浏览器的相关信息


    public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }


    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.getRemote_addr());
        sb.append("\001").append(this.getRemote_user());
        sb.append("\001").append(this.getTime_local()) ;
        sb.append("\001").append(this.getRequest());
        sb.append("\001").append(this.getStatus()) ;
        sb.append("\001").append(this.getBody_bytes_sent());
        sb.append("\001").append(this.getHttp_referer());
        sb.append("\001").append(this.getHttp_user_agent());
        return sb.toString();
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
            this.valid = dataInput.readBoolean();
            this.remote_addr = dataInput.readUTF();
            this.remote_user = dataInput.readUTF();
            this.time_local = dataInput.readUTF();
            this.request = dataInput.readUTF();
            this.status = dataInput.readUTF();
            this.body_bytes_sent = dataInput.readUTF();
            this.http_referer = dataInput.readUTF();
            this.http_user_agent = dataInput.readUTF();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeBoolean(this.valid);
            dataOutput.writeUTF(null==remote_addr?"":remote_addr);
            dataOutput.writeUTF(null==remote_user?"":remote_user);
            dataOutput.writeUTF(null==time_local?"":time_local);
            dataOutput.writeUTF(null==request?"":request);
            dataOutput.writeUTF(null==status?"":status);
            dataOutput.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
            dataOutput.writeUTF(null==http_referer?"":http_referer);
            dataOutput.writeUTF(null==http_user_agent?"":http_user_agent);
    }
}

PreCleanData之后我们就可以获得一个以\001为分隔符间隔的标准化输出文件了。之后我们就可以开始进行第二步,建立PageView和VisitView的建模了。

上一篇下一篇

猜你喜欢

热点阅读