程序员小天地

spark-sortBy算子

2021-04-22  本文已影响0人  小草莓子桑

上一篇已经简单说过了map等几个算子。今天,我们来了解一下sortBy算子,他们也是Transformation算子

模拟一个场景,来讲各个Transformation的API

在这里我们模拟一个统计网站页面浏览情况的需求。

业务场景

我们这里使用网站浏览日志作为业务场景,就用简单的三个字段页面埋点id、用户id、停留时长


业务场景
日志格式

假定网站浏览日志以HDFS形式通过我们的系统埋点到我们数据仓库中了,埋点日志的数据格式如下:


日志格式

假定我们代码中已经使用这样的格式,把日志记录到服务器上,并通过flume上传到了HDFS服务器上

页面id|用户id|停留时长
index|2|6
表示:用户id为2的用户访问了index页面,并停留了6秒钟

这次,我们包装一个实体类,BrowserLogInfo,字段分别为url(用户访问页面),userId(用户id),time(停留时长),来映射我们日志中的三个字段,并通过map算子把日志文档读取写入到一个BrowserLogInfo对象的rdd中,直接上代码

- object not serializable (class: com.edu.spark.entity.BrowserLogInfo, value: BrowserLogInfo{url='index', userId=1, time=3})
    - element of array (index: 0)
    - array (class [Lcom.edu.spark.entity.BrowserLogInfo;, size 18)

如图所示:


object not serializable异常
package com.edu.spark.entity;

import scala.Serializable;

/**
 * 用户浏览日志实体
 * @author xiaocaomei
 * @date 2021/4/22
 * @description
 */
public class BrowserLogInfo implements Serializable {

    /**
     * 访问url
     */
    private String url;
    /**
     * 用户id
     */
    private long userId;
    /**
     * 浏览时长
     */
    private long time;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "BrowserLogInfo{" +
                "url='" + url + '\'' +
                ", userId=" + userId +
                ", time=" + time +
                '}';
    }
}
object LogProcess {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    /**
      * 如果这个参数不设置,默认认为你运行的是集群模式
      * 如果设置成local代表运行的是local模式
      */
    conf.setMaster("local")

    //设置任务名
    conf.setAppName("EduSpark")
    //创建SparkCore的程序入口
    val sc = new SparkContext(conf)
    //读取文件 生成RDD
    val file : RDD[String] = sc.textFile("F:\\hdfs\\hello.log")
    println(file.collect().toBuffer)

    //把每一行数据按照 | 分割
    val traceRdd : RDD[BrowserLogInfo] = file.map(s => {
      //把每一行数据按照 | 分割
      //注意这里 | 需要使用转义字符
      var sArray: Array[String] = s.split("\\|")
      //split函数的作用是 通过|分隔字符串返回数组
      val info = new BrowserLogInfo
      //  sArray 数组为 url | userId | time
      info.setUrl(sArray(0))
      // string.toLong 是用来把字符串转化成long
      info.setUserId(sArray(1).toLong)
      info.setTime(sArray(2).toLong)
      println(info)
      info
    })

    println(traceRdd.collect().toBuffer)
    sc.stop()
  }
}

下面接入正题,来看sortBy算子

直接上代码举栗子
object LogProcess {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    /**
      * 如果这个参数不设置,默认认为你运行的是集群模式
      * 如果设置成local代表运行的是local模式
      */
    conf.setMaster("local")

    //设置任务名
    conf.setAppName("EduSpark")
    //创建SparkCore的程序入口
    val sc = new SparkContext(conf)
    //读取文件 生成RDD
    val file : RDD[String] = sc.textFile("F:\\hdfs\\hello.log")
    println(file.collect().toBuffer)

    //把每一行数据按照 | 分割
    val traceRdd : RDD[BrowserLogInfo] = file.map(s => {
      //把每一行数据按照 | 分割
      //注意这里 | 需要使用转义字符
      var sArray: Array[String] = s.split("\\|")
      //split函数的作用是 通过|分隔字符串返回数组
      val info = new BrowserLogInfo
      //  sArray 数组为 url | userId | time
      info.setUrl(sArray(0))
      // string.toLong 是用来把字符串转化成long
      info.setUserId(sArray(1).toLong)
      info.setTime(sArray(2).toLong)
      println(info)
      info
    })

    println(traceRdd.collect().toBuffer)

    //按照对象BrowserLogInfo对象中的time字段进行升序排序
    //按照用户浏览时长 来升序 排序 浏览数据
    //第一个参数:函数,这里使用getTime获取time字段
    //第二个参数:ascending = true 升序,false降序
    val sortRdd = traceRdd.sortBy(x => x.getTime,ascending = true)

    println(sortRdd.collect().toBuffer)
    sc.stop()
  }
}
sortBy使用方式

使用代码如下:

 //按照对象BrowserLogInfo对象中的time字段进行升序排序
    //按照用户浏览时长 来升序 排序 浏览数据
    //第一个参数:函数,这里使用getTime获取time字段
    //第二个参数:ascending = true 升序,false降序
    val sortRdd = traceRdd.sortBy(x => x.getTime,ascending = true)

spark中的sortBy算子就简单给大家说到这里,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

上一篇 下一篇

猜你喜欢

热点阅读