SparkStreaming-Receivers

2018-11-09  本文已影响0人  机器不能学习

官网翻译学习所得,译句不当还请指教。

spark Streaming可以接受从任意已经支持的数据源获取数据,比如kafka,flume,kinesis,files,sockets等等。也允许开发者去实现一个定制的receiver,从其他数据源处获取数据。

当我们实现一个Receiver的时候,这个custom receiver必须继承

onStart(): Things to do to start receiving data.
onStop(): Things to do to stop receiving data.

这两个方法。
这两个方法不会无限期的阻塞。通常情况下,onStart()会开始接受数据的线程,onStop()会确保接收到数据的线程停止。同时也提供了一个isStopped()函数,去检验这个数据接受线程是否停止接收数据了。

收到数据之后可以调用store(data)把数据存储在spark里面(这个方法也是Recerver提供的)。这里有许多种类别的store()方法,它允许存储某一时刻接受到的数据或者是整个序列号的集合/对象。值得注意的是,store方法的实现会影响可靠性和默认的容错语义。

在receiving线程中任何的例外都会被正确的caught和handled,避免了receiver无声的失败。restart()方法将会通过异步的方式去调用onStop()方法,并且一段时间之后调用onStart()方法。stop()将会调用onStop()方法并且终止receriver。reportError()将会打印出错误信息给driver,可以在logs和UI找到这些错误信息(但是不包含stopping和restarting的信息)。

scala版本 for SparkStreaming

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * Custom Receiver that receives data over a socket. Received bytes are interpreted as
 * text and \n delimited lines are considered as records. They are then counted and printed.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
 */
object CustomReceiver {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
   // There is nothing much to do as the thread calling receive()
   // is designed to stop by itself isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
   var socket: Socket = null
   var userInput: String = null
   try {
     logInfo("Connecting to " + host + ":" + port)
     socket = new Socket(host, port)
     logInfo("Connected to " + host + ":" + port)
     val reader = new BufferedReader(
       new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
     userInput = reader.readLine()
     while(!isStopped && userInput != null) {
       store(userInput)
       userInput = reader.readLine()
     }
     reader.close()
     socket.close()
     logInfo("Stopped receiving")
     restart("Trying to connect again")
   } catch {
     case e: java.net.ConnectException =>
       restart("Error connecting to " + host + ":" + port, e)
     case t: Throwable =>
       restart("Error receiving data", t)
   }
  }
}
// scalastyle:on println
接收器的可靠性

基准于receiver的可靠性和默认容错语义大概可以分为朗阁方面

如果你想实现一个可靠的数据接收器,必须用store方法,这是一个阻塞的方法,在它存入spark内部时才会返回值。如果接受器用的存储水平是复制(也可以使用默认),那么在复制完后才会得到返回值。因此,在确定完数据可靠存储之后,再适当的发送确认信息。这就避免了如果在存储中途失败,也不会导致数据丢失。因为缓冲区的数据没有被确认,那么数据源将会重新发送。
如果是不可靠的数据源,那么无须以上逻辑,在接受到数据后即刻调用store方法插入到spark。但他也有如下的好处

Receiver Type Characteristics
Unreliable Receivers Simple to implement.
System takes care of block generation and rate control. No fault-tolerance guarantees, can lose data on receiver failure.
Reliable Receivers Strong fault-tolerance guarantees, can ensure zero data loss.
Block generation and rate control to be handled by the receiver implementation.
Implementation complexity depends on the acknowledgement mechanisms of the source.

上一篇 下一篇

猜你喜欢

热点阅读