
Flink APIs

2021-07-22  眼君

Source API

The following Scala code demonstrates several types of source:

package com.example.apitest
import org.apache.flink.streaming.api.scala._

// Define a case class for sensor readings
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor_1",1547718199,35.8),
      SensorReading("sensor_2",1547718201,15.8),
      SensorReading("sensor_3",1547718221,25.3)
    )
    val stream1 = env.fromCollection(dataList)

    // 2. Read data from individual elements
    val stream2 = env.fromElements(1.0,35,"hello")

    // 3. Read data from a file
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val stream3 = env.readTextFile(inputPath)

    stream3.print()
    // Execute the job
    env.execute("source test")
  }
}
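
For reference, sensor.txt is assumed to contain comma-separated records of id, timestamp and temperature, matching the SensorReading case class (the values below are only sample data in that format):

sensor_1,1547718199,35.8
sensor_2,1547718201,15.8
sensor_3,1547718221,25.3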

Reading source data from Kafka

First, add the following dependency to the pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

The following code consumes a Kafka data source for stream processing:

package com.example.apitest
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

// Define a case class for sensor readings
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.2.144:9092")
    //properties.setProperty("group.id","consumer-group")
    val stream = env.addSource(new FlinkKafkaConsumer011[String]("firstkafka",new SimpleStringSchema(),properties))

    stream.print()
    // Execute the job
    env.execute("source test")
  }
}

Once the job above is running, we need to start a producer for the corresponding Kafka topic to generate data:

./kafka-console-producer.sh --broker-list 192.168.2.144:9092 --topic firstkafka
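
At the producer prompt you can type a few test messages; the content below is just sample data in the same format as sensor.txt. Each line is sent to the firstkafka topic and should show up in the Flink job's standard output:

> sensor_1,1547718199,35.8
> sensor_2,1547718201,15.8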

Sink API

Writing to the file system

package com.example.apitest.sinktest
import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._


object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert to the case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })

    // Write to the file system
    dataStream.print()
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/out.txt"),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )
    env.execute("file sink test")
  }
}
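
One point not shown in the code above: StreamingFileSink only commits its in-progress part files to the finished state when a checkpoint completes, so to have the output files finalized you generally need to enable checkpointing. A minimal sketch, placed right after the execution environment is created (the 1000 ms interval is just an illustrative value):

    // Enable checkpointing so StreamingFileSink can roll and finalize part files
    env.enableCheckpointing(1000L)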

Writing to Kafka

package com.example.apitest.sinktest

import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011


object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert to the case class and serialize it to a string
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble).toString
      })

    // Write to Kafka
    dataStream.print()
    dataStream.addSink(
      new FlinkKafkaProducer011[String]("192.168.2.144:9092","firstkafka",new SimpleStringSchema())
    )
    env.execute("Kafka sink test")

  }
}
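
To check that the records actually reached the topic, a console consumer can be started against the same broker, in the same style as the producer command shown earlier (run from Kafka's bin directory):

./kafka-console-consumer.sh --bootstrap-server 192.168.2.144:9092 --topic firstkafka --from-beginning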

Writing to MySQL
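
The original post stops here, so the following is only a minimal sketch of how one might write to MySQL with a custom RichSinkFunction and plain JDBC. It assumes a local MySQL instance with a database test and a table sensor_temp(id, temp); the connection URL, credentials, table name and dependency version below are placeholders, not values from the original article. The mysql-connector-java dependency would need to be added to pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

package com.example.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.example.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._


object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert to the case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Write to MySQL with a custom sink
    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute("jdbc sink test")
  }
}

// A custom sink that upserts the latest temperature for each sensor id
class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // Placeholder connection settings -- adjust to your environment
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading): Unit = {
    // Try to update the existing row first; insert if no row was updated
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}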
