
A Spark partitioning example

2018-12-11  Frank_8942
import org.apache.spark.{Partitioner, SparkConf}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer


class Spark06(number:Int) extends Partitioner{
  // number determines how many partitions to create
  override def numPartitions: Int = number

  // Partitioning rule: route each key by its first character
  override def getPartition(key: Any): Int = {
    try {
      // look at the first character of the key
      val str = key.toString.substring(0, 1)
      if (str.matches("[A-Z]")) {
        0 // uppercase letter
      } else if (str.matches("[a-z]")) {
        1 // lowercase letter
      } else if (str.matches("[0-9]")) {
        2 // digit
      } else {
        3 // anything else
      }
    } catch {
      // an empty key makes substring throw; send it to the "other" partition
      case _: Exception => 3
    }
  }
}
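
// Quick check of the routing rules above: the partition index getPartition returns
// for a few sample keys (read directly off the if/else chain):
//   new Spark06(4).getPartition("Ab")  // 0 -> uppercase first character
//   new Spark06(4).getPartition("ab")  // 1 -> lowercase
//   new Spark06(4).getPartition("3a")  // 2 -> digit
//   new Spark06(4).getPartition("##")  // 3 -> everything else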

object Spark06 {
  // Custom partitioning.
  // What is a partition? During the shuffle stage, the partitioner decides which reducer each key is sent to.
  // How to implement one? Extend the org.apache.spark.Partitioner class and override its methods.

  // Example: partition by the first character of the key, with these categories: digits, lowercase letters, uppercase letters, other.
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // run the computation
    compute(spark)

    spark.stop()
  }

  def compute(spark: SparkSession): Unit = {
    import spark.implicits._

    val data =
      List(
        "ab Ab 3a R ## $a  ", " #s s 3g 5g Fa ob Bb ¥la a", "a c 4 88 90 _a "
      )

    spark.sparkContext.parallelize(data)
      .flatMap { line =>
        val arr = ArrayBuffer[(String, Int)]()
        line.split(" ").foreach(word => arr += ((word, 1)))
        arr
      }
      // reduceByKey is a shuffle operator, so the custom partitioner is supplied here
      .reduceByKey(new Spark06(4), _ + _)
      // save the result to files so the partitioning can be inspected: each part file corresponds to one partition
      .saveAsTextFile("g://result")

  }
}
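
To see the partition layout without opening the output files, one option is mapPartitionsWithIndex, which tags every record with the index of the partition it landed in. A minimal sketch, assuming it replaces the saveAsTextFile call inside compute above (the sample keys are chosen just to hit each rule once):

    spark.sparkContext
      .parallelize(Seq(("Ab", 1), ("ab", 1), ("3a", 1), ("##", 1)))
      .reduceByKey(new Spark06(4), _ + _)
      // tag each (key, count) pair with the index of the partition it ended up in
      .mapPartitionsWithIndex((idx, iter) => iter.map { case (k, v) => (idx, k, v) })
      .collect()
      .foreach(println)
    // expected grouping: uppercase keys -> partition 0, lowercase -> 1, digits -> 2, other -> 3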

