Implementing Traffic Aggregation and Sorting
2019-01-02
小月半会飞
Requirement: aggregate the traffic used by each phone number, then sort by total traffic in descending order.
Step 1: aggregate the traffic usage per phone number.
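The mapper in this step assumes whitespace-separated log lines, with the phone number in the second field and the up/down flow counts in the third- and second-to-last fields. A hypothetical input line (the layout is inferred from the mapper's indexing; the IP and URL are invented, and the flow values are chosen to match the 13726230503 row in the output below):
1363157985066 13726230503 120.196.100.99 www.example.com 24681 2481 200
That is: timestamp, phone number, client IP, visited site, up flow, down flow, HTTP status.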
FlowBean code:
package com.neusoft;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Created by Administrator on 2018/12/29.
*/
// Implementing WritableComparable<FlowBean> makes FlowBean both serializable and sortable by the framework
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long totalFlow;
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.totalFlow = upFlow + downFlow;
}
@Override
public String toString() {
return upFlow +
"\t" + downFlow +
"\t" + totalFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getTotalFlow() {
return totalFlow;
}
public void setTotalFlow(long totalFlow) {
this.totalFlow = totalFlow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(totalFlow);
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
totalFlow = dataInput.readLong();
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
}
// compareTo defines the key order used during the shuffle:
// negative means this bean sorts before o, positive means after, zero means equal;
// comparing o against this (instead of this against o) gives descending order by total flow
@Override
public int compareTo(FlowBean o) {
// Long.compare avoids the overflow that casting the long difference to int could cause
return Long.compare(o.totalFlow, this.totalFlow);
}
}
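Since compareTo is exactly what the framework calls when sorting map output keys, the descending order can be sanity-checked in plain Java. This small demo is not part of the original post; it sorts a few beans built from values that appear in the output below:
package com.neusoft;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortDemo {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(new FlowBean(120, 120));     // total 240
        beans.add(new FlowBean(110349, 7335)); // total 117684
        beans.add(new FlowBean(954, 1116));    // total 2070
        Collections.sort(beans); // uses FlowBean.compareTo
        for (FlowBean bean : beans) {
            System.out.println(bean); // largest total flow prints first
        }
    }
}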
FlowMapper code:
package com.neusoft;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Created by Administrator on 2018/12/29.
*/
public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// a stricter variant would also split on punctuation: value.toString().split("\\pP|\\s+")
String[] words = value.toString().split("\\s+");
// the phone number is the second field; up/down flow are the third- and second-to-last fields
String phoneNo = words[1];
String upFlow = words[words.length - 3];
String downFlow = words[words.length - 2];
FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));
context.write(new Text(phoneNo), flowBean);
}
}
FlowReducer code:
package com.neusoft;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Created by Administrator on 2018/12/29.
*/
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean>
{
// <13560439658,List(flowbean1,flowbean2)>
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;
// Hadoop reuses one FlowBean instance across the iteration, so copy out the
// primitive fields rather than keeping references to the beans themselves
for(FlowBean flowBean : values)
{
sumDownFlow += flowBean.getDownFlow();
sumUpFlow += flowBean.getUpFlow();
}
// the constructor takes (upFlow, downFlow), so pass the sums in that order
FlowBean finalFlowBean = new FlowBean(sumUpFlow, sumDownFlow);
context.write(key,finalFlowBean);
}
}
FlowDriver code:
package com.neusoft;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Acts as a client to the YARN cluster:
* it packages the MR job's runtime parameters, specifies the jar,
* and finally submits the job to YARN
*/
public class FlowDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root") ;
System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
if (args == null || args.length == 0) {
return;
}
FileUtil.deleteDir(args[1]);
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//jar
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean bResult = job.waitForCompletion(true);
System.out.println("--------------------------------");
System.exit(bResult ? 0 : 1);
}
}
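FileUtil.deleteDir above is not a Hadoop class, and its source is not shown in the post; presumably it is a small helper in the same package that removes the output directory, since MapReduce refuses to start if the output path already exists. A minimal sketch, assuming a local file-system path:
package com.neusoft;
import java.io.File;

public class FileUtil {
    // recursively delete a local directory so the job can write its output fresh
    public static void deleteDir(String path) {
        File dir = new File(path);
        if (!dir.exists()) {
            return;
        }
        File[] children = dir.listFiles();
        if (children != null) {
            for (File child : children) {
                if (child.isDirectory()) {
                    deleteDir(child.getPath());
                } else {
                    child.delete();
                }
            }
        }
        dir.delete();
    }
}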
The aggregation job's output (phone number, up flow, down flow, total flow):
13480253104 180 180 360
13502468823 110349 7335 117684
13560436666 954 1116 2070
13560439658 5892 2034 7926
13602846565 2910 1938 4848
13660577991 690 6960 7650
13719199419 0 240 240
13726230503 24681 2481 27162
13726238888 24681 2481 27162
13760778710 120 120 240
13826544101 0 264 264
13922314466 3720 3008 6728
13925057413 48243 11058 59301
13926251106 0 240 240
13926435656 1512 132 1644
15013685858 3538 3659 7197
15920133257 2936 3156 6092
15989002119 180 1938 2118
18211575961 2106 1527 3633
18320173382 2412 9531 11943
84138413 1432 4116 5548
Step 2: sort the aggregated file by traffic usage.
The sort key was already wired into FlowBean's compareTo, so the same bean can be reused here as the map output key.
SortFlowMapper:
package com.neusoft;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Created by Administrator on 2018/12/29.
*/
public class SortFlowMapper extends Mapper<LongWritable,Text,FlowBean,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
// each line of the step-1 output is: phone number, upFlow, downFlow, totalFlow
String phoneNo = words[0];
String upFlow = words[1];
String downFlow = words[2];
FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));
context.write(flowBean,new Text(phoneNo));
}
}
SortFlowReducer:
package com.neusoft;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Created by Administrator on 2018/12/29.
*/
public class SortFlowReducer extends Reducer<FlowBean,Text,Text,FlowBean>
{
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// The shuffle has already sorted the FlowBean keys via compareTo (descending total flow).
// Beans that compare equal land in one reduce call, so emit every phone number in
// the group instead of only the first; as the value iterator advances, Hadoop also
// updates `key` to the bean belonging to the current value.
for (Text phone : values) {
context.write(phone, key);
}
}
}
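Because compareTo looks only at totalFlow, beans that tie on the total compare as equal, so all of their phone numbers arrive in a single reduce call; the loop above emits each of them, though their relative order is unspecified. If a deterministic order among ties matters, one option (a sketch, not from the original post) is to add tie-breakers to FlowBean.compareTo:
@Override
public int compareTo(FlowBean o) {
    int c = Long.compare(o.totalFlow, this.totalFlow); // primary: descending total flow
    if (c == 0) {
        c = Long.compare(o.upFlow, this.upFlow);       // tie-break on up flow
    }
    if (c == 0) {
        c = Long.compare(o.downFlow, this.downFlow);   // then on down flow
    }
    return c;
}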
SortFlowDriver:
package com.neusoft;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Acts as a client to the YARN cluster:
* it packages the MR job's runtime parameters, specifies the jar,
* and finally submits the job to YARN
*/
public class SortFlowDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");
System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
// two arguments are required: the input path (the step-1 output) and the output path
if (args == null || args.length < 2) {
return;
}
// clear any previous output (see the FileUtil sketch above)
FileUtil.deleteDir(args[1]);
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//jar
job.setJarByClass(SortFlowDriver.class);
job.setMapperClass(SortFlowMapper.class);
job.setReducerClass(SortFlowReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
/*job.setNumReduceTasks(5);
job.setPartitionerClass(FlowPartitioner.class);*/
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean bResult = job.waitForCompletion(true);
System.out.println("--------------------------------");
System.exit(bResult ? 0 : 1);
}
}
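To run the two jobs end to end, package the classes into a jar and submit each driver in turn, feeding the first job's output directory to the second job as input. The jar name and paths below are hypothetical:
hadoop jar flow.jar com.neusoft.FlowDriver /flow/input /flow/sum
hadoop jar flow.jar com.neusoft.SortFlowDriver /flow/sum /flow/sorted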
The sorted result (phone number, up flow, down flow, total flow):
13502468823 110349 7335 117684
13925057413 48243 11058 59301
13726230503 24681 2481 27162
18320173382 2412 9531 11943
13560439658 5892 2034 7926
13660577991 690 6960 7650
15013685858 3538 3659 7197
13922314466 3720 3008 6728
15920133257 2936 3156 6092
84138413 1432 4116 5548
13602846565 2910 1938 4848
18211575961 2106 1527 3633
15989002119 180 1938 2118
13560436666 954 1116 2070
13926435656 1512 132 1644
13480253104 180 180 360
13826544101 0 264 264
13719199419 0 240 240