大数据

单词统计

2019-06-22  本文已影响8人  Sophie12138
目录
前言
单词统计
统计手机用户流量日志
即将开始...

有一个文本文件,被分成了4份,分别放到了4台服务器中存储

Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather

现在要统计出每个单词的出现次数。
1.单词统计的业务过程拆解
1.1.拆分单词

输入:“the weather is good”

输出:(the,1),(weather,1),(is,1),(good,1)

输入:“today is good”

输出:(today,1),(is,1),(good,1)

输入:“good weather is good”

输出:(good,1),(weather,1),(is,1),(good,1)

输入:“today has good weather”

输出:(today,1),(has,1),(good,1),(weather,1)

1.2.排序

1.3.合并

image image image image

1.4.汇总统计

每个map节点都完成以后,就要进入reduce阶段了。

例如使用了3个reduce节点,需要对上面4个map节点的结果进行重新组合,比如按照26个字母分成3段,分配给3个reduce节点。

Reduce节点进行统计,计算出最终结果。

这就是最基本的MapReduce处理流程。

2.编程思路

了解了MapReduce的工作过程,我们思考一下用代码实现时需要做哪些工作?

但不用担心,MapReduce是一个非常优秀的编程模型,已经把绝大多数的工作做完了,我们只需要关心2个部分:

编写好这两个核心业务逻辑之后,只需要几行简单的代码把map和reduce装配成一个job,然后提交给Hadoop集群就可以了。

至于其它的复杂细节,例如如何启动map任务和reduce任务、如何读取文件、如对map结果排序、如何把map结果数据分配给reduce、reduce如何把最终结果保存到文件等等,MapReduce框架都帮我们做好了,而且还支持很多自定义扩展配置,例如如何读文件、如何组织map或者reduce的输出结果等等,以上种种细节也会在后面的章节深入探索。
3.wordcount hadoop实战
进行如下三个步骤:

1.编写java源代码,打包成jar文件
代码结构:



WordCountMapper.java

package hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * word count map程序
 *
 * @author liaochuanhu
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
public class WordCountMapper extends Mapper {
    @Override
    protected void map(Object key, Object value, Context context) 
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

WordCountReducer.java

package hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * word count reduce程序
 *
 * @author liaochuanhu
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
public class WordCountReducer extends Reducer {
    @Override
    protected void reduce(Object key, Iterable values, Context context) 
            throws IOException, InterruptedException {
        Integer count = 0;
        Iterator<IntWritable> iterator =  values.iterator();
        while (iterator.hasNext()) {
            count += iterator.next().get();
        }
        context.write(key, new IntWritable(count));
    }
}

WordCountMapReduce.java

package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * word count main方法
 *
 * @author liaochuanhu
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
public class WordCountMapReduce {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCountMapReduce.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        if (!result) {
            System.out.println("word count task fail!");
        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>hadoop</groupId>
  <artifactId>hadoop</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>hadoop</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>commons-beanutils</groupId>
      <artifactId>commons-beanutils</artifactId>
      <version>1.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
编译生成的jar包

2.加载四个txt文件到hdfs文件系统中


分别执行命令:
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put *.txt /wordcount/input

3.使用hadoop运行我们编写的java程序,去解析位于hdfs中的txt文件
执行jar文件:
hadoop jar hadoop-1.0-SNAPSHOT.jar hadoop/WordCountMapReduce /wordcount/input /wordcount/output

执行完成后查看结果:
hdfs dfs -cat /wordcount/output/*

在HDFS web界面也能查看输出文件和输入文件:


4.过程解析
下面看一下从job提交到执行完成这个过程是怎样。

4.1客户端提交任务

Client提交任务时会先到HDFS中查看目标文件的大小,了解要获取的数据的规模,然后形成任务分配的规划,例如:

a.txt 0-128M交给一个task,128-256M 交给一个task,b.txt 0-128M交给一个task,128-256M交给一个task ...,形成规划文件job.split。

然后把规划文件job.split、jar、配置文件xml提交给yarn(Hadoop集群资源管理器,负责为任务分配合适的服务器资源)

image

4.2 启动appmaster

注:appmaster是本次job的主管,负责maptask和reducetask的启动、监控、协调管理工作。

yarn找一个合适的服务器来启动appmaster,并把job.split、jar、xml交给它。

image

4.3 启动maptask

Appmaster启动后,根据固化文件job.split中的分片信息启动maptask,一个分片对应一个maptask。

分配maptask时,会尽量让maptask在目标数据所在的datanode上执行。

image

4.4 执行maptask

Maptask会一行行地读目标文件,交给我们写的map程序,读一行就调一次map方法,map调用context.write把处理结果写出去,保存到本机的一个结果文件,这个文件中的内容是分区且有序的。

分区的作用就是定义哪些key在一组,一个分区对应一个reducer。

image

4.5 启动reducetask

Maptask都运行完成后,appmaster再启动reducetask,maptask的结果中有几个分区就启动几个reducetask。

image

4.6 执行reducetask

reducetask去读取maptask的结果文件中自己对应的那个分区数据,例如reducetask_01去读第一个分区中的数据。

reducetask把读到的数据按key组织好,传给reduce方法进行处理,处理结果写到指定的输出路径。

上一篇 下一篇

猜你喜欢

热点阅读