Hadoop-3.1.3 (Part 9): MapReduce Custom Input Formats
2021-07-23
_大叔_
Custom Input Format Component
The custom input component runs before the MapTask and produces the key/value pairs that become the MapTask's input.
Defining the RecordReader
package com.example.demo.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
import java.io.InputStream;
/**
 * A RecordReader that emits each line's number as the key and the line content as the value.
 * @author big uncle
 * @date 2021/5/14 16:49
 **/
public class LineNumberInputRecordReader extends RecordReader<IntWritable, Text> {
/**
 * File split currently being read
 **/
private FileSplit fs;
/**
 * Input key (line number)
 **/
private IntWritable key;
/**
 * Input value (line content)
 **/
private Text value;
/**
 * Line reader
 **/
private LineReader reader;
/**
 * Current line number
 **/
private int count;
/**
 * Initialization: set up the file split and the line reader
 **/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// Keep a reference to the file split
fs = (FileSplit) inputSplit;
// Get the path of the split's file
Path path = fs.getPath();
// Get the job configuration
Configuration conf = taskAttemptContext.getConfiguration();
// Get the file system the path belongs to (e.g. HDFS)
FileSystem system = path.getFileSystem(conf);
// Open an input stream for the split's file
InputStream in = system.open(path);
// Initialize the line reader
reader = new LineReader(in);
}
/**
 * Called repeatedly: as long as it returns true it will be called again,
 * until it returns false. Reads the data one line at a time until the end.
 **/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
key = new IntWritable();
value = new Text();
Text tmp = new Text();
// Each call reads one line; readLine returns the number of bytes
// consumed (including the line terminator), so 0 means end of stream
int length = reader.readLine(tmp);
if(length==0){
// All of the file's data has been read
return false;
}
count++;
// Assign the line number to the input key
key.set(count);
// Assign the line content to the input value
value.set(tmp);
return true;
}
/**
 * Passes the current input key to the Mapper component.
 * Called once for each call to nextKeyValue.
 **/
@Override
public IntWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
/**
 * Passes the current input value to the Mapper component.
 * Called once for each call to nextKeyValue.
 **/
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// Progress reporting is not implemented here; always report 0
return 0;
}
@Override
public void close() throws IOException {
// Close the reader to release the underlying stream
if (reader != null) {
reader.close();
reader = null;
}
}
}
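getProgress above always reports 0, so the framework shows no read progress for this reader. A rough sketch of real progress reporting, assuming initialize() is changed to keep the opened stream in an FSDataInputStream field named in (an assumption, not part of the original code):

// Hypothetical field, assigned in initialize():
// private org.apache.hadoop.fs.FSDataInputStream in;

@Override
public float getProgress() throws IOException, InterruptedException {
    // Fraction of the split consumed so far, clamped to [0, 1]
    if (fs.getLength() == 0) {
        return 0.0f;
    }
    return Math.min(1.0f, in.getPos() / (float) fs.getLength());
}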
Defining the FileInputFormat
package com.example.demo.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
 * @author big uncle
 * @date 2021/5/14 16:47
 * Custom input format; it determines the Mapper's input key and value types.
 * The first type parameter is the Mapper's input key.
 * The second type parameter is the Mapper's input value.
 **/
public class LineNumberInputFormat extends FileInputFormat<IntWritable, Text> {
@Override
public RecordReader<IntWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new LineNumberInputRecordReader();
}
}
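Note that count is tracked per RecordReader and each split gets its own reader, so a file large enough to be divided into several splits would restart numbering at 1 in each split (and initialize above always opens the file at the beginning rather than seeking to the split's offset). A common remedy, shown here as an illustrative addition rather than part of the original post, is to override isSplitable in LineNumberInputFormat so each file becomes exactly one split:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;

// Inside LineNumberInputFormat: one split per file keeps line numbers continuous
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    return false;
}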
Configuring the job
// Set the custom input format (the default is TextInputFormat)
job.setInputFormatClass(LineNumberInputFormat.class);
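With this input format configured, the Mapper's input generics must be IntWritable and Text to match. A minimal pass-through Mapper as a sketch (the class name is illustrative and not from the original post):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class LineNumberMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key is the line number from LineNumberInputRecordReader,
        // value is the line content; pass both through unchanged
        context.write(key, value);
    }
}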
Custom Output Format Component
Defining the RecordWriter
package com.example.demo.mr;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
 * A RecordWriter that writes key|value records separated by "@@@".
 * @author big uncle
 * @date 2021/5/14 17:52
 **/
public class AuthRecodeWriter<K,V> extends RecordWriter<K,V> {
private FSDataOutputStream outputStream;
public AuthRecodeWriter(FSDataOutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public void write(K k, V v) throws IOException, InterruptedException {
// Write the output key to the file. With a map-only job this is the
// Mapper's output key; with both Mapper and Reducer it is the Reducer's output key.
outputStream.write(k.toString().getBytes());
// Write the key/value separator (the default TextOutputFormat uses a tab)
outputStream.write("|".getBytes());
outputStream.write(v.toString().getBytes());
// Write the record separator (the default is a newline; here "@@@")
outputStream.write("@@@".getBytes());
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
outputStream.close();
}
}
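With this writer a result file is one continuous run of records, for example hello|3@@@world|2@@@ (illustrative content), instead of the tab-separated, newline-terminated lines that TextOutputFormat produces.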
Defining the FileOutputFormat
package com.example.demo.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * A FileOutputFormat that hands each task's output file to AuthRecodeWriter.
 * @author big uncle
 * @date 2021/5/14 17:48
 **/
public class AuthOutputFormat<K,V> extends FileOutputFormat<K,V> {
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// Get the default work file for this task attempt (e.g. part-r-00000)
Path path = super.getDefaultWorkFile(taskAttemptContext,"");
Configuration conf = taskAttemptContext.getConfiguration();
FileSystem system = path.getFileSystem(conf);
// Create the output stream for the work file
FSDataOutputStream outputStream = system.create(path);
return new AuthRecodeWriter<K,V>(outputStream);
}
}
When several input files are processed into the same result file, their records can appear interleaved in the output.
Configuring the job
// Set the custom output format (the default is TextOutputFormat: tab between key and value, newline between records)
job.setOutputFormatClass(AuthOutputFormat.class);
Multiple Input Sources
package com.example.demo.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * @author big uncle
 * @date 2021/5/11 11:13
 **/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
// Get the Job instance
Job job = Job.getInstance(configuration);
// Set the driver class that serves as the job's entry point
job.setJarByClass(WordCountDriver.class);
// Read multiple input files, each with its own Mapper class
MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/aa.txt"), TextInputFormat.class,WordCountMapper1.class);
MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/bb.txt"), TextInputFormat.class,WordCountMapper2.class);
// Set the Mapper's output key type
job.setMapOutputKeyClass(Text.class);
// Set the Mapper's output value type
job.setMapOutputValueClass(Flow.class);
// Write to the given directory; the result path must not already exist
FileOutputFormat.setOutputPath(job,new Path("hdfs://node113:9000/test1/result"));
// Submit the job and wait for it to finish
job.waitForCompletion(true);
}
}
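WordCountMapper1, WordCountMapper2, and the Flow writable come from earlier parts of this series and are not shown in this post. Whatever each mapper does internally, both must declare the same output key and value types so they match setMapOutputKeyClass and setMapOutputValueClass. An illustrative skeleton of one of them, under that assumption:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper1 extends Mapper<LongWritable, Text, Text, Flow> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Parse one line of aa.txt into a Flow record; the actual parsing
        // depends on the input data's format, which this post does not show
        Flow flow = new Flow();
        context.write(new Text(value.toString().split(" ")[0]), flow);
    }
}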
Multiple Output Sources
The Reducer
package com.example.demo.mr;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
/**
 * @author big uncle
 * @date 2021/5/15 10:44
 **/
public class MultipleOutputReduce extends Reducer<Text,Text,Text,Text> {
/**
 * Multiple output sinks
 **/
private MultipleOutputs<Text,Text> outputs;
/**
 * Initialization
 **/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
outputs = new MultipleOutputs<>(context);
}
/**
 * Route each record to a named output based on its key
 **/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for(Text text:values){
// Compare against the key's string value; "tom".equals(key) would always
// be false because key is a Text, not a String
if("tom".equals(key.toString())){
// Write to the named output "tomFile"
outputs.write("tomFile",key,text);
}else if("rose".equals(key.toString())){
// Write to the named output "roseFile"
outputs.write("roseFile",key,text);
}
}
}
/**
 * Close the MultipleOutputs instance so its files are flushed
 **/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
outputs.close();
}
}
The job driver
package com.example.demo.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * @author big uncle
 * @date 2021/5/11 11:13
 **/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
// Get the Job instance
Job job = Job.getInstance(configuration);
// Set the driver class that serves as the job's entry point
job.setJarByClass(WordCountDriver.class);
// Read multiple input files, each with its own Mapper class
MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/aa.txt"), TextInputFormat.class,WordCountMapper1.class);
MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/bb.txt"), TextInputFormat.class,WordCountMapper2.class);
// Set the Mapper's output key type
job.setMapOutputKeyClass(Text.class);
// Set the Mapper's output value type
job.setMapOutputValueClass(Flow.class);
// Set the Reducer
job.setReducerClass(MultipleOutputReduce.class);
// Register the named outputs used by MultipleOutputs in the Reducer
MultipleOutputs.addNamedOutput(job,"tomFile", TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"roseFile", TextOutputFormat.class,Text.class,Text.class);
// Write to the given directory; the result path must not already exist
FileOutputFormat.setOutputPath(job,new Path("hdfs://node113:9000/test1/result"));
// Submit the job and wait for it to finish
job.waitForCompletion(true);
}
}
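Records routed through outputs.write("tomFile", ...) land in files named like tomFile-r-00000, while the default part-r-00000 stays empty because context.write is never called in the Reducer. If those empty part files are unwanted, Hadoop's LazyOutputFormat can be set as the job's output format so the default file is only created on first write; a sketch of the extra driver line:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

// Create the default output file lazily, only if something is written to it
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);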