Flink_StreamingFileSink-实时数据写入HD
Flink DataStream中将流数据保存HDFS文件系统方式:
-
第一种方式:自定义Sink,实现RichSinkFunction
- 使用JDBC的方式将数据写入到Hive数据库中,这种方式效率比较低
- 原因在于:INSERT INTO 插入数据,底层运行MapReduce程序,所以不推荐使用,了解即可。
-
第二种方式:
StreamingFileSink
Connector- 流式写入HDFS文件,吞吐量较高
StreamingFileSink 实现
数据落地HDFS,使用Flink DataStream中自带Connector:StreamingFileSink
,将分区文件写入到支持 [Flink FileSystem]
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/filesystems/index.html) 接口的文件系统中。
- 官方文档
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/connectors/streamfile_sink.html -
使用说明
image.png
概念
1.StreamingFileSink 会将数据写入Bucket桶中(可以理解为Hive的分区目录dt=yyyy-DD-mm)
2.每个子任务写入数据的时候,都有一个单独的文件.
3.写入的文件有三种状态
- finshed 可读取的文件
- pending-flie 写入完成的文件,但不能读取
- in-process 正在写入文件
设置
创建StreamingFlieSink实例对象有意向四个设置
-
数据存储格式
row行 bulk(列示)(avro,orc Parquet ) -
分桶策略(十分重要)
分区目录名称, -
文件滚动策略
按照文件大小和超时时间滚动文件 -
文件名称
文件前缀 文件名后缀
重要
- 在使用StreamingFileSink 必须开启 Chckpoint
- 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
面试题
StreamingFileSink怎样实现写入Hdfs的精确性一次语义?
通过subtask写入的文件状态实现的,写入的文件有三种状态:可读取,写入完成,正在写入.
写入的文件有三种状态:in-process、in-pending、finshed,invoke方法里面正在写入的文件状态是in-process,当满足滚动策略之后将文件变为in-pending状态,
滚动策略
滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
自定义分桶
自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
todo: 定义子类,实现接口,重写方法,创建对象
BucketAssigner<IN, BucketID> 中泛型参数含义:
1) 第1个泛型参数:IN
The type of input elements,表示数据流中每条数据类型
2) 第2个泛型参数:BucketID
表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
/**
* 自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
* todo: 定义子类,实现接口,重写方法,创建对象
* BucketAssigner<IN, BucketID> 中泛型参数含义:
* 1) 第1个泛型参数:IN
* The type of input elements,表示数据流中每条数据类型
* 2) 第2个泛型参数:BucketID
* 表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
*/
private static class HivePartitionBucketAssigner implements BucketAssigner<String, String> {
// 表的名称
@Override
public String getBucketId(String element, Context context) {
// a. 获取当前日期
String currentDate = DateUtil.getCurrentDate();
// b. 拼凑字符串: 桶id
String bucketId = "dt=" + currentDate;
// c. 直接返回桶id
return bucketId;
}
// 序列化类 参考默认的
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}
copy代码
public static StreamingFileSink<String> getFileSink(String tableName ){
//1. 文件写入路径
String outputPath = ConfigLoader.get("hdfsUri") + "/user/hive/warehouse/vehicle_ods.db/" + tableName;
//2. 创建StreamingFileSink对象
StreamingFileSink<String> fileSink = StreamingFileSink
// 4-1. 设置存储文件格式,Row一行一行数据存储
.<String>forRowFormat(
new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")
)
// 4-2. 设置桶分配策略,存储目录名称,默认基于事件分配器
.withBucketAssigner(
// new DateTimeBucketAssigner<String>("yyyyDDmm")
new HivePartitionBucketAssigner()
)
// 4-3. 设置数据文件滚动策略,如何产生新文件
.withRollingPolicy(
DefaultRollingPolicy.builder()
// 时间间隔 2分钟写入一次,如果1分钟没有数据写入 就自动写入
.withRolloverInterval(TimeUnit.MINUTES.toMillis(10))
// 多久不写入数据时间间隔
.withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
// 文件大小
.withMaxPartSize(128 * 1024 * 1024)
.build()
)
// 4-4. 设置文件名称 车辆数据
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("vehicle")
.withPartSuffix(".data")
.build()
)
.build();
return fileSink;
}