实时计算框架Flink

Flink8:Flink流处理Api之Source

2020-05-02  本文已影响0人  勇于自信

流处理基本步骤:


Source

1. 从集合读取数据

package wordcount

import org.apache.flink.streaming.api.scala._

// 定义样例类,传感器id,时间戳,温度
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object Sensor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 1. 从集合中读取数据
    val stream1 = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
    stream1.print("stream1:").setParallelism(1)
    env.execute()
  }
}

运行代码,打印结果:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
stream1:> SensorReading(sensor_1,1547718199,35.80018327300259)
stream1:> SensorReading(sensor_6,1547718201,15.402984393403084)
stream1:> SensorReading(sensor_7,1547718202,6.720945201171228)
stream1:> SensorReading(sensor_10,1547718205,38.101067604893444)

Process finished with exit code 0

2. 从文件读取数据

val stream2 = env.readTextFile("YOUR_FILE_PATH")

3. 以kafka消息队列的数据作为来源

  1. 首先,pom配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
<!--                <version>3.0.0</version>-->
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. 集成开发代码:
package com.stream;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class KafkaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092")
        prop.setProperty("group.id", "consumer-group")
        prop.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
        prop.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
        prop.setProperty("auto.offset.reset", "latest")

        DataStreamSource dstream = environment.addSource(new FlinkKafkaConsumer011("senser", new SimpleStringSchema(), prop));
        dstream.print("kafka test").setParallelism(1);
        environment.execute();
    }
}

  1. 运行,上面开发是重点,因为演示环境麻烦,这里只给出操作步骤:
    3.1 在虚拟机启动zookeeper和kafka服务
    3.2 在虚拟机启动代码里响应配置的topic的producer
    3.3 进入flink主目录下执行./bin/start-cluster.sh启动flink
    3.4 assembly方式打包上传jar包到虚拟机执行:
    ./flink run -c com.stream.KafkaStream flink-study-1.0-SNAPSHOT-jar-with-dependencies.jar
    3.5 在producer输入数据,控制台没有结果,可以访问<u>h</u><u>ttp://hadoop1:8081</u>
    在TaskManager上看输出日志。

  2. Flink+kafka是如何实现exactly-once语义的:
    Flink通过checkpoint来保存数据是否处理完成的状态:
    由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
    执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
    如果宕机需要通过StateBackend进行恢复,只能恢复所有确认提交的操作。


4. 自定义Source
除了以上的source数据来源,我们还可以自定义source。需要做的,只是传入一个SourceFunction就可以。具体调用如下:

package wordcount

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //自定义source
    val stream = env.addSource(new MySensorSource())
    stream.print("stream").setParallelism(1)
    env.execute("source test")
  }
}

我们希望可以随机生成传感器数据,MySensorSource具体的代码实现如下:

package wordcount

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading]{
  //flag:表示数据源是否还在正常运行
  var runing: Boolean = true

  override def cancel(): Unit = {
    runing=false
  }

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    //初始化一个随机数发生器
    val rand = new Random()
    //初始化定义一组传感器温度数据
    var curTemp=1.to(10).map(
      i=>("sensor_"+i,65+rand.nextGaussian()*20)
    )
    while(runing){
      //在前一次温度的基础上更新温度值
      curTemp = curTemp.map(
        t=>(t._1,t._2*rand.nextGaussian())
      )

      //获取当前时间戳
      val curTime = System.currentTimeMillis();
      curTemp.foreach(
        t=>ctx.collect(SensorReading(t._1,curTime,t._2))
      )
      //设置时间间隔
      Thread.sleep(500)
    }
  }
}

运行调用代码,结果控制台一直在输出不断随机生成的数据,如下:


上一篇下一篇

猜你喜欢

热点阅读