Flink流处理API
1.Flink的三大处理过程
Flink处理过程2.Environment
1)getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
// 批处理环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 流式数据处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
2)createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
3)createRemoteEnvironment
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
val env =ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar")
3.Source
1)从集合中读取数据
val sensorDS:DataStream[WaterSensor] = env.fromCollection(
List(
WaterSensor("ws_001", 1577844001, 45.0),
WaterSensor("ws_002", 1577844015, 43.0),
WaterSensor("ws_003", 1577844020, 42.0)
)
)
2)从文件中读取数据
val fileDS: DataStream[String] = env.readTextFile("input/data.txt")
3)以kafka消息队列的数据作为来源
(1)引入kafka连接器的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId> flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
(2)代码实现:
val kafkaDS: DataStream[String] = env.addSource(
newFlinkKafkaConsumer011[String]("sensor",
new SimpleStringSchema(), properties))
4)自定义Source
3.Transform:相当于Spark中的算子
map
flatMap
filter
keyBy
4.Sink
Flink没有类似于spark中foreach方法,让用户进行迭代的操作。所有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作
stream.addSink(new MySink(xxxx))
Flink支持的一些主流的Sink