Reading Data from Kafka with Flink

2019-08-23  yayooo

package com.atguigu.apiTest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SourceTestKafka {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // create a Kafka source
    /**
      * FlinkKafkaConsumer011 constructor arguments:
      * (String topic, DeserializationSchema<T> valueDeserializer, Properties props)
      * (topic, deserializer for the record value, consumer Properties)
      */
    val stream3: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    stream3.print("stream3: ").setParallelism(1)

    env.execute()
  }
}
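
To compile this example, the Kafka 0.11 connector must be on the classpath. A minimal sbt sketch (the Flink version below is an assumption; match it to your own setup):

// sbt dependencies for the Scala DataStream API and the FlinkKafkaConsumer011 connector
// (the version number is illustrative)
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.7.2"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.7.2"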

Start a Kafka console producer:

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor

Enter a test record: sensor_1, 1547718199, 35.80018327300259

Thoughts on guaranteeing data consistency:
If the job crashes, how do we guarantee data consistency? Enable checkpoints so that state is persisted to durable storage.
(Scenario: while one record is being processed, a second record arrives and has already been read in; if processing the first record then fails but the rollback lands on the offset of the later record, the first record is lost.)
Offsets:

  1. After rolling back, manually commit the corrected offset.
  2. FlinkKafkaConsumer already implements this offset handling for you, as shown in the sketch below.
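
A minimal sketch of what enabling checkpointing looks like, reusing the properties object from the example above (the 5-second interval and the setCommitOffsetsOnCheckpoints call are illustrative assumptions, not part of the original example):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Persist operator state (including the consumer's Kafka offsets) every 5 seconds
// with exactly-once guarantees; on failure Flink restores the latest checkpoint and
// the consumer resumes from the offsets stored in that checkpoint, so no record is lost.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

val consumer = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)

// Also commit the offsets back to Kafka whenever a checkpoint completes
// (useful for monitoring; recovery itself uses the checkpointed offsets).
consumer.setCommitOffsetsOnCheckpoints(true)

env.addSource(consumer).print()
env.execute()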