[Spark Development] Spark Streaming + Kafka

2018-12-14  粮忆雨

Kafka APIs integrated with Spark Streaming

The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate Spark Streaming packages available. The main differences are:

                             spark-streaming-kafka-0-8    spark-streaming-kafka-0-10
Broker Version               0.8.2.1 or higher            0.10.0 or higher
API Maturity                 Deprecated                   Stable
Language Support             Scala, Java, Python          Scala, Java
Receiver DStream             Yes                          No
Direct DStream               Yes                          Yes
SSL / TLS Support            No                           Yes
Offset Commit API            No                           Yes
Dynamic Topic Subscription   No                           Yes

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach: it provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the new integration uses the new Kafka consumer API instead of the simple consumer API, usage differs noticeably. This version of the integration is marked as experimental, so the API may change. For Spark 2.3+, the Kafka 0.10 API is recommended.

Below are Direct DStream demos built on each of the two APIs.

Q1: Why use the Direct DStream approach?
The Receiver-based approach uses Kafka's high-level consumer API. The data the receiver fetches from Kafka is stored in the memory of the Spark executors, and the jobs launched by Spark Streaming then process it. Under the default configuration this can lose data on failure: when the driver dies, its executors are killed with it, so if the driver crashes after the consumer offsets have been committed to ZooKeeper but before the fetched data has been processed, that data can no longer be recovered. To guarantee zero data loss, Spark Streaming's Write Ahead Log (WAL) must be enabled; it synchronously writes the received Kafka data to a write-ahead log on a distributed file system such as HDFS, so the data can be replayed even after a node failure, at the cost of some throughput. A minimal sketch of this receiver-based setup with the WAL enabled is shown below.
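
The following sketch is only an illustration of the receiver-based approach (available only in spark-streaming-kafka-0-8). It reuses the ZooKeeper hosts, consumer group, and topic from the demos below; the checkpoint path is a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverWithWALSketch {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ReceiverWithWAL")
                // synchronously write received blocks to the write-ahead log before acknowledging them
                .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(conf, Seconds(60))
        // the WAL lives under the checkpoint directory, so a reliable checkpoint dir is required (placeholder path)
        ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
        val stream = KafkaUtils.createStream(
            ssc,
            "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181", // ZooKeeper quorum (receiver mode uses the high-level consumer)
            "meterdata_group",                               // consumer group
            Map("topicA" -> 1),                              // topic -> number of receiver threads
            StorageLevel.MEMORY_AND_DISK_SER                 // no in-memory replication needed once the WAL is enabled
        )
        stream.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
    }
}
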
With the Direct approach, Kafka is treated as a store of data to pull from rather than a source that pushes data: Spark Streaming periodically queries Kafka for the latest offset of each topic+partition and uses those offsets to define the offset range of each batch. When the job that processes a batch starts, it uses Kafka's consumer API to read exactly that offset range. Offsets have to be tracked and committed manually.

Q2: What determines the parallelism of the Direct approach?
The parallelism of the Direct approach is determined by the number of partitions of the Kafka topics being read: each batch RDD has one partition per Kafka topic partition. A small fragment illustrating this is shown below.
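
This fragment is not a complete program; it assumes `stream` is the direct stream created with createDirectStream and that HasOffsetRanges is imported, as in the demos below:

stream.foreachRDD { rdd =>
    // with the Direct approach, each batch RDD has exactly one partition per Kafka topic partition
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    println(s"Kafka partitions in this batch: ${offsetRanges.length}, RDD partitions: ${rdd.getNumPartitions}")
    // if downstream work needs more tasks than the topic has partitions, capture the
    // offset ranges first (as above) and only then call rdd.repartition(n)
}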

More Kafka fundamentals: [Kafka Basics] Core Kafka concepts and applications
Cluster setup and installation: [Kafka Deployment] Cluster setup & quick start

The producer simulates electricity-meter readings; the code is as follows:

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
  * Kafka producer that simulates electricity-meter readings
  */
object KafkaProducerDemo {
    // Kafka brokers
    val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"

    val TOPIC = "topicA"

    val isAsync = false

    def main(args: Array[String]): Unit = {

        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer")
        val producer = new KafkaProducer[String, String](props)
        try {
            // simulate three meters producing readings: active_quan = energy used, create_time = reading time, meter_id = meter ID
            while (true) {
                val cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
                val arr = Array("000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7")
                for (meter_id <- arr) {
                    val msg = "{\"active_quan\":" + (Random.nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}"
                    val startTime = System.currentTimeMillis()
                    if (isAsync) {
                        // Send asynchronously
                        producer.send(new ProducerRecord[String, String](TOPIC, msg), new DemoCallback(startTime, msg))
                    } else {
                        // Send synchronously
                        producer.send(new ProducerRecord[String, String](TOPIC, msg))
                        System.out.println("Sent message: (" + msg + ")")
                    }
                }
                Thread.sleep(30000)
            }
        } catch {
            case ex: Exception => {
                println(ex)
            }
        } finally {
            producer.close()
        }
    }
}


/**
  * Kafka producer callback
  * @param startTime time the message was sent
  * @param message the message payload
  */
class DemoCallback(startTime : Long, message : String) extends Callback{

    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
        val elapsedTime = System.currentTimeMillis() - startTime
        if (metadata != null) {
            System.out.println(
                "message => (" + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms")
        } else {
            e.printStackTrace()
        }
    }
}

Maven dependencies. To switch from 0.8 to 0.10, simply change spark-streaming-kafka-0-8_${scala.version} to spark-streaming-kafka-0-10_${scala.version}:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huidian.spark</groupId>
    <artifactId>spark-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.3.1</spark.version>
        <scala.version>2.11</scala.version>
        <hadoop.version>2.7.2</hadoop.version>
        <hbase.version>1.1.1</hbase.version>
        <mysql.version>5.1.40</mysql.version>
        <c3p0.version>0.9.1.2</c3p0.version>
    </properties>

    <dependencies>
        <!--spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!--hbase-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>${c3p0.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
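
For the 0.10 integration, the Kafka dependency above would instead read as below. Note also that the ZooKeeper-based offset manager in the 0-10 "way 2" example uses Apache Curator, which may need to be declared explicitly if it is not already provided transitively on your classpath:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>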

spark-streaming-kafka-0-8 example

Scala implementation

Main class for consuming Kafka with Spark Streaming:

import com.hdc.spark.streaming.utils.KafkaManager
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

/**
  * Spark Streaming reading from Kafka
  * Simulates consuming the electricity-meter data
  */
object DirectKafkaMeterData {


    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaMeterData")
        val ssc = new StreamingContext(conf, Seconds(60))
        // Kafka brokers
        val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
        val GROUP_ID = "meterdata_group" // consumer group

        val topics = Set("topicA") // topics to consume

        /*
        Parameter notes: AUTO_OFFSET_RESET_CONFIG (auto.offset.reset, old consumer)
            smallest: if a committed offset exists for a partition, consume from it; otherwise start from the earliest available offset
            largest:  if a committed offset exists for a partition, consume from it; otherwise consume only newly produced data for that partition
            any other value causes an exception to be thrown
         */
        val kafkaParams = Map[String, String](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
            ConsumerConfig.GROUP_ID_CONFIG -> GROUP_ID,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "smallest"
        )
        val kafkaManager = new KafkaManager(kafkaParams)
        // create the stream
        val kafkaStream: InputDStream[(String, String)] = kafkaManager.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,topics)

        kafkaStream.foreachRDD { rdd =>
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.map(msg => msg._2).foreachPartition(ite=>{
                ite.foreach(record => {
                    // process the record here
                    println(record)
                })
            })
            kafkaManager.updateZKOffsets(offsetRanges)
        }
        ssc.start()
        ssc.awaitTermination()
    }

}

The Kafka offset management class, which maintains offsets in ZooKeeper. Besides using the integrated Kafka API (KafkaCluster) as below, this could also be implemented directly with a ZooKeeper client API.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{ KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag


class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

    private val kc = new KafkaCluster(kafkaParams)

    /**
      * Create the direct stream, starting from the offsets stored for the consumer group
      *
      * @param ssc    StreamingContext
      * @param topics topics to consume
      * @return the direct input stream
      */
    def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
          ssc: StreamingContext,
          topics: Set[String]): InputDStream[(K, V)] = {
        val groupId = kafkaParams.get("group.id").get
        // before reading offsets from ZooKeeper, reconcile the stored offsets with what is actually available in Kafka
        setOrUpdateOffsets(topics, groupId)
        // consume messages starting from the offsets read from ZooKeeper
        val kafkaStream = {
            val partitionsE = kc.getPartitions(topics)
            if (partitionsE.isLeft)
                throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
            val partitions = partitionsE.right.get
            val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
            if (consumerOffsetsE.isLeft)
                throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
            val consumerOffsets = consumerOffsetsE.right.get
            println(consumerOffsets)
            KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
                ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
        }
        kafkaStream
    }

    /**
      * Before creating the stream, reconcile the stored offsets with the actual state of the topic.
      * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets saved in ZooKeeper
      * are stale: Kafka's retention policy has already deleted the log segments that contained them.
      * To handle this, compare the consumerOffsets stored in ZooKeeper with earliestLeaderOffsets;
      * if consumerOffsets are smaller than earliestLeaderOffsets they are out of date,
      * so reset consumerOffsets to earliestLeaderOffsets.
      *
      * @param topics  topics to consume
      * @param groupId consumer group
      */
    private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
        topics.foreach(topic => {
            var hasConsumed = true
            val partitionsE = kc.getPartitions(Set(topic))
            if (partitionsE.isLeft)
                throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
            val partitions = partitionsE.right.get
            val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
            if (consumerOffsetsE.isLeft) hasConsumed = false
            if (hasConsumed) {
                // the group has consumed this topic before
                val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
                if (earliestLeaderOffsetsE.isLeft)
                    throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
                val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
                val consumerOffsets = consumerOffsetsE.right.get

                // only some partitions may have stale consumerOffsets, so only those are reset to earliestLeaderOffsets
                var offsets: Map[TopicAndPartition, Long] = Map()
                consumerOffsets.foreach({ case (tp, n) =>
                    val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
                    if (n < earliestLeaderOffset) {
                        println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
                                " offsets已经过时,更新为" + earliestLeaderOffset)
                        offsets += (tp -> earliestLeaderOffset)
                    }
                })
                if (!offsets.isEmpty) {
                    kc.setConsumerOffsets(groupId, offsets)
                }
            } else {
                // this group has never consumed the topic before
                println(groupId + " consuming topic(s) " + topics + " for the first time")
                val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
                var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
                if (reset == Some("smallest")) {
                    val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
                    if (leaderOffsetsE.isLeft)
                        throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
                    leaderOffsets = leaderOffsetsE.right.get
                } else {
                    val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
                    if (leaderOffsetsE.isLeft)
                        throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
                    leaderOffsets = leaderOffsetsE.right.get
                }
                val offsets = leaderOffsets.map {
                    case (tp, offset) => (tp, offset.offset)
                }
                kc.setConsumerOffsets(groupId, offsets)
            }
        })
    }

    /**
      * Update the consumed offsets stored in ZooKeeper
      *
      * @param offsetRanges offset ranges of the processed batch
      */
    def updateZKOffsets(offsetRanges: Array[OffsetRange]): Unit = {
        val groupId = kafkaParams.get("group.id").get
        for (offsets <- offsetRanges) {
            val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
            val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
            if (o.isLeft) {
                println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
            }
        }
    }

}

spark-streaming-kafka-0-10 example

With the new consumer, Kafka stores consumer offsets in the __consumer_offsets topic by default. To manage offsets manually, set enable.auto.commit=false. If it is left at true, the consumer may have fetched records that are not yet fully processed when the commit interval triggers an offset commit; if the consumer then crashes, those fetched-but-unprocessed records have already been committed and will never be reprocessed, so data is lost.

Way 1 (officially recommended):
By default, consumer offsets are stored in the __consumer_offsets topic. If no offsets are supplied when the stream is created, consumption starts from the offsets committed in __consumer_offsets for the given consumer group; if no committed offsets exist, the auto.offset.reset policy applies. Offsets are committed/updated manually with stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges). Example code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Spark Streaming reading from Kafka, committing offsets back to Kafka
  */
object DirectKafkaMeterData {

    // Kafka brokers
    def BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaExample")
        val ssc = new StreamingContext(conf, Seconds(5)) // batch interval

        /*
        Parameter notes: AUTO_OFFSET_RESET_CONFIG (auto.offset.reset)
            earliest: if a committed offset exists for a partition, consume from it; otherwise start from the beginning
            latest:   if a committed offset exists for a partition, consume from it; otherwise consume only newly produced data for that partition
            none:     if committed offsets exist for all partitions, consume from them; if any partition has no committed offset, throw an exception
         */
        val kafkaParams = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.GROUP_ID_CONFIG -> "kafka_group",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
        )

        val topics = Array("topicA")
        // create the stream
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        // process the stream
        stream.foreachRDD { rdd =>
            // capture the offset ranges of this batch for manual commit
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

            rdd.foreachPartition { iter =>
                iter.foreach(line => {
                    println(line.value())
                })
                val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
                println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            }
            // once the outputs have completed, commit the offsets back to Kafka (commitAsync is asynchronous)
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
        ssc.start()
        ssc.awaitTermination()
    }
}

Way 2: store offsets in ZooKeeper
Main class for consuming Kafka with Spark Streaming:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming reading from Kafka, with offsets stored in ZooKeeper
  * Applicable version: spark-streaming-kafka-0-10
  * (the 0.10 and 0.8 APIs differ significantly)
  */
object DirectKafkaManagerMeterData {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local").setAppName("DirectKafkaMeterData")
        val ssc = new StreamingContext(conf, Seconds(30)) // batch interval
        // Kafka brokers
        val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
        val ZK_SERVERS = "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181"
        val GROUP_ID = "meterdata_group" // consumer group

        val topics = Array("topicA") // topics to consume

        /*
        Parameter notes: AUTO_OFFSET_RESET_CONFIG (auto.offset.reset)
            earliest: if a committed offset exists for a partition, consume from it; otherwise start from the beginning
            latest:   if a committed offset exists for a partition, consume from it; otherwise consume only newly produced data for that partition
            none:     if committed offsets exist for all partitions, consume from them; if any partition has no committed offset, throw an exception
         */
        val kafkaParams = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.GROUP_ID_CONFIG -> GROUP_ID,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
        )

        // offsets are maintained manually in ZooKeeper
        val zkManager = new KafkaOffsetZKManager(ZK_SERVERS)
        val fromOffsets = zkManager.getFromOffset(topics,GROUP_ID)
        // create the stream, resuming from the stored offsets if there are any
        var stream:InputDStream[ConsumerRecord[String, String]] = null
        if (fromOffsets.nonEmpty){
            stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics, kafkaParams, fromOffsets)
            )
        }else{
            stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics, kafkaParams)
            )
            println("第一次消费 Topic:" + topics)
        }

        // process the stream
        stream.foreachRDD { rdd =>
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            val rs = rdd.map(record => (record.offset(), record.partition(), record.value())).collect()
            for(item <- rs) println(item)
            // process and store the data to HDFS, HBase, etc.
            // (storage code omitted)
            // after the data has been processed, save/update the offsets
            zkManager.storeOffsets(offsetRanges,GROUP_ID)
        }

        ssc.start()
        ssc.awaitTermination()
    }

}

ZooKeeper-based offset management class for Kafka:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

/**
  * ZooKeeper-based Kafka offset manager
  * Applicable version: spark-streaming-kafka-0-10
  *
  * @param zkServers ZooKeeper connection string
  */
class KafkaOffsetZKManager(zkServers : String) {

    // create the ZooKeeper client (Apache Curator)
    val zkClient = {
        val client = CuratorFrameworkFactory
                .builder
                .connectString(zkServers)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
//                .namespace("kafka") // optionally isolate the session under a namespace
                .build()
        client.start()
        client
    }

    val _base_path_of_kafka_offset = "/kafka/offsets" // base ZK path for offsets


    /**
      * Get the offsets the consumer group has already consumed for the given topics
      * (i.e. the starting offsets for this run)
      * @param topics topics to consume
      * @param groupName consumer group
      * @return map of TopicPartition -> starting offset
      */
    def getFromOffset(topics: Array[String], groupName:String):Map[TopicPartition, Long] = {
        // note the 0.8 vs 0.10 API difference: 0.10 uses TopicPartition, 0.8 uses TopicAndPartition
        var fromOffset: Map[TopicPartition, Long] = Map()
        for(topic <- topics){
            // read the offsets saved in ZK as the starting point of the DStream;
            // if the path does not exist yet, create it and let the DStream start from scratch
            val zkTopicPath = s"${_base_path_of_kafka_offset}/${groupName}/${topic}"
            // make sure the path exists
            checkZKPathExists(zkTopicPath)
            // the children of the topic node are its partitions
            val childrens = zkClient.getChildren().forPath(zkTopicPath)
            // iterate over the partitions
            import scala.collection.JavaConverters._
            for (p <- childrens.asScala){
                // each partition node stores the consumed offset
                val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
                // convert the offset bytes to Long
                val offSet = new String(offsetData).toLong
                fromOffset += (new TopicPartition(topic, Integer.parseInt(p)) -> offSet)
            }
        }
        println(fromOffset)
        fromOffset
    }

    /**
      * Check whether a ZK path exists; create it (including parents) if it does not
      * @param path ZK path to check
      */
    def checkZKPathExists(path: String)={
        if (zkClient.checkExists().forPath(path) == null) {
            zkClient.create().creatingParentsIfNeeded().forPath(path)
        }
    }

    /**
      * Save or update the consumed offsets in ZooKeeper
      * @param offsetRange offset ranges of the processed batch
      * @param groupName consumer group
      */
    def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = {
        for (o <- offsetRange){
            val zkPath = s"${_base_path_of_kafka_offset}/${groupName}/${o.topic}/${o.partition}"
            // make sure the path exists
            checkZKPathExists(zkPath)
            // write (first time) or overwrite the offset for this partition
            println("--- writing offset to ZK ---\nTopic:" + o.topic + ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
            zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
        }
    }

}