大数据之二次排序

2018-11-08  本文已影响0人  机器不能学习

二次排序指的是第一次段(key键)排序完后,对第一字段相同的条件下再进行对value的排序,且不影响原来key的顺序。

对于归约器排序至少有两种方案。两种都在hadoop/rm和spark中都可以解决。

这里介绍spark框架的排序。

spark的SortBy和SortByKey具有内部调用的是被排序函数的compare方法。如果我们自定义一个数据结构并继承Ordered(提供compare方法)和Serializable(这个很重要,所有被引用到的类都必须是可序列化的,不然会报错的)。
二次排序不过是定义了一个比较方法,在第一个值排序的条件下,进行二个值的排序。以此类推,那么三次排序,N次排序都很容易进行。

class SecondarySort(val two :Int,val one : Int) extends Ordered[SecondarySort] with Serializable {
  override def compare(that: SecondarySort): Int = {
   
    if(that.two-this.two != 0){

      this.two-that.two
    }else{
      that.one-this.one
    }
  }
}

这里最关键是是顺序问题,因为scala的返回值是最后一个输出值。如果没有else,那么输出值永远是最后一个。
有了这个compare函数,我们不仅可以比较int值,可以自定义所有类型的比较方法。

import org.apache.spark.{SparkConf, SparkContext}

object TwoSort {
  def main(args: Array[String]): Unit = {

    val conf=new SparkConf().setAppName("sort2")
    val sc=new SparkContext(conf)

    val source=sc.textFile("/user/join1.txt",1).distinct()

    val rdd= source.map(x=>{
      val xy=x.split(" ")

      (new SecondarySort((xy(0)).toInt,xy(1).size.toInt),x)
    })
    val  rdd2=rdd.sortByKey()
    rdd2.foreach(x=>println("-----------------"+x._2))

  }
}

二次排序多个分区运行。最终多个文件合并后才是完整的排序。
如果在console中打印,不保证有序。
如果我们想要将它最终以一个文件打印出来,可以用repartitionAndSortWithinPartitions。该函数是repartition和sortby的结合,并做了一些优化,因为它可以将排序过程推送到 shuffle 操作的机器上进行。

上一篇下一篇

猜你喜欢

热点阅读