2. MapReduce Example

2020-12-11  matthewfly

1. Introduction

Files stored in HDFS are spread across the cluster's nodes, so pulling all of the data to a single machine for processing is clearly impractical. With MapReduce, the computation is instead moved to the nodes, executed in parallel, and the results are then aggregated. In plain terms, the jar package (or an executable in another language) that processes the data is distributed to the DataNodes holding the data, each DataNode runs it, and finally the results are merged.

2. Process

MapReduce consists of three phases: map, shuffle, and reduce. The map phase extracts the data of interest from the input and sorts it; the shuffle phase distributes records by the given key for reduce processing, with records sharing the same key sent to the same node.
For example, suppose we want to count how often each url is called. In the map phase the url is separated out of the data and counted; thinking of it as a HashMap, the key is the url and the value is the number of occurrences, and the url is emitted as the output key. In the reduce phase, all data for the same url is gathered on the same node, so the received values only need to be summed. The flow is as follows:


[Figure: mapreduce.png — map / shuffle / reduce flow]
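
As a rough local analogy for what map and reduce do together, here is a minimal, hypothetical Java sketch that counts urls from a few tab-separated sample lines with a HashMap (the class name, field positions, and sample data are made up for illustration):

import java.util.HashMap;
import java.util.Map;

public class LocalUrlCount {
    public static void main(String[] args) {
        // hypothetical tab-separated log lines; the url sits in the third field
        String[] lines = {
            "2020-12-11 10:00:01\tuser1\t/api/order",
            "2020-12-11 10:00:02\tuser2\t/api/order",
            "2020-12-11 10:00:03\tuser1\t/api/user"
        };
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String url = line.split("\t")[2];      // "map": extract the url
            counts.merge(url, 1, Integer::sum);    // "reduce": sum per url
        }
        counts.forEach((url, n) -> System.out.println(url + "\t" + n));
        // /api/order  2
        // /api/user   1
    }
}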

3. Example

Create a new Java project that reads the existing HDFS files. The dependencies are as follows:

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
            <optional>true</optional>
        </dependency>
    </dependencies>

Define the output type; only the number of occurrences needs to be carried:

@Data
public class UrlWritable implements Writable {
    private int count;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.count = dataInput.readInt();
    }

    @Override
    public String toString() {
        return String.valueOf(count);
    }
}
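
To sanity-check that write and readFields mirror each other, a quick round trip through Hadoop's DataOutputBuffer/DataInputBuffer can be used; this is a minimal sketch (the class name is made up) and not part of the job itself:

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class UrlWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize
        UrlWritable out = new UrlWritable();
        out.setCount(3);
        DataOutputBuffer outBuf = new DataOutputBuffer();
        out.write(outBuf);

        // deserialize from the same bytes
        DataInputBuffer inBuf = new DataInputBuffer();
        inBuf.reset(outBuf.getData(), outBuf.getLength());
        UrlWritable back = new UrlWritable();
        back.readFields(inBuf);

        System.out.println(back.getCount()); // prints 3
    }
}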

Define the mapper: it extracts the url from each record, sets the count to 1, and emits the url as the output key.

public class UrlMapper extends Mapper<LongWritable, Text, Text, UrlWritable> {
    private final Text keyText = new Text();
    private final UrlWritable urlWritable = new UrlWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
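        // each input line is tab-separated; the url is assumed to be the third field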
        String[] fields = value.toString().split("\t");
        String url = fields[2];
        this.urlWritable.setCount(1);
        keyText.set(url);
        context.write(keyText, this.urlWritable);
    }
}

The reducer sums the counts:

public class UrlReducer extends Reducer<Text, UrlWritable, Text, UrlWritable> {
    private final UrlWritable urlWritable = new UrlWritable();

    public void reduce(Text key, Iterable<UrlWritable> values, Context context)
        throws IOException, InterruptedException {
        int count = 0;
        for (UrlWritable val : values) {
            count += val.getCount();
        }
        urlWritable.setCount(count);
        context.write(key, urlWritable);
    }
}
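
Because the reducer's input and output key/value types are identical (Text and UrlWritable), the same class can also be reused as the combiner in the driver below, pre-aggregating map output before it is shuffled.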

The main method:

public class MainClass {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // job name
        Job job = Job.getInstance(conf, "test_job");
        job.setJarByClass(MainClass.class);
        // mapper
        job.setMapperClass(UrlMapper.class);
        // combiner: pre-aggregate map output (summing here, same as the reducer)
        job.setCombinerClass(UrlReducer.class);
        // reducer
        job.setReducerClass(UrlReducer.class);
        // output key type
        job.setOutputKeyClass(Text.class);
        // output value type
        job.setOutputValueClass(UrlWritable.class);
        // input format, determined by how the files are stored in HDFS
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // output format: plain text files
        job.setOutputFormatClass(TextOutputFormat.class);
        // output path in HDFS
        FileOutputFormat.setOutputPath(job, new Path("/flume/test/output"));
        // input path in HDFS; more than one can be added
        FileInputFormat.addInputPath(job, new Path("/flume/test/logs"));
        job.waitForCompletion(true);
    }
}

Package the project into a jar, test.jar, and submit it on Hadoop:

/data0/apps/hadoop-3.2.1/bin/hadoop jar test.jar test.com.MainClass
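
Once the job completes, the result can be read back from the output directory in HDFS; assuming a single reducer, the part file is typically named part-r-00000:

/data0/apps/hadoop-3.2.1/bin/hadoop fs -cat /flume/test/output/part-r-00000

Each output line is the url, a tab, and its count, as produced by TextOutputFormat together with UrlWritable.toString().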

Note: if the project depends on local jars, for example mail.jar for sending email, a fat jar is needed. Add the following to the build configuration:

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <includeSystemScope>true</includeSystemScope>
                </configuration>
            </plugin>
        </plugins>
    </build>

In the generated target directory, use the original jar.
