Spark SQL UDF/Join/DataFrame Combined Usage

2019-06-20  喵星人ZC

1. ScalikeJDBC configuration file and pom file
application.conf

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
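
With this file on the classpath, DBs.setup() from scalikejdbc-config reads the db.default.* keys and opens the connection pool. A minimal sketch of querying city_info through it (illustration only; the object name and query are not part of the original project):

import scalikejdbc._
import scalikejdbc.config.DBs

object ScalikejdbcQuickCheck {
  def main(args: Array[String]): Unit = {
    DBs.setup() // loads db.default.* from application.conf
    val cityNames: List[String] = DB.readOnly { implicit session =>
      SQL("select city_name from city_info").map(_.string("city_name")).list().apply()
    }
    println(cityNames)
    DBs.close()
  }
}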

pom.xml

 <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.2</spark.version>
        <hive.version>1.1.0-cdh5.7.0</hive.version>
        <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
        <scalikejdbc.version>3.3.2</scalikejdbc.version>
    </properties>

    <dependencies>
        <!-- ScalikeJDBC -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>
        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
 <!-- GSON -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
        </dependency>
   </dependencies>

2. Data and scripts
The two MySQL tables are created and populated as follows.
city_info

CREATE TABLE `city_info` (
  `city_id` int(11) DEFAULT NULL,
  `city_name` varchar(255) DEFAULT NULL,
  `area` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 

insert  into `city_info`(`city_id`,`city_name`,`area`) values (1,'BEIJING','NC'),(2,'SHANGHAI','EC'),(3,'NANJING','EC'),(4,'GUANGZHOU','SC'),(5,'SANYA','SC'),(6,'WUHAN','CC'),(7,'CHANGSHA','CC'),(8,'XIAN','NW'),(9,'CHENGDU','SW'),(10,'HAERBIN','NE');

product_info

CREATE TABLE IF NOT EXISTS product_info(
product_id INT UNSIGNED AUTO_INCREMENT,
product_name VARCHAR(100),
extend_info VARCHAR(100),
PRIMARY KEY (product_id )
);

insert  into product_info(product_id,product_name,extend_info) values (1,'product1','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (2,'product2','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (3,'product3','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (4,'product4','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (5,'product5','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (6,'product6','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (7,'product7','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (8,'product8','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (9,'product9','{"product_status":0}');
insert  into product_info(product_id,product_name,extend_info) values (10,'product10','{"product_status":1}');
insert  into product_info(product_id,product_name,extend_info) values (11,'product11','{"product_status":0}');
insert  into product_info(product_id,product_name,extend_info) values (12,'product12','{"product_status":0}');
insert  into product_info(product_id,product_name,extend_info) values (13,'product13','{"product_status":0}');

Hive table definition

create table user_click(
user_id int,
session_id string,
action_time string,
city_id int,
product_id int
) partitioned by (day string)
row format delimited fields terminated by ',';

load data local inpath '/home/hadoop/soul/data/user_click.txt' overwrite into table user_click partition(day='2016-05-05');

Data format (one sample line of user_click.txt)
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1,72
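
The five fields map, in order, to user_id, session_id, action_time, city_id and product_id. As an optional sanity check after loading (a sketch, run from a session with Hive support enabled; the database name g6_hadoop matches the code below):

spark.sql("use g6_hadoop")
spark.sql("select * from user_click where day = '2016-05-05'").show(5, false)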

3. Use the Spark SQL DataFrame API to compute
product_id, product_name, product_status, area, click_count, rank, day

import java.io.File
import com.google.gson.{JsonObject, JsonParser}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession, functions}
import org.apache.spark.sql.functions._
import scalikejdbc.config.DBs
import scalikejdbc.{DB, SQL}

import scala.collection.mutable.ListBuffer

/**
  * Join two MySQL tables and one Hive table, then use the DataFrame API to produce
  *
  * product_id, product_name, product_status, area, click_count, rank, day
  */
object SparkSQLJoinApp {
  def main(args: Array[String]): Unit = {
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath

    val spark = SparkSession.builder()
      .appName("SparkSQLJoinApp")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport().getOrCreate()
    val format = "yyyy-MM-dd'T'HH:mm:ssz"
    import spark.implicits._

    /**
      * userclickDF: user clicks read from the Hive table user_click
      */
    import spark.sql
    sql("use g6_hadoop")
    //    spark.table("user_click").show(10,false)
    val userclickDF = sql("select user_id,city_id,product_id,day from user_click")


    val dbURL = "jdbc:mysql://hadoop000:3306/hadoop_train"
    val dbUSerName = "root"
    val dbPasswd = "root"


    /**
      * cityDF: city dimension read from the MySQL table city_info
      */
    val cityDF = spark.read.format("jdbc").option("url", dbURL)
      .option("dbtable", "city_info")
      .option("user", dbUSerName)
      .option("password", dbPasswd).load()


    // UDF that extracts product_status from the extend_info JSON string
    val splitUDF = udf((extend_info: String) => {
      val obj = new JsonParser().parse(extend_info).getAsJsonObject
      val ele = obj.get("product_status")
      ele.toString.toInt
    })
    spark.udf.register("splitUDF", splitUDF)
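    // Once registered, the UDF can be used both from SQL text (as in the
    // product_info query below) and directly on a Column, e.g.
    //   df.select(splitUDF($"extend_info").as("product_status"))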


    /**
      * productDF: product dimension read from the MySQL table product_info
      */
    spark.read.format("jdbc").option("url", dbURL)
      .option("dbtable", "product_info")
      .option("user", dbUSerName)
      .option("password", dbPasswd).load().createOrReplaceTempView("product_info")

    val productDF = spark.sqlContext.sql("select product_id,product_name,splitUDF(extend_info) as product_status  from product_info")



    // Click count of each product in each area; equivalent SQL:
    /**
      *
      * select
      * product_id, area, count(1) click_count
      * from
      * tmp_product_click_basic_info
      * group by
      * product_id, area
      *
      */
    val productTempDF = userclickDF.join(cityDF, "city_id").select("product_id", "city_id", "city_name", "area", "day")

    val pro_area_countDF = productTempDF.groupBy("product_id", "area").count().join(productDF, "product_id")
      .select($"product_id", $"product_name", $"product_status", $"area", $"count".as("click_count"))

    //    pro_area_countDF.show(false)

    classOf[com.mysql.jdbc.Driver] // force-load the MySQL JDBC driver
    DBs.setup() // reads db.default.* from application.conf

    // Save the per-product, per-area click counts to MySQL (delete, then insert, per partition)
    pro_area_countDF.foreachPartition(partitionOfRecords => {

      val list = ListBuffer[area_prod_info]()
      partitionOfRecords.foreach(record => {

        val product_id = record.getAs[Int]("product_id")
        val product_name = record.getAs[String]("product_name")
        val product_status = record.getAs[Int]("product_status")
        val area = record.getAs[String]("area")
        val click_count = record.getAs[Long]("click_count")
        list.append(area_prod_info(product_id, product_name, product_status, area, click_count))

        // Delete any existing rows for this product_id before inserting
        deleteByID(product_id)
      })


      insert(list)
    })

    DBs.close()

    // Top 3 products by click count in each area, using the window function row_number() over()
    val window_spec = Window.partitionBy("area").orderBy($"click_count".desc)

    val rankDF = pro_area_countDF.select(pro_area_countDF("product_id")
      , pro_area_countDF("product_name"),
      pro_area_countDF("product_status"),
      pro_area_countDF("area"),
      pro_area_countDF("click_count"),
      row_number().over(window_spec).as("rank")).where("rank <=3")

    rankDF.show(100, false) // TODO: persist to the database (see the JDBC-writer sketch after this listing)


    spark.stop()
  }

  def insert(area_prod_infos: ListBuffer[area_prod_info]): Unit = {
    DB.localTx { implicit session =>
      for (area_prod_info <- area_prod_infos) {
        SQL("insert into area_product_click_count_full_info(product_id,product_name,product_status,area,click_count) values(?,?,?,?,?)")
          .bind(area_prod_info.product_id, area_prod_info.product_name, area_prod_info.product_status, area_prod_info.area, area_prod_info.click_count)
          .update().apply()
      }
    }
  }


  def deleteByID(id: Int): Unit = {
    DB.localTx {
      implicit session =>
        SQL(s"delete from area_product_click_count_full_info where product_id = ${id}").update().apply()
    }

  }

  case class area_prod_info(product_id: Int, product_name: String, product_status: Int, area: String, click_count: Long)

}
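
The remaining TODO is persisting rankDF. One possible approach, sketched below with Spark's built-in JDBC writer, would sit inside main (before spark.stop()) so that dbURL, dbUSerName and dbPasswd are in scope; the target table name area_product_click_top3 is an assumption, not a table defined above.

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    // Sketch only: write the per-area top-3 ranking back to MySQL.
    // The table name "area_product_click_top3" is hypothetical.
    val props = new Properties()
    props.setProperty("user", dbUSerName)
    props.setProperty("password", dbPasswd)
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    rankDF.write
      .mode(SaveMode.Overwrite)
      .jdbc(dbURL, "area_product_click_top3", props)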
