Multiple Ways to Implement Sorting in Spark
Method 1:
package Day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sortcust01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // Sort by looks (yan) in descending order; when looks are equal, by age in ascending order
    val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
    // Parallelize the driver-side collection into an RDD
    val lines = sc.parallelize(users)
    val lmap: RDD[User] = lines.map(t => {
      val sp = t.split(" ")
      val name = sp(0)
      val age = sp(1).toInt
      val yan = sp(2).toInt
      new User(name, age, yan)
    })
    // Sort the RDD of User elements; User itself carries the comparison logic
    val lsort: RDD[User] = lmap.sortBy(u => u)
    val res = lsort.collect()
    println(res.toBuffer)
    sc.stop()
  }
}

class User(val name: String, val age: Int, val nian: Int) extends Ordered[User] with Serializable {
  override def compare(that: User): Int = {
    if (this.nian == that.nian) {
      this.age - that.age
    } else {
      -(this.nian - that.nian)
    }
  }

  override def toString: String = s"name: $name, age: $age, nian: $nian"
}
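Method 1 works because sortBy needs an implicit Ordering for its key type, and Scala lifts any Ordered[T] into an Ordering[T] automatically (via scala.math.Ordering.ordered). The comparison can be exercised locally without a cluster; the sketch below is illustrative only (the object name is made up) and mirrors the sample data above:

object User01LocalCheck {
  def main(args: Array[String]): Unit = {
    val users = Array(new User("laoduan", 30, 99), new User("laozhao", 88, 99), new User("laoyang", 12, 454))
    // .sorted resolves the same implicit Ordering[User] that sortBy uses,
    // because User extends Ordered[User]
    users.sorted.foreach(println)
    // Expected order: laoyang (nian 454) first, then laoduan and laozhao (nian 99, age ascending)
  }
}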
Method 2:
package Day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sortcust02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // Sort by looks (yan) in descending order; when looks are equal, by age in ascending order
    val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
    // Parallelize the driver-side collection into an RDD
    val lines = sc.parallelize(users)
    val lmap: RDD[(String, Int, Int)] = lines.map(t => {
      val sp = t.split(" ")
      val name = sp(0)
      val age = sp(1).toInt
      val yan = sp(2).toInt
      (name, age, yan)
    })
    // Sort by a key object (a rule); the data keeps its tuple shape, only the order changes
    val lsort = lmap.sortBy(tp => new Boy(tp._2, tp._3))
    println(lsort.collect().toBuffer)
    sc.stop()
  }
}

class Boy(val age: Int, val nian: Int) extends Ordered[Boy] with Serializable {
  override def compare(that: Boy): Int = {
    if (this.nian == that.nian) {
      this.age - that.age
    } else {
      -(this.nian - that.nian)
    }
  }
}
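The point of Method 2 is that the sort key does not have to be the element itself: sortBy builds a throwaway Boy from each tuple purely for comparison, and the tuples come out unchanged. Because Spark serializes the keys (e.g. when sampling for the range partitioner), Boy mixes in Serializable. A quick local check of the comparison rule (the object name is illustrative):

object BoyCompareCheck {
  def main(args: Array[String]): Unit = {
    val a = new Boy(30, 99)
    val b = new Boy(88, 99)
    val c = new Boy(12, 454)
    println(a.compare(b)) // negative: same nian, so the smaller age sorts first
    println(a.compare(c)) // positive: c has the larger nian, so c sorts first (descending)
  }
}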
Method 3:
package Day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sortcust03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // Sort by looks (yan) in descending order; when looks are equal, by age in ascending order
    val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
    // Parallelize the driver-side collection into an RDD
    val lines = sc.parallelize(users)
    val lmap: RDD[(String, Int, Int)] = lines.map(t => {
      val sp = t.split(" ")
      val name = sp(0)
      val age = sp(1).toInt
      val yan = sp(2).toInt
      (name, age, yan)
    })
    // Sort by a key object (a rule); the data keeps its tuple shape, only the order changes
    val lsort = lmap.sortBy(tp => Man(tp._2, tp._3))
    println(lsort.collect().toBuffer)
    sc.stop()
  }
}

case class Man(age: Int, nian: Int) extends Ordered[Man] {
  // A case class is Serializable by default, so no explicit `with Serializable` is needed
  override def compare(that: Man): Int = {
    if (this.nian == that.nian) {
      this.age - that.age
    } else {
      -(this.nian - that.nian)
    }
  }
}
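Method 3 is Method 2 with a case class: case classes are Serializable out of the box and come with apply, equals, and a readable toString for free. A minimal local sketch of the same ordering (the object name is illustrative):

object ManOrderingCheck {
  def main(args: Array[String]): Unit = {
    // No `new` needed thanks to the case-class apply method
    val men = Array(Man(30, 99), Man(88, 99), Man(12, 454))
    // .sorted uses the Ordered[Man] implementation, the same rule sortBy applies on the RDD
    println(men.sorted.toBuffer)
    // Expected: ArrayBuffer(Man(12,454), Man(30,99), Man(88,99))
  }
}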
Method 4:
package Day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sortcust04 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // Sort by looks (yan) in descending order; when looks are equal, by age in ascending order
    val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
    // Parallelize the driver-side collection into an RDD
    val lines = sc.parallelize(users)
    val lmap: RDD[(String, Int, Int)] = lines.map(t => {
      val sp = t.split(" ")
      val name = sp(0)
      val age = sp(1).toInt
      val yan = sp(2).toInt
      (name, age, yan)
    })
    // Bring the implicit Ordering[Xianr] defined in SortRules into scope
    import SortRules.Orderingxianrou
    // Sort by a key object (a rule); the data keeps its tuple shape, only the order changes
    val lsort = lmap.sortBy(tp => Xianr(tp._2, tp._3))
    println(lsort.collect().toBuffer)
    sc.stop()
  }
}

case class Xianr(age: Int, nian: Int)

// SortRules lives in its own source file in the same package
package Day05

object SortRules {
  implicit object Orderingxianrou extends Ordering[Xianr] {
    override def compare(x: Xianr, y: Xianr): Int = {
      if (x.nian == y.nian) {
        x.age - y.age
      } else {
        y.nian - x.nian
      }
    }
  }
}
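Method 4 decouples the rule from the data: Xianr carries no ordering of its own, and sortBy finds Orderingxianrou through the implicit search triggered by import SortRules.Orderingxianrou. The same resolution can be checked locally (the object name is illustrative):

object XianrOrderingCheck {
  def main(args: Array[String]): Unit = {
    import SortRules.Orderingxianrou // brings the implicit Ordering[Xianr] into scope
    val xs = Array(Xianr(30, 99), Xianr(88, 99), Xianr(12, 454))
    // .sorted picks up Orderingxianrou implicitly, exactly as sortBy does on the RDD
    println(xs.sorted.toBuffer)
    // Expected: ArrayBuffer(Xianr(12,454), Xianr(30,99), Xianr(88,99))
  }
}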
Method 5:
package Day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sortcust05 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // Sort by looks (yan) in descending order; when looks are equal, by age in ascending order
    val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
    // Parallelize the driver-side collection into an RDD
    val lines = sc.parallelize(users)
    val lmap: RDD[(String, Int, Int)] = lines.map(t => {
      val sp = t.split(" ")
      val name = sp(0)
      val age = sp(1).toInt
      val yan = sp(2).toInt
      (name, age, yan)
    })
    // Exploit the built-in tuple ordering: compare the first element first, then the second on ties
    val lsort = lmap.sortBy(tp => (-tp._3, tp._2))
    println(lsort.collect().toBuffer)
    sc.stop()
  }
}
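Method 5 leans on Scala's built-in lexicographic Ordering for tuples: the first component decides, and the second breaks ties. Negating the looks value turns ascending into descending without any custom class; the caveat is that negation only works for numeric keys (and overflows for Int.MinValue). The same trick outside Spark, with the sample data above (the object name is illustrative):

object TupleSortCheck {
  def main(args: Array[String]): Unit = {
    val users = Array(("laoduan", 30, 99), ("laozhao", 88, 99), ("laoyang", 12, 454))
    // Negate yan for descending order; age stays ascending as the tie-breaker
    val sorted = users.sortBy(tp => (-tp._3, tp._2))
    println(sorted.toBuffer)
    // Expected: laoyang (454) first, then laoduan (30, 99), then laozhao (88, 99)
  }
}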
Method 6:
package Day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sortcust06 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // Sort by looks (yan) in descending order; when looks are equal, by age in ascending order
    val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
    // Parallelize the driver-side collection into an RDD
    val lines = sc.parallelize(users)
    val lmap: RDD[(String, Int, Int)] = lines.map(t => {
      val sp = t.split(" ")
      val name = sp(0)
      val age = sp(1).toInt
      val yan = sp(2).toInt
      (name, age, yan)
    })
    // Ordering[(Int, Int)]    -- the target shape the comparison runs on
    // on[(String, Int, Int)]  -- the shape of the records before mapping
    // t => (-t._3, t._2)      -- how each record is mapped into the target shape
    implicit val Rules = Ordering[(Int, Int)].on[(String, Int, Int)](t => (-t._3, t._2))
    val lsort = lmap.sortBy(tp => tp)
    println(lsort.collect().toBuffer)
    sc.stop()
  }
}
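Method 6 expresses the rule as an Ordering adapted with on: Ordering[(Int, Int)] is the comparison to run, and the function maps each record into that shape. Because the implicit val is in lexical scope, sortBy(tp => tp) picks it up for the whole tuple. The same adapter works outside Spark (the object name is illustrative):

object OrderingOnCheck {
  def main(args: Array[String]): Unit = {
    // Adapt the built-in Ordering[(Int, Int)] to (String, Int, Int) records;
    // a local implicit takes precedence over the default Tuple3 ordering
    implicit val rules: Ordering[(String, Int, Int)] =
      Ordering[(Int, Int)].on[(String, Int, Int)](t => (-t._3, t._2))
    val users = Array(("laoduan", 30, 99), ("laozhao", 88, 99), ("laoyang", 12, 454))
    println(users.sorted.toBuffer) // uses the implicit rules
    // Expected: laoyang, laoduan, laozhao
  }
}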