SparkStreaming创建DirectStream连接ka

2019-08-27  本文已影响0人  撸码小丑

SparkKafka直接流(createDirectStream)和kafka分区

每个kafka主题分区对应一个RDD分区。
spark可以通过spark.streaming.kafka.maxRatePerPartition
配置,对每个分区每秒接受的消息树进行控制。

LocationStrategies

位置策略,
控制特定的主题分区在哪个执行器上消费的。
在executor针对主题分区如何对消费者进行调度。
位置的选择是相对的,位置策略有三种方案:
1.PreferBrokers
    首选kafka服务器,只有在kafka服务器和executor位于同一主机,可以使用该中策略。

2.PreferConsistent
    首选一致性.
    多数时候采用该方式,在所有可用的执行器上均匀分配kakfa的主题的所有分区。
    综合利用集群的计算资源。

3.PreferFixed
    首选固定模式。
    如果负载不均衡,可以使用该中策略放置在特定节点使用指定的主题分区。手动控制方案。
    没有显式指定的分区仍然采用(2)方案。

ConsumerStrategy

ConsumerStrategies

消费者策略,是控制如何创建和配制消费者对象。
或者对kafka上的消息进行如何消费界定,比如t1主题的分区0和1,
或者消费特定分区上的特定消息段。
该类可扩展,自行实现。
1.ConsumerStrategies.Assign
    指定固定的分区集合,指定了特别详细的方范围。
    def Assign[K, V](
          topicPartitions: Iterable[TopicPartition],
          kafkaParams: collection.Map[String, Object],
          offsets: collection.Map[TopicPartition, Long])

2.ConsumerStrategies.Subscribe
    允许消费订阅固定的主题集合。

3.ConsumerStrategies.SubscribePattern 
    使用正则表达式指定感兴趣的主题集合。

消费者策略和语义模型代码示例

 import java.net.Socket

    
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.mutable.ArrayBuffer
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    /**
      * Created by Administrator on 2018/3/8.
      */
    object SparkStreamingKafkaScala {

        def sendInfo(msg: String, objStr: String) = {
            //获取ip
            val ip = java.net.InetAddress.getLocalHost.getHostAddress
            //得到pid
            val rr = java.lang.management.ManagementFactory.getRuntimeMXBean();
            val pid = rr.getName().split("@")(0);
            //pid
            //线程
            val tname = Thread.currentThread().getName
            //对象id
            val sock = new java.net.Socket("s101", 8888)
            val out = sock.getOutputStream
            val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + msg + "\t:" + objStr + "\r\n"
            out.write(m.getBytes)
            out.flush()
            out.close()
        }

        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("kafka")
    //        conf.setMaster("spark://s101:7077")
            conf.setMaster("local[8]")

            val ssc = new StreamingContext(conf, Seconds(5))

            //kafka参数
            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> "s102:9092,s103:9092",
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[StringDeserializer],
                "group.id" -> "g1",
                "auto.offset.reset" -> "latest",
                "enable.auto.commit" -> (false: java.lang.Boolean)
            )


            val map = scala.collection.mutable.Map[TopicPartition,String]()
            map.put(new TopicPartition("t1" , 0) , "s102")
            map.put(new TopicPartition("t1" , 1) , "s102")
            map.put(new TopicPartition("t1" , 2) , "s102")
            map.put(new TopicPartition("t1" , 3) , "s102")
            val locStra = LocationStrategies.PreferFixed(map) ;

            val consit = LocationStrategies.PreferConsistent

            val topics = Array("t1")

            //主题分区集合
            val tps = scala.collection.mutable.ArrayBuffer[TopicPartition]()
            tps.+=(new TopicPartition("t1" , 0))
    //        tps.+=(new TopicPartition("t2" , 1))
    //        tps.+=(new TopicPartition("t3" , 2))

            //偏移量集合
            val offsets = scala.collection.mutable.Map[TopicPartition,Long]()
            offsets.put(new TopicPartition("t1", 0), 3)
    //        offsets.put(new TopicPartition("t2", 1), 3)
    //        offsets.put(new TopicPartition("t3", 2), 0)

            val conss = ConsumerStrategies.Assign[String,String](tps , kafkaParams , offsets)



            //创建kakfa直向流
            val stream = KafkaUtils.createDirectStream[String,String](
                ssc,
                locStra,
                ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)
            )

            val ds2 = stream.map(record => {
                val t = Thread.currentThread().getName
                val key = record.key()
                val value = record.value()
                val offset = record.offset()
                val par = record.partition()
                val topic = record.topic()
                val tt = ("k:"+key , "v:" + value , "o:" + offset, "p:" + par,"t:" + topic ,"T : " + t)
                //xxxx(tt) ;
                //sendInfo(tt.toString() ,this.toString)
                tt
            })

            ds2.print()

            ssc.start()

            ssc.awaitTermination()
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读