Top N in Spark with Java and Scala

2017-11-27  _Kantin

The code is shown below. The Spark version used is spark-core_2.11, running on Windows, developed with IDEA + Maven.
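
Each line of the input file is expected to hold a group key and an integer value separated by a space. A hypothetical TopN.txt (the contents below are an illustrative assumption, not from the original article) might look like this:

spark 100
spark 95
spark 80
spark 75
spark 60
spark 50
hadoop 65
hadoop 90
hadoop 70
hadoop 85
hadoop 55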


package com.spark.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Java implementation of grouped Top N with Spark.
 */
public class App 
{
    public static void main( String[] args )
    {
        //Create the Spark configuration object and set the application name and master
        SparkConf sparkConf = new SparkConf().setAppName("JavaTopNGroup").setMaster("local");
        //Create the Spark entry point: JavaSparkContext
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        //Read the local file into an RDD
        JavaRDD<String> lines = ctx.textFile("file:///D://TopN.txt");
        //Turn each line into a key-value pair
        JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] str = line.split(" ");
                return new Tuple2<String, Integer>(str[0],Integer.valueOf(str[1]));
            }
        });
        //Group the values by key
        JavaPairRDD<String,Iterable<Integer>> groupPairs = pairs.groupByKey();
        final JavaPairRDD<String,Iterable<Integer>> top5 = groupPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData) throws Exception {
                //Array holding the top 5 values of this group
                Integer[] top5 = new Integer[5];
                //The group key
                String groupByKey = groupedData._1();
                //Iterator over the values of this group
                Iterator<Integer> groupValue = groupedData._2().iterator();
                while (groupValue.hasNext()){
                    Integer value = groupValue.next();
                    for (int i = 0; i < 5; i++) {
                        //Slot still empty: fewer than five values collected so far
                        if(top5[i] == null){
                            top5[i] = value;
                            break;
                        }else if (value>top5[i]){
                            //The value ranks in the current top 5: shift smaller entries down and insert it
                            for (int j = 4; j > i; j--) {
                                top5[j] = top5[j-1];
                            }
                            top5[i]=value;
                            break;
                        }
                    }
                }
                //Note: trailing slots stay null when a group has fewer than five values
                return new Tuple2<String, Iterable<Integer>>(groupByKey, Arrays.asList(top5));
            }
        });

        top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String, Iterable<Integer>> topped) throws Exception {
                System.out.println("Group By Key: "+topped._1());
                Iterator<Integer> toppedValue = topped._2().iterator();
                while(toppedValue.hasNext()){
                    Integer value = toppedValue.next();
                    System.out.println(value);
                }
                System.out.println("*************************************");
            }

        });
        ctx.stop();
    }
}
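
With the hypothetical input above, the program prints, for each key, the five largest values in descending order (the order in which the groups appear may vary):

Group By Key: spark
100
95
80
75
60
*************************************
Group By Key: hadoop
90
85
70
65
55
*************************************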

Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object TopNGroup {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration and context
    val conf = new SparkConf().setAppName("ScalaTopNGroup").setMaster("local")
    val sc = new SparkContext(conf)
    //Read the local file into an RDD
    val lines = sc.textFile("file:///D://TopN.txt")
    //Map each line to (key, value), group by key, and keep the five largest values per key
    val top5 = lines.map { line =>
      val splitedLine = line.split(" ")
      (splitedLine(0), splitedLine(1).toInt)
    }.groupByKey().map { groupedData =>
      val groupByKey = groupedData._1
      val top5Values: List[Int] = groupedData._2.toList.sortWith(_ > _).take(5)
      (groupByKey, top5Values)
    }

    top5.foreach { topped =>
      println("Group key: " + topped._1)
      topped._2.foreach(println)
      println("*****************")
    }
    sc.stop()
  }
}
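
A note on the design: groupByKey materializes every value of a key in memory before the top five are selected. For keys with very many values, a bounded alternative is to maintain a five-element list per key with aggregateByKey. The snippet below is only a sketch (it assumes the same lines RDD as above and replaces the groupByKey pipeline), not part of the original article:

    val top5 = lines.map { line =>
      val splitedLine = line.split(" ")
      (splitedLine(0), splitedLine(1).toInt)
    }.aggregateByKey(List.empty[Int])(
      //Merge a single value into a partition-local top-5 list
      (acc, v) => (v :: acc).sortWith(_ > _).take(5),
      //Merge two partition-local top-5 lists into one
      (a, b) => (a ++ b).sortWith(_ > _).take(5)
    )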

The relevant dependency in pom.xml is as follows:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>