Flink DataSouce

2020-01-05  本文已影响0人  知识海洋中的淡水鱼

flink流处理和批处理都内置了很多数据源,可以满足我们大部分使用场景,当然也可以通过实现flink提供的接口来实现其他数据源的接入。接下来我们就分别来了解下flink批处理和流处理的数据源吧。

1 批处理

批处理的数据源主要大致分为两类:集合数据源、文件数据源。

sequence

集合数据源之迭代类。平时我们所使用到的scala中Seq(),Array(),List(),Map(),Set()等都是迭代类。

    env.fromCollection(data: Iterable[T])
    // 示例
    env.fromCollection(Seq(1, 2, 3))
    env.fromCollection(Array(1, 2, 3, 4, 5))
    env.fromCollection(List(1, 2, 3, 4, 5))
    env.fromCollection(Map("a" -> 1, "b" -> 2, "c" -> 3))
    env.fromCollection(Set(3, 5, 6))

    // 并行版本
    env.fromParallelCollection(iterator: SplittableIterator[T])
    // 示例
    env.fromParallelCollection(new NumberSequenceIterator(3, 19))
01_parallel_collection.png

集合类型源之元素数组。类似于java中的可变参数列表。

    env.fromElements(data: T*)
    // 示例
    env.fromElements(1, 2, 3)
    env.fromElements("a", "b", "c")

file

一般来说,我们的批处理程序的数据源主要都是文件数据源,包括本地文件系统中的和分布式文件系统中的文件。使用频次比较高的是readTextFile()和readCsvFile(),当然也可以通过重写readFlie(new FileInputStream[T], filePath: String)中的FileInputFormat来读取不同格式文件。

    env.readFile(new FileInputStream[T], filePath: String)
    env.readTextFile(filePath: String, charsetName: String = "UTF-8")
    env.readTextFileWithValue(filePath: String, charsetName: String = "UTF-8")
    env.readFileOfPrimitives(filePath: String, delimiter: String = "\n")
    env.readCsvFile(
      filePath: String,                 // csv文件地址
      lineDelimiter: String = "\n",     // 行划分符号
      fieldDelimiter: String = ",",     // 字段划分符号
      quoteCharacter: Character = null,
      ignoreFirstLine: Boolean = false, // 忽略首行
      ignoreComments: String = null,
      lenient: Boolean = false,         // 是否对数据严格判断。false表示严格判断,缺失的数据则会被忽略
      includedFields: Array[Int] = null,
      pojoFields: Array[String] = null
    )

readFile,自定义读取文件内容的逻辑

    env.readFile(new MyFileInputFormat[String](), getPath("words.txt"))

// 重写读取文件的逻辑
class MyFileInputFormat extends FileInputFormat[String]() {
  private var end = false
  private var input: BufferedReader = _

  // 只执行一次,此处创建输入流对象
  override def open(fileSplit: FileInputSplit): Unit = {
    val inputStream = new FileInputStream(this.getFilePath.getPath)
    input = new BufferedReader(new InputStreamReader(inputStream))
  }

  // 判断是否读取到了文件尾部
  override def reachedEnd(): Boolean = this.end

  // 读取下一行的操作逻辑
  override def nextRecord(ot: String): String = {
    val str: String = input.readLine()
    if (str == null) {
      this.end = true
      ""
    } else str
  }
}

2 流处理

flink流处理的数据源大致分为四种:集合数据源、文件数据源、套接字数据源、自定义数据源。flink还原生给我们写好了kafka数据源,这是实时流处理中使用的最频繁没有之一的数据源(下边会单独简单展示一下)。

sequence

不用多说了吧,完全参考批处理中的sequence。

    env.fromCollection(data: Iterable[T])
    env.fromElements(data: T*)
    env.fromParallelCollection(iterator: SplittableIterator[T])

file

一样参考批处理中的flie。只是有一点儿不同的是流处理中readFile()还能监视文件的变更状况来进行更多的处理方式,如文件新增了记录,可以重新处理文件或者直接退出。

    env.readTextFile(filePath: String, charsetName: String)
    env.readFile(
      inputFormat: FileInputFormat[T],
      filePath: String,
      watchType: FileProcessingMode,    // 监视路径并响应新数据,或处理一次并退出
      interval: Long)                   // 间隔时间millis

socket

直接监视指定机子socket端口的记录。

    env.socketTextStream(
      hostname: String, 
      port: Int, 
      delimiter: Char = '\n', 
      maxRetry: Long = 0)               // 如果端口监听中断,最大重试间隔时间

自定义

当上边所有的数据源都满足不了我们的场景需求时,我们可以通过继承flink暴露的SourceFunction来实现自己的数据源(下文会展示自定义MySQL数据源)。

    env.addSource(function: SourceContext[T] => Unit)
    env.addSource(function: SourceFunction[T])

3 kafka connector

kafka connector是flink提供给我们的自定义连接器,可以直接实例化FlinkKafkaConsumer对象来消费kafka中记录。

object FlinkStreamDataSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // kafka properties参数
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
    props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
    props.setProperty("group.id", "leslie")
    props.setProperty("auto.offset.reset", "latest")

    val original: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), props))

    original
      .flatMap(_.split(","))
      .map(_ + "_test")
      .print()

    env.execute("flink_streaming_data_source")
  }
}
01_datasource_kafka.png

4 自定义DataSource

想要实现自定义的数据源十分简单,只需继承flink的SourceFunction接口并重写其中的run(),cancel()方法。

object FlinkStreamCustomerDataSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 自定义dataSource
    env.addSource(new MysqlDataSource).print()

    env.execute("customer_date_source")
  }
}

下文代码MysqlDataSource类是继承了RichSourceFunction类,RichSourceFunction实现SourceFunction接口并同时继承AbstractRichFunction抽象类,AbstractRichFunction抽象类又实现RichFunction接口。为什么我们的自定义数据源要多继承RichSourceFunction类呢?原因就在这个Rich上!我们将这样带有"Rich"前缀的函数类称为富函数,既然是富函数了那么一定是比普通的函数多给我们带来一些功能。RichSourceFunction中的open(),close()就是“富”出来的方法,open()方法仅在函数类实例化的时候调用一次(通常用来建立连接),close()则是在实例对象销毁前调用一次(通常用来关闭连接),可以避免重复进行创建连接销毁连接操作。(当然富函数不仅仅只“富”了这么一点点,还“富”出来运行时上下文,这可是个好东西。此处不扩展哦,以后用到再来讨论)

02_rich_function.png

我们需要进行获取外部存储组件数据的操作就在SourceFunction的run(),cancel()方法中实现。

class MysqlDataSource extends RichSourceFunction[String] {
  private var pStmt: PreparedStatement = _
  private var conn: Connection = _

  // 开始方法,只执行一次,建立和mysql的连接
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
    val username = "root"
    val password = "123456"
    conn = DriverManager.getConnection(url, username, password);
    val sql =
      """
        |select id, name, sex, age from user;
        |""".stripMargin
    pStmt = conn.prepareStatement(sql)
  }

  // 结束方法,只执行一次,关闭连接
  override def close(): Unit = {
    // 关闭连接
    if (pStmt != null) pStmt.close()
    if (conn != null) conn.close()
  }

  // 主体执行方法
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val rs: ResultSet = pStmt.executeQuery()
    while (rs.next()) {
      val id: Int = rs.getInt("id")
      val name: String = rs.getString("name")
      val sex: String = rs.getString("sex")
      val age: Int = rs.getInt("age")

      ctx.collect(s"id: $id, name: $name, sex: $sex, age:$age") // 收集记录到上下文中
    }
  }
  override def cancel(): Unit = {}
}
02_customer_mysql.png
上一篇下一篇

猜你喜欢

热点阅读