Hadoop——MRUnit实战

2018-06-11  本文已影响24人  长青之木

MRUnit简介

官网地址:https://mrunit.apache.org/

Apache MRUnit ™ is a Java library that helps developers unit test Apache Hadoop map reduce jobs.

MRUnit是一个帮助开发者测试map reduce 作业的单元测试库。

代码示例

以maven项目为例,演示如何使用MRUnit进行MR单元测试。

关于示例的讲解,请参考:https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial

项目pom.xml文件,重点关注mrunit,mockito-all, junit三个类库的引入,MRUnit是利用mockito+junit针对MR程序进行模拟测试。

项目的pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>my.hadoopstudy</groupId>
    <artifactId>hadoopstudy</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>hadoopstudy</name>
    <url>http://maven.apache.org</url>

    <properties>
        <hadoop.version>2.0.0-mr1-cdh4.4.0</hadoop.version>
        <hbase.version>0.94.6-cdh4.4.0</hbase.version>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
        <maven.compiler.encoding>utf-8</maven.compiler.encoding>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <encoding>utf-8</encoding>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.9</version>
                <configuration>
                    <buildOutputDirectory>eclipse-classes</buildOutputDirectory>
                    <downloadSources>true</downloadSources>
                    <downloadJavadocs>false</downloadJavadocs>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>mockito-all</artifactId>
                    <groupId>org.mockito</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.0.0-cdh4.4.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>
                        jersey-test-framework-grizzly2
                    </artifactId>
                    <groupId>
                        com.sun.jersey.jersey-test-framework
                    </groupId>
                </exclusion>
                <exclusion>
                    <artifactId>netty</artifactId>
                    <groupId>org.jboss.netty</groupId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- <dependency> <groupId>com.hadoop.gplcompression</groupId> <artifactId>hadoop-lzo-cdh4</artifactId> 
            <version>0.4.15-gplextras</version> </dependency> -->

        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
            <version>2.2.9</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.5.1</version>
        </dependency>

        <!-- junit test -->
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

</project>  
Mapper类

package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text status = new Text();
    private final static IntWritable addOne = new IntWritable(1);

    static enum CDRCounter {
        NonSMSCDR;
    };

    /**
     * Returns the SMS status code and its count
     */
    protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {

        // 655209;1;796764372490213;804422938115889;6 is the Sample record format
        String[] line = value.toString().split(";");
        // If record is of SMS CDR
        if (Integer.parseInt(line[1]) == 1) {
            status.set(line[4]);
            context.write(status, addOne);
        } else {
            // CDR record is not of type SMS so increment the counter
            context.getCounter(CDRCounter.NonSMSCDR).increment(1);
        }
    }
}
Reducer类
package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SMSCDRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
MR单元测试类
package mrunit;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import mrunit.SMSCDRMapper.CDRCounter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * 测试数据说明 CDRID;CDRType;Phone1;Phone2;SMS Status Code
 * 655209;1;796764372490213;804422938115889;6
 * 353415;0;356857119806206;287572231184798;4
 * 835699;1;252280313968413;889717902341635;0
 * 
 */
public class SMSCDRMapperReducerTest {
    Configuration conf = new Configuration();
    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    @Before
    public void setUp() {

        // 测试mapreduce
        SMSCDRMapper mapper = new SMSCDRMapper();
        SMSCDRReducer reducer = new SMSCDRReducer();
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);

        // 测试配置参数
        mapDriver.setConfiguration(conf);
        conf.set("myParameter1", "20");
        conf.set("myParameter2", "23");

    }

    @Test
    public void testMapper() throws IOException {
        mapDriver.withInput(new LongWritable(), new Text("655209;1;796764372490213;804422938115889;6"));
        mapDriver.withOutput(new Text("6"), new IntWritable(1));
        mapDriver.runTest();
    }

    @Test
    public void testReducer() throws IOException {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(1));
        reduceDriver.withInput(new Text("6"), values);
        reduceDriver.withOutput(new Text("6"), new IntWritable(2));
        reduceDriver.runTest();
    }

    @Test
    public void testMapperReducer() throws IOException {
        mapReduceDriver.withInput(new LongWritable(), new Text("655209;1;796764372490213;804422938115889;6"));
        mapReduceDriver.withOutput(new Text("6"), new IntWritable(1));
        mapReduceDriver.runTest();
    }

    @Test
    public void testMapperCount() throws IOException {
        mapDriver.withInput(new LongWritable(), new Text("655209;0;796764372490213;804422938115889;6"));
        // mapDriver.withOutput(new Text("6"), new IntWritable(1));
        mapDriver.runTest();
        assertEquals("Expected 1 counter increment", 1,
                mapDriver.getCounters().findCounter(CDRCounter.NonSMSCDR).getValue());
    }
}

参考链接

https://my.oschina.net/cloudcoder/blog/285151

上一篇 下一篇

猜你喜欢

热点阅读