spark

Spark 第一个scala 程序

2020-03-09  本文已影响0人  灬臣独秀灬

1、环境准备

1、JDK配置,Scala 配置 。目前教程环境用的是 hadopp2.6-CDH5.6.0、spark 2.1.0 、jdk 1.7u51、scala2.11.8 。
2、Scala 下载地址 https://www.scala-lang.org/download/all.html 迅雷下载速度更快(PS: 这不是打广告)。
3、 安装Scala 、JDK并配置环境变量(jdk 1.8 也是可以的、scala 要和spark 保持一致,因为可能我们会修改spark源码的需求)
4、IDEA scala 插件下载 。如果没有scala插件 ,IDEA 不能新建 scala 类 和对象 。(如果在IDEA 插件中心下载太慢了可以,查看版本后到 官网下载,或者用迅雷下载 官网: https://plugins.jetbrains.com/idea
5、测试数据存放百度网盘 https://pan.baidu.com/s/1xju3QodxC-abpWADifigyA 密码微信联系我

image

2、IDEA创建项目

1、打开IDE :file - new - project (搭建maven 项目并勾选 create from archetype ,并且选择 scala IDEA 自动帮你配置scala 大部分依赖省不少事)
image
2、 设置maven 坐标 GAV
image
3)、 配置项目maven 版本 设置以及本地仓库地址 (在spark 源码编译对maven 有要求,为了避免出现莫名问题 所以这里指定较高mean版本)
image image
4、 添加Scala 到IDEA 中
image.png
5、 第一个scala Object
image.png

3、编码阶段

1、业务需求,根据用户日志统计出各服务调用次数日志文件为csv 格式如下:


image.png
2、添加依赖
        <!-- Spark-SQL 依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!--开源 ip 工具类 依赖根据IP计算出城市,已经安装到本地了, 需要的可以到github 上面找-->
        <dependency>
            <groupId>com.ggstar</groupId>
            <artifactId>ipdatabase</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- ip 工具类 需要读取excel 中ip信息 依赖-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.14</version>
        </dependency>
        <!-- ip 工具类依赖 需要读取excel-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.14</version>
        </dependency>
        <!-- json工具类依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
3、程序代码
package cn.harsons.mbd

import java.util.Locale

import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  *
  * @author lyb
  * @date 2020/3/9 0009
  */
object UserLogStatApp {

  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 拿到Spark Session  在本地开发时先设置local 模式发布正式在修改对应的模式
    val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
    // 注册函数,根据ip 获取城市  注册函数可以在Spark SQL 中使用  注意后面必须要使用 空格和下划线" _"
    val city = session.udf.register("getCity", getCity _)
    // 注册函数 时间转换
    val formatTime = session.udf.register("getTime", convertDate _)
    // 注册函数 分割URL 得到用户调用的模块
    val moduleType = session.udf.register("getModuleType", getModuleType _)
    // 以cvs 方式读取文件,cvs 分隔符为; (默认",") 从第一个参数里面读取 并且转成 DataFrame。
    val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
    // 这里面进行过滤, 清洗
    //  剔除第9列值不等于200 或者第四列为空
    // 查询 第一列的值 并且命名为 url 后面语法大体与SQL 类似
    // moduleType 为上面注册的方法主要是分割URL 得到调用的服务路径,city  formatTime  都一样原理
    val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
      .select(
        frame.col("_c1").as("url")
        , moduleType(frame.col("_c1")).as("server")
        , frame.col("_c0").as("ip")
        , city(frame.col("_c0")).as("address")
        , frame.col("_c3").as("userId")
        , frame.col("_c4").as("userName")
        , frame.col("_c5").as("browserName")
        , formatTime(frame.col("_c2")).as("time"))
    //  显示分组统计后的结果,这里可以把结果集输出到 HDFS 或者JDBC show 显示50行结果集
    result.groupBy("server").count().orderBy("count").show(50, false)
    //这里大体意思是 coalesce 指定文件大小,作用分区大小。 model  模式 指定为覆盖 partitionBy 根据什么分区
    // 这里用的是地址 和用户分区  输出文件为JSON
    // 这里也可以指定目录 在json 方法中
    result.coalesce(1).write.mode(SaveMode.Overwrite).partitionBy("address", "userName")
      .json(args(1))
    //关闭session
    session.stop()
  }

  def convertDate(date: String) = {
    format.format(TypeUtils.castToDate(date))
  }

  def getCity(ip: String) = {
    IpHelper.findRegionByIp(ip)
  }

  def getModuleType(url: String) = {
    if (url != null && url.contains("/")) {
      val str = url.replaceAll("//", "/")
      str.substring(str.indexOf("hscp/") + 5).split("/")(0)
    } else {
      ""
    }
  }

}

3、启动参数设置(IDEA 中可以通过program arguments 来设置 输入文件 和输出文件,注意参数之间用空格分开)
image.png
4、成功输出结果
image.png
5、可能出现的错误

1、Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.9-3
原因 调试发现这是由于默认的jackson-databind版本太高导致。

报错代码:


image.png

解决方案:

       <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.6</version>
        </dependency>

2、java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
由于window 下 没有配置好hadoop 环境变量导致。
解决方案:http://down2.opdown.com:8019/opdown/winutilsmaster.opdown.com.zip 配置好环境变量,重启计算机。

6、数据保存到MYSQL的代码如下(记得添加MYSQL驱动)
package cn.harsons.mbd

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Locale

import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.mutable.ListBuffer

/**
  *
  * @author liyabin
  * @date 2020/3/9 0009
  */
object UserStatSaveApp {

  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 拿到Spark Session  在本地开发时先设置local 模式发布正式在修改对应的模式
    val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
    // 注册函数,根据ip 获取城市  注册函数可以在Spark SQL 中使用  注意后面必须要使用 空格和下划线" _"
    val city = session.udf.register("getCity", getCity _)
    // 注册函数 时间转换
    val formatTime = session.udf.register("getTime", convertDate _)
    // 注册函数 分割URL 得到用户调用的模块
    val moduleType = session.udf.register("getModuleType", getModuleType _)
    // 以cvs 方式读取文件,cvs 分隔符为; (默认",") 取程序传入的第一个参数当做文件地址读取 并且转成 DataFrame。
    val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
    // 这里面进行过滤, 清洗
    // 如果 剔除第9列值不等于200 或者第四列为空
    // 查询 第一列的值 并且命名为 url 后面语法大体与SQL 类似
    // moduleType 为上面注册的方法主要是分割URL 得到调用的服务路径,city  formatTime  都一样原理
    val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
      .select(
        frame.col("_c1").as("url")
        , moduleType(frame.col("_c1")).as("server")
        , frame.col("_c0").as("ip")
        , city(frame.col("_c0")).as("address")
        , frame.col("_c3").as("userId")
        , frame.col("_c4").as("userName")
        , frame.col("_c5").as("browserName")
        , formatTime(frame.col("_c2")).as("time"))
    // 保存统计结果
    readToServerStat(result.groupBy("server").count().orderBy("count"))
    //清洗后的数据写入数据库,也可以写入HDFS 以及任何HADOOP 支持的路径 。这次案例写入MYSQL数据库
    readToUserLog(result)
    //关闭session
    session.stop()
  }

  def readToServerStat(value: Dataset[Row]): Unit = {
    value.foreachPartition(partitionOfRecords => {
      val buffer = new ListBuffer[ServerBean]
      partitionOfRecords.foreach(row => {
        val server = row.getAs[String]("server")
        val count = row.getAs[Long]("count")
        buffer.append(ServerBean(server, count))
      })
      saveServerStat(buffer)
    })
  }

  def readToUserLog(value: Dataset[Row]): Unit = {
    value.foreachPartition(partitionOfRecords => {
      val buffer = new ListBuffer[UserLogBean]
      partitionOfRecords.foreach(row => {
        val url = row.getAs[String]("url")
        val address = row.getAs[String]("address")
        val server = row.getAs[String]("server")
        val ip = row.getAs[String]("ip")
        val userId = row.getAs[String]("userId")
        val userName = row.getAs[String]("userName")
        val browserName = row.getAs[String]("browserName")
        val time = row.getAs[String]("time")
        buffer.append(UserLogBean(server, url, ip, address, userId, userName, browserName, time))
      })
      saveUserLog(buffer)
    })
  }


  def saveServerStat(list: ListBuffer[ServerBean]) = {

    val connection = getConnection()
    connection.setAutoCommit(false)
    //todo
    val sql = "insert into server_stat(server,count) values (?,?) "
    val statement = connection.prepareStatement(sql)

    for (bean <- list) {
      statement.setString(1, bean.server)
      statement.setLong(2, bean.count)
      statement.addBatch()
    }
    statement.executeBatch() // 执行批量处理
    connection.commit() //手工提交
    release(connection, statement)
  }

  def saveUserLog(list: ListBuffer[UserLogBean]) = {
    val connection = getConnection()
    connection.setAutoCommit(false)
    val statement = connection.prepareStatement("insert into user_log(url,address,server,ip,userId,userName,browserName,time) values (?,?,?,?,?,?,?,?)")
    // todo
    for (bean <- list) {
      statement.setString(1, bean.url)
      statement.setString(2, bean.address)
      statement.setString(3, bean.server)
      statement.setString(4, bean.ip)
      statement.setString(5, bean.userId)
      statement.setString(6, bean.userName)
      statement.setString(7, bean.browserName)
      statement.setString(8, bean.time)
      statement.addBatch()
    }
    statement.executeBatch() // 执行批量处理
    connection.commit() //手工提交
    release(connection, statement)

  }

  /**
    * 转换日期
    *
    * @param date
    * @return
    */
  def convertDate(date: String) = {
    format.format(TypeUtils.castToDate(date))
  }

  /**
    * 根据ip计算出城市信息
    *
    * @param ip
    * @return
    */
  def getCity(ip: String) = {
    try {
      IpHelper.findRegionByIp(ip)
    } catch {
      case e: Exception => "未知"
    }
  }

  /**
    * 切割字符串
    *
    * @param url 用户请求路径
    * @return
    */
  def getModuleType(url: String) = {
    if (url != null && url.contains("/")) {
      val str = url.replaceAll("//", "/")
      str.substring(str.indexOf("hscp/") + 5).split("/")(0)
    } else {
      ""
    }
  }

  case class ServerBean(server: String, count: Long)

  case class UserLogBean(server: String, url: String, ip: String, address: String, userId: String
                         , userName: String, browserName: String, time: String)

  def getConnection() = {
    // 这里把数据库与地址写死,正式程序 可以改成配置式
    DriverManager.getConnection("jdbc:mysql://192.168.137.1:3306/spark?user=root&password=123456")
  }

  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

}

下一章-程序打包

上一篇下一篇

猜你喜欢

热点阅读