Spark开发代码

根据温度排序,温度相同按照湿度排序

2018-01-03  本文已影响0人  0_9f3a

原始数据

1949-10-01 14:21:02 34  23
1949-10-01 19:21:02 38  34
1949-10-02 14:01:02 36  56
1950-01-01 11:21:02 32  67
1950-10-01 12:21:02 37  11
1951-12-01 12:21:02 23  78
1950-10-02 12:21:02 41  39
1950-10-03 12:21:02 27  88
。。。。。

思路:
1.将数据读取到RDD1中
2.将RDD1中的数据转换成K-V格式的RDD2
3.对RDD2使用sortByKey排序
代码

public class SecondSort {
    public static void main(String[] args) {
//获取温度 湿度信息
        SparkConf conf = new SparkConf().setAppName("SecondSort").setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> weatherRdd = sc.textFile("weather");
        
        /**
         * mapToPair 算子是java api只能够独有的,在scala api中没有这个算子 在scala中相当于map
         * mapToPair可以返回一个KV格式的RDD
         * 泛型解释:
         *  String:wetherRDD中每一条元素的类型, SortObj:返回的RDD的key类型, String:返回的RDD的value类型
         */
        weatherRdd.mapToPair(new PairFunction<String, SortObj, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<SortObj, String> call(String log) throws Exception {
                //log = weatherRdd中每一条记录
                String[] splited = log.split("\t");
                Integer temperature = Integer.parseInt(splited[1].trim());
                Integer shidu = Integer.parseInt(splited[2]);
                SortObj sortObj = new SortObj(temperature,shidu);
                return new Tuple2<SortObj, String>(sortObj,log);
            }
        }).sortByKey()//对RDD2的温度进行排序
        .foreach(new VoidFunction<Tuple2<SortObj,String>>() {//遍历RDD2中每一条数据

            /**
             * SortObj RDD2的key的类型 String RDD2的value类型
             */
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<SortObj, String> t) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(t);
            }
            /**
             * SortObj RDD2 Key的类型
             */
        });
        sc.stop();
    }
}

其中SortObj用来寻找温度相同的元素

public class SortObj implements Serializable,Comparable<SortObj> {
    private Integer temperature;
    private Integer shidu;
    public SortObj() {
        super();
    }
    public SortObj(Integer temperature, Integer shidu) {
        super();
        this.temperature = temperature;
        this.shidu = shidu;
    }
    public Integer getTemperature() {
        return temperature;
    }
    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
    public Integer getShidu() {
        return shidu;
    }
    public void setShidu(Integer shidu) {
        this.shidu = shidu;
    }
    @Override
    public int compareTo(SortObj o) {
        if(o.getTemperature() - getTemperature() == 0){
            return o.getShidu() - getShidu();
        }else{
            return o.getTemperature() - getTemperature();
        }
    }
}

问题:
在scala中如何将一个非KV格式的RDD变成KV格式的RDD?
原则: 只要是xxToPair这样的方法,他的返回值一定是一个KV格式的RDD

上一篇 下一篇

猜你喜欢

热点阅读