Integrating Spark with Hive and Reading Data from MySQL

2021-07-08  wudl

1. Integrating Spark with Hive

1. Build the SparkSession object
2. Configure the Hive integration (metastore URI and warehouse directory)
3. Read the tag data from MySQL via JDBC

2. Maven Dependencies

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive-thriftserver_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
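
The code in the next section also reads from MySQL through JDBC with the com.mysql.jdbc.Driver class, so the MySQL connector must be on the classpath as well. A minimal sketch of that extra dependency, assuming the 5.1.x driver line that provides the com.mysql.jdbc.Driver class name (the exact version is an assumption, not from the original post):

    <!-- Assumed: MySQL JDBC driver providing the com.mysql.jdbc.Driver class used below -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>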

3. Integration Code

package cn.wudl.tags.models.rule

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object JobModel {

  def main(args: Array[String]): Unit = {

    // Create the SparkSession instance
    val spark: SparkSession = {
      // a. Build the SparkConf
      val sparkConf = new SparkConf()
        .setMaster("local[4]")
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        // Set the number of shuffle partitions
        .set("spark.sql.shuffle.partitions", "4")
        // Use Kryo serialization and register the HBase classes
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result], classOf[Put]))

      // b. Build the SparkSession with the builder pattern
      val session = SparkSession.builder()
        .config(sparkConf)
        // Enable Hive integration
        .enableHiveSupport()
        // Hive metastore address
        .config("hive.metastore.uris", "thrift://192.168.1.140:9083")
        // Hive warehouse directory
        .config("spark.sql.warehouse.dir", "hdfs://192.168.1.140:8020/user/hive/warehouse")
        .getOrCreate()

      // c. Return the session object
      session
    }
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Subquery wrapped as a table for the JDBC reader:
    // the tag with id = 321 plus all of its child tags (pid = 321)
    val tagTable: String =
      """
        |(
        |SELECT id, name, rule, level FROM profile_tags.tbl_basic_tag WHERE id = 321
        |union
        |SELECT id, name, rule, level FROM profile_tags.tbl_basic_tag WHERE pid = 321
        |) as tag_table
        |""".stripMargin

    // Read the tag data from MySQL via JDBC
    val baseTagDF = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://192.168.1.140:3306/?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC")
      .option("dbtable", tagTable)
      .option("user", "root")
      .option("password", "123456")
      .load()

    // Print the DataFrame schema and its contents
    baseTagDF.printSchema()
    baseTagDF.show(1000, truncate = false)

    spark.stop()
  }

}
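
Because enableHiveSupport() was set, the same spark session can also query Hive tables directly through the configured metastore, which is the point of the integration. A minimal verification sketch that could be run inside main() before spark.stop() (the database and table names here are placeholders, not from the original code):

    // List the databases visible through the Hive metastore
    spark.sql("show databases").show()
    // Query an existing Hive table; replace the placeholder name with a real table
    spark.sql("SELECT * FROM some_db.some_table LIMIT 10").show()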

4. Execution Output

21/07/08 21:58:41 INFO SessionState: Created HDFS directory: /tmp/hive/Administrator/2dc03761-0ead-4df3-a025-7548bde33ca3/_tmp_space.db
21/07/08 21:58:41 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is hdfs://192.168.1.140:8020/user/hive/warehouse
21/07/08 21:58:41 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- rule: string (nullable = true)
 |-- level: integer (nullable = true)

21/07/08 21:58:42 INFO CodeGenerator: Code generated in 129.7142 ms
21/07/08 21:58:42 INFO SparkContext: Starting job: show at JobModel.scala:54
21/07/08 21:58:42 INFO DAGScheduler: Got job 0 (show at JobModel.scala:54) with 1 output partitions
21/07/08 21:58:42 INFO DAGScheduler: Final stage: ResultStage 0 (show at JobModel.scala:54)
21/07/08 21:58:42 INFO DAGScheduler: Parents of final stage: List()
21/07/08 21:58:42 INFO DAGScheduler: Missing parents: List()
21/07/08 21:58:42 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at show at JobModel.scala:54), which has no missing parents
21/07/08 21:58:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/08 21:58:43 INFO JDBCRDD: closed connection
21/07/08 21:58:43 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1475 bytes result sent to driver
21/07/08 21:58:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 97 ms on localhost (executor driver) (1/1)
21/07/08 21:58:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/08 21:58:43 INFO DAGScheduler: ResultStage 0 (show at JobModel.scala:54) finished in 0.110 s
21/07/08 21:58:43 INFO DAGScheduler: Job 0 finished: show at JobModel.scala:54, took 0.380809 s
21/07/08 21:58:43 INFO CodeGenerator: Code generated in 11.4216 ms
+---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
|id |name|rule                                                                                                                   |level|
+---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
|321|职业  |inType=hbase
zkHosts=192.168.1.140
zkPort=2181
hbaseTable=tbl_tag_users
family=detail
selectFieldNames=id,job|4    |
|322|学生  |1                                                                                                                      |5    |
|323|公务员 |2                                                                                                                      |5    |
|324|军人  |3                                                                                                                      |5    |
|325|警察  |4                                                                                                                      |5    |
|326|教师  |5                                                                                                                      |5    |
|327|白领  |6                                                                                                                      |5    |
+---+----+-----------------------------------------------------------------------------------------------------------------------+-----+

21/07/08 21:58:43 INFO SparkUI: Stopped Spark web UI at http://192.168.1.1:4040
21/07/08 21:58:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/08 21:58:43 INFO MemoryStore: MemoryStore cleared
21/07/08 21:58:43 INFO BlockManager: BlockManager stopped
21/07/08 21:58:43 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/08 21:58:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/08 21:58:43 INFO SparkContext: Successfully stopped SparkContext
21/07/08 21:58:43 INFO ShutdownHookManager: Shutdown hook called
21/07/08 21:58:43 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-ecfaf63a-8cfb-4c16-a300-ae47c03ca5b7

