Flink流处理API

2020-06-14  本文已影响0人  安申

1.Flink的三大处理过程

Flink处理过程

2.Environment

1)getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

// 批处理环境

val env = ExecutionEnvironment.getExecutionEnvironment

// 流式数据处理环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

2)createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

3)createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

val env =ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar")

3.Source

1)从集合中读取数据

 val sensorDS:DataStream[WaterSensor] = env.fromCollection(

            List(

               WaterSensor("ws_001", 1577844001, 45.0),

               WaterSensor("ws_002", 1577844015, 43.0),

               WaterSensor("ws_003", 1577844020, 42.0)

            )

        )

2)从文件中读取数据

val fileDS: DataStream[String] = env.readTextFile("input/data.txt")

3)以kafka消息队列的数据作为来源

(1)引入kafka连接器的依赖:

<dependency>

   <groupId>org.apache.flink</groupId>

  <artifactId> flink-connector-kafka-0.11_2.11</artifactId>

   <version>1.7.2</version>

</dependency>

(2)代码实现:

val kafkaDS: DataStream[String] = env.addSource(

newFlinkKafkaConsumer011[String]("sensor",

new SimpleStringSchema(), properties))

4)自定义Source

3.Transform:相当于Spark中的算子

map

flatMap

filter

keyBy

4.Sink

Flink没有类似于spark中foreach方法,让用户进行迭代的操作。所有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作

stream.addSink(new MySink(xxxx))

Flink支持的一些主流的Sink
上一篇下一篇

猜你喜欢

热点阅读