Flink_StreamingFileSink-实时数据写入HD

2022-08-31  本文已影响0人  Eqo

Flink DataStream中将流数据保存HDFS文件系统方式:

StreamingFileSink 实现

数据落地HDFS,使用Flink DataStream中自带Connector:StreamingFileSink,将分区文件写入到支持 [Flink FileSystem]
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/filesystems/index.html) 接口的文件系统中。

概念
1.StreamingFileSink 会将数据写入Bucket桶中(可以理解为Hive的分区目录dt=yyyy-DD-mm)
2.每个子任务写入数据的时候,都有一个单独的文件.
3.写入的文件有三种状态
- finshed 可读取的文件
- pending-flie 写入完成的文件,但不能读取
- in-process 正在写入文件

设置
创建StreamingFlieSink实例对象有意向四个设置

重要

面试题

StreamingFileSink怎样实现写入Hdfs的精确性一次语义?
通过subtask写入的文件状态实现的,写入的文件有三种状态:可读取,写入完成,正在写入.
写入的文件有三种状态:in-process、in-pending、finshed,invoke方法里面正在写入的文件状态是in-process,当满足滚动策略之后将文件变为in-pending状态,

滚动策略

滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

自定义分桶

自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
     todo: 定义子类,实现接口,重写方法,创建对象
     BucketAssigner<IN, BucketID> 中泛型参数含义:
     1) 第1个泛型参数:IN
         The type of input elements,表示数据流中每条数据类型
     2) 第2个泛型参数:BucketID
         表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
    /**
     * 自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
     *      todo: 定义子类,实现接口,重写方法,创建对象
     *      BucketAssigner<IN, BucketID> 中泛型参数含义:
     *      1) 第1个泛型参数:IN
     *          The type of input elements,表示数据流中每条数据类型
     *      2) 第2个泛型参数:BucketID
     *          表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
     */
    private static class HivePartitionBucketAssigner implements BucketAssigner<String, String> {


        // 表的名称
        @Override
        public String getBucketId(String element, Context context) {
            // a. 获取当前日期
            String currentDate = DateUtil.getCurrentDate();
            // b. 拼凑字符串: 桶id
            String bucketId = "dt=" + currentDate;
            // c. 直接返回桶id
            return bucketId;
        }
        // 序列化类 参考默认的
        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }

    }

copy代码

 public static StreamingFileSink<String> getFileSink(String tableName ){
        //1. 文件写入路径
        String outputPath = ConfigLoader.get("hdfsUri") + "/user/hive/warehouse/vehicle_ods.db/" + tableName;

        //2. 创建StreamingFileSink对象
        StreamingFileSink<String> fileSink = StreamingFileSink
                // 4-1. 设置存储文件格式,Row一行一行数据存储
                .<String>forRowFormat(
                        new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")
                )
                // 4-2. 设置桶分配策略,存储目录名称,默认基于事件分配器
                .withBucketAssigner(
//                        new DateTimeBucketAssigner<String>("yyyyDDmm")
                       new HivePartitionBucketAssigner()
                )

                // 4-3. 设置数据文件滚动策略,如何产生新文件
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // 时间间隔 2分钟写入一次,如果1分钟没有数据写入 就自动写入
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(10))
                                // 多久不写入数据时间间隔
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                                // 文件大小
                                .withMaxPartSize(128 * 1024 * 1024)
                                .build()
                )
                // 4-4. 设置文件名称  车辆数据
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("vehicle")
                                .withPartSuffix(".data")
                                .build()
                )
                .build();

        return fileSink;
    }

上一篇下一篇

猜你喜欢

热点阅读