flink的基本概念和DataStream API使用
flink是一个高性能的流处理框架。
flink架构
flink架构上分为Deploy层、Core层、Api层以及Libraries层
Deploy层主要是部署模式,包括本地、集群、云。
Core层主要为flink流处理引擎,提供了flink计算的实现。
Api层主要分为DataStram API和DataSet API,分别提供了流处理和批处理。
Libraries层主要为流处理和批处理提供了一些应用库,也包括流处理基于Table的操作和SQL支持。
DataStream的使用
//首先需要获取运行环境
val environment = StreamExecutionEnvironment.getExecutionEnvironment
/**
*中间用于处理数据
*
*source获取数据源
*transformation数据处理
*sink数据输出
*
*/
//执行程序
environment.execute("use DataStream")
选择source
//获取集合
val text = environment.fromCollection(Array(1, 2, 3, 4, 5))
//获取socket
val text = environment.socketTextStream("localhost", 9001)
//获取文件
val text = environment.readTextFile(path)
//获取kafka
val topic = "kafka_topic"
val prop = new Properties()
prop.setProperty("bootstrap.servers","localhost:9092")
prop.setProperty("group.id","conumser_group_1")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
选择sink
//写入kafka
val topic = "kafka_topic"
val prop = new Properties()
prop.setProperty("bootstrap.servers","localhost:9092")
val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
text.addSink(kafkaProducer)
Transformation的流程
1.环境通过获取数据变为DataStream类型
2.DataStream然后通过Transformation实现各种需求后可以最终转换为新的DataStream
3.DataStream输出后返回一个DataStreamSink类型
DataStream类型的转换与基本使用:
DataStream直接转换为DataStream:
map:一个元素生成一个新的元素
flatMap:一个元素生成多个元素
filter:过滤元素
多个DataStream合并为一个DataStream:
union:合并多个数据类型相同的流
两个DataStream转换为ConnectedStreams:
connect:可以合并数据不相同的流,最后通过ConnectedStreams的map方法转换回DataStream
DataStream转换为KeyedStream:
keyBy:将一个流根据key转换划分到不同分区
KeyedStream转换为DataStream:
reduce:分别对key值进行求和
除此之外还有sum(),min(),max()等一些聚合函数
DataStream与DataStream转换为其他的类型最终转换为DataStream
join:进行匹配后只显示匹配上的数据
先后调用join、where、equalTo、window、apply
coGroup:进行匹配后会显示所有数据
先后调用coGroup、where、equalTo、window、apply
此外还有WindowedStream、BroadcastConnectedStream、AllWindowedStream、SplitStream等后续再补充......