
Spark Structured Streaming: Writing to Hive

2021-06-04  朝朝Mumu

Component Versions

Spark has no native support for writing data read from Kafka with Structured Streaming into Hive, HBase, or MySQL. The approaches below have been collected and verified in practice. The Maven POM with the component versions used is as follows.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.insight.spark</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.1</version>

    <properties>
        <encoding>UTF-8</encoding>
        <spark.version>2.3.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/com.sun.jersey/jersey-client -->
        <dependency>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-client</artifactId>
            <version>1.19</version>
        </dependency>

        <!-- Spark core library -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jersey-client</artifactId>
                    <groupId>org.glassfish.jersey.core</groupId>
                </exclusion>
            </exclusions>
            <!-- <scope>provided</scope>-->
        </dependency>
        <!-- Spark SQL library: provides the DataFrame API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- HBase-related libraries -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        
        <!-- Spark-Hive integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>



    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issues like NoClassDefFoundError, ... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

Main Code and Usage

Usage: StructuredKafkaWordCount <bootstrap-servers> <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1] ……
The program takes several arguments: the first is the Kafka broker address, the second is the topic to consume, the third is the output type (one of four modes, indicated by 0, 1, 2, or 3), and the fourth is the checkpoint path; any further arguments are passed through for the MySQL connection. The program consumes messages from Kafka, performs a word count, and writes out the result.
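To generate test input, a few space-separated lines can be pushed into the topic with Kafka's console producer; the broker address and topic name below are placeholders:

# placeholders: adjust the broker and topic to your environment
kafka-console-producer.sh --broker-list kafka01:9092 --topic words_topic
>hello spark
>hello structured streaming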

package com.insight.spark.streaming

import com.insight.spark.util.ConfigLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.UUID

object StructuredStreamingTest {
  System.setProperty("HADOOP_USER_NAME","hdfs")
  val conf: Configuration = HBaseConfiguration.create()

  def main(args: Array[String]): Unit = {
    SetLogLevel.setStreamingLogLevels()
    if (args.length < 3) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        " <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1]")
      System.exit(1)
    }

    val Array(bootstrapServers, topics, number, _*) = args
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temp-spark-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    /**
      * Start the query according to the user parameter: 0/1/2/3
      * 1: write results to Hive
      * 2: write results to HBase
      * 3: write results to MySQL
      * 0/other: print results to the console
      */
    val dsw = number match {
      // write to Hive
      case "1" =>
        wordCounts.writeStream
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime("10 seconds")) // trigger interval
          .format("com.insight.spark.streaming.HiveSinkProvider") // custom HiveSinkProvider (source below)
          .option("checkpointLocation", checkpointLocation)
          .queryName("write hive")

      case "2" =>
        wordCounts.writeStream
          .outputMode("update")
          .foreach(new ForeachWriter[Row] {
            var connection: Connection = _

            def open(partitionId: Long, version: Long): Boolean = {
              conf.set("hbase.zookeeper.quorum", ConfigLoader.getString("hbase.zookeeper.list"))
              conf.set("hbase.zookeeper.property.clientPort", ConfigLoader.getString("hbase.zookeeper.port"))
              conf.set("zookeeper.znode.parent", ConfigLoader.getString("zookeeper.znode.parent"))
              import org.apache.hadoop.hbase.client.ConnectionFactory
              connection = ConnectionFactory.createConnection(conf)
              true
            }

            def process(record: Row): Unit = {
              val tableName = TableName.valueOf(ConfigLoader.getString("hbase.table.name")) // table name
              val table = connection.getTable(tableName)
              val put = new Put(Bytes.toBytes(record.mkString))
              put.addColumn("info".getBytes(), Bytes.toBytes("word"), Bytes.toBytes(record(0).toString))
              put.addColumn("info".getBytes(), Bytes.toBytes("count"), Bytes.toBytes(record(1).toString))
              table.put(put)
            }

            def close(errorOrNull: Throwable): Unit = {
              connection.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write hbase")

      case "3" =>

        /** DDL for the target MySQL table; create the spark database first:
          * CREATE TABLE `words` (
          * `id` int(11) NOT NULL AUTO_INCREMENT,
          * `word` varchar(255) NOT NULL,
          * `count` int(11) DEFAULT 0,
          * PRIMARY KEY (`id`),
          * UNIQUE KEY `word` (`word`)
          * ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
          */
        val (url, user, pwd) = (args(4), args(5), args(6))
        wordCounts.writeStream
          .outputMode("complete")
          .foreach(new ForeachWriter[Row] {
            var conn: sql.Connection = _
            var p: PreparedStatement = _
            def open(partitionId: Long, version: Long): Boolean = {
              Class.forName("com.mysql.jdbc.Driver")
              conn = DriverManager.getConnection(url, user, pwd)
              p = conn.prepareStatement("replace into spark.words(word,count) values(?,?)")
              true
            }

            def process(record: Row): Unit = {
              p.setString(1, record(0).toString)
              p.setInt(2, record(1).toString.toInt)
              p.execute()
            }

            def close(errorOrNull: Throwable): Unit = {
              p.close()
              conn.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write mysql")

      case _ =>
        wordCounts.writeStream
          .outputMode("update")
          .format("console")
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .option("checkpointLocation", checkpointLocation)
          .queryName("print it")

    }

    dsw.start().awaitTermination()

  }
}
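The HBase branch pulls its connection settings from ConfigLoader, and SetLogLevel is a small logging helper; neither class is shown in the original post. Below is a minimal sketch of a properties-backed ConfigLoader, assuming a config.properties file on the classpath. Only the key names come from the code above; everything else is an assumption, and the author's real implementation may differ.

package com.insight.spark.util

import java.util.Properties

// Minimal sketch: loads config.properties from the classpath.
// The actual ConfigLoader in the project may differ.
object ConfigLoader {
  private val props: Properties = {
    val p = new Properties()
    val in = getClass.getClassLoader.getResourceAsStream("config.properties")
    if (in != null) try p.load(in) finally in.close()
    p
  }

  def getString(key: String): String = props.getProperty(key)
}

A matching config.properties might look like this (placeholder values):

hbase.zookeeper.list=zk01,zk02,zk03
hbase.zookeeper.port=2181
zookeeper.znode.parent=/hbase
hbase.table.name=words

The HBase table named by hbase.table.name must already exist with an info column family, e.g. create 'words','info' in the HBase shell.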

HiveSinkProvider Source Code

The HiveSinkProvider referenced above is implemented as follows:

package com.insight.spark.streaming

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.slf4j.LoggerFactory


case class HiveSink(sqlContext: SQLContext,
                    parameters: Map[String, String],
                    partitionColumns: Seq[String],
                    outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val logger = LoggerFactory.getLogger(this.getClass)

    // the streaming aggregation produces the count column as a Long, so declare LongType
    val schema = StructType(Array(
      StructField("word", StringType),
      StructField("count", LongType)
    ))
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    // convert the InternalRow RDD back to a typed DataFrame
    val df = data.sparkSession.createDataFrame(res, schema)
    df.write.mode(SaveMode.Append).format("hive").saveAsTable("words")

  }
}

class HiveSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    HiveSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "HiveSinkProvider"
}
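Because HiveSinkProvider also implements DataSourceRegister, Spark can discover it through Java's ServiceLoader. If you add a resource file src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister containing the single line

com.insight.spark.streaming.HiveSinkProvider

then .format("HiveSinkProvider") (the shortName) can be used in place of the fully qualified class name used in the streaming job above. Without the service file, only the fully qualified class name works, which is what the main code does.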

Package the project and run it with spark-submit --xxx this.jar ... and that's it.
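For reference, a hypothetical submission for the MySQL case (mode 3): the class and jar names follow from the code and the POM's artifactId/version, while host names, paths, and credentials are placeholders.

spark-submit \
  --class com.insight.spark.streaming.StructuredStreamingTest \
  --master "local[*]" \
  SparkDemo-1.1.jar \
  kafka01:9092 words_topic 3 /tmp/temp-spark-1 \
  jdbc:mysql://mysql01:3306/spark spark_user spark_pwd

Note that the master is hard-coded to local[*] in the code, so the --master flag here merely matches it.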

Point: structured streaming, Spark Structured Streaming, Hive, HBase, MySQL
Line: Spark
Plane: in-memory computing
