Spark

Sparkstreaming数据零丢失之手动维护offset到M

2019-06-06  本文已影响0人  喵星人ZC

版本信息:

spark:2.2.0
kakfa:0.10.1.0
scala:2.11.8
scalikejdbc:3.3.2

Pom文件:

<properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <scalikejdbc.version>3.3.2</scalikejdbc.version>
</properties>

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

      <!--scalikejdbc 依赖 -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>
       <!--Spark 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

         <!--mysql 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
</dependencies>

application.conf文件

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource


#Kafka信息
metadata.broker.list = "192.168.245.100:9092"
#从老数据开始消费
auto.offset.reset = "smallest"
group.id = "baidu_offset_group"
kafka.topics = "baidu"
serializer.class = "kafka.serializer.StringEncoder"
request.required.acks = "1"

ValueUtils

package com.soul.bigdata.spark.streaming01

import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils

object ValueUtils {
  val load = ConfigFactory.load()

  def getStringValue(key: String, defaultValue: String = "") = {
    val value = load.getString(key)
    if (StringUtils.isNotEmpty(value)) {
      value
    } else {
      defaultValue
    }
  }
}

MySQL Offset表

 create table baidu_offset(
        topic varchar(32),
        groupid varchar(50),
        partitions int,
        fromoffset bigint,
        untiloffset bigint,
        primary key(topic,groupid,partitions)
        );

代码:

package com.soul.bigdata.spark.streaming01


import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs

object StreamingOffsetMySQL {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingOffsetMySQL")

    val ssc = new StreamingContext(conf, Seconds(10))

    //Topic
    val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet

    //kafka参数
    //这里应用了自定义的ValueUtils工具类,来获取application.conf里的参数,方便后期修改
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> ValueUtils.getStringValue("metadata.broker.list"),
      "auto.offset.reset" -> ValueUtils.getStringValue("auto.offset.reset"),
      "group.id" -> ValueUtils.getStringValue("group.id")
    )


    //先使用scalikejdbc从MySQL数据库中读取offset信息
    //+------------+------------------+------------+------------+-------------+
    //| topic      | groupid          | partitions | fromoffset | untiloffset |
    //+------------+------------------+------------+------------+-------------+
    //MySQL表结构如上,将“topic”,“partitions”,“untiloffset”列读取出来
    //组成 fromOffsets: Map[TopicAndPartition, Long],后面createDirectStream用到


    DBs.setup()
    val fromOffset = DB.readOnly(implicit session => {
      SQL("select * from baidu_offset").map(rs => {
        (TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("untiloffset"))
      }).list().apply()
    }).toMap


    //如果MySQL表中没有offset信息,就从0开始消费;如果有,就从已经存在的offset开始消费
    val messages = if (fromOffset.isEmpty) {
      println("从头开始消费...")
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else {
      println("从已存在记录开始消费...")
      val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffset, messageHandler)
    }


    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        //输出rdd的数据量
        println("数据统计记录为:" + rdd.count())
        //官方案例给出的获得rdd offset信息的方法,offsetRanges是由一系列offsetRange组成的数组
        //          trait HasOffsetRanges {
        //            def offsetRanges: Array[OffsetRange]
        //          }
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x => {
          //输出每次消费的主题,分区,开始偏移量和结束偏移量
          println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}---")
          //将最新的偏移量信息保存到MySQL表中
          DB.autoCommit(implicit session => {
            SQL("replace into baidu_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
              .bind(x.topic, ValueUtils.getStringValue("group.id"), x.partition, x.fromOffset, x.untilOffset)
              .update().apply()
          })
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

}

运行


image.png

停掉程序,重新运行,开始offset是从411开始消费的就达到了我们的目的


image.png image.png image.png
上一篇 下一篇

猜你喜欢

热点阅读