Flink SQL in Practice: Writing a Custom Table Format

2021-07-11  Coder小咚

Summary: Continuing from the previous Flink CDC post, the canal-json format deserves a closer look. It wraps the plain json format and converts binlog-style JSON into RowData that Flink can work with, together with the schema description wrapped in RowType. I wanted to filter records by RowKind; at the moment this can be done either by modifying the canal format source code, or by writing the changelog stream back to Kafka in changelog-json form.
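As a side note on the RowKind filtering mentioned above, here is a minimal sketch (not from the original post; the table and field names are made up) of one way to do it without touching the canal format source: convert the table into a changelog stream and filter on RowKind in the DataStream API.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Hypothetical sketch; 'cdc_source' is assumed to be a CDC-backed table registered elsewhere.
public class RowKindFilterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table changelog = tableEnv.sqlQuery("SELECT * FROM cdc_source");
        // Keep only inserts and the "after" image of updates; drop delete / update-before records.
        DataStream<Row> upsertsOnly = tableEnv
            .toChangelogStream(changelog)
            .filter(row -> row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER);
        upsertsOnly.print();
        env.execute();
    }
}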

Preface

Based on my current understanding of table formats, this post defines a custom event-json format for handling event-stream data. The fields of an event stream are not fixed: usually only a few fields are common to all events, and the rest are extensions. The idea is therefore to let the user declare the common fields in the table schema, and to store all remaining fields as a JSON string in a metadata column (named "others" by default).
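For example (the field names here are hypothetical): with key and uid declared as the common fields plus the others metadata column, an input event such as

{"key": "login", "uid": "u_001", "ip": "10.0.0.1", "os": "android"}

would be produced as the row ('login', 'u_001', '{"ip":"10.0.0.1","os":"android"}'), with the two undeclared fields folded into others.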

  1. In the table format factory class, wire up the encoding and decoding logic and declare the option parameters the format needs.
package com.sht.formats.json;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class EventJsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

   public static final String IDENTIFIER = "event-json";

   public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS;

   public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;

   @Override
   public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {
       FactoryUtil.validateFactoryOptions(this, formatOptions);
       final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
       TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
       final String others = formatOptions.get(EventJsonOptions.OTHER_FIELD);
       return new EventJsonDecodingFormat(ignoreParseErrors, timestampFormat, others);
   }

   @Override
   public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {
       FactoryUtil.validateFactoryOptions(this, formatOptions);
       TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
       final String others = formatOptions.get(EventJsonOptions.OTHER_FIELD);

       return new EncodingFormat<SerializationSchema<RowData>>() {
           @Override
           public ChangelogMode getChangelogMode() {
               return ChangelogMode.insertOnly();
           }

           @Override
           public SerializationSchema<RowData> createRuntimeEncoder(
               DynamicTableSink.Context context, DataType consumedDataType) {
               final RowType rowType = (RowType) consumedDataType.getLogicalType();
               return new EventJsonSerializationSchema(rowType, timestampFormat, others);
           }
       };
   }

   @Override
   public String factoryIdentifier() {
       return IDENTIFIER;
   }

   @Override
   public Set<ConfigOption<?>> requiredOptions() {
       return Collections.emptySet();
   }

   @Override
   public Set<ConfigOption<?>> optionalOptions() {
       Set<ConfigOption<?>> options = new HashSet<>();
       options.add(IGNORE_PARSE_ERRORS);
       options.add(TIMESTAMP_FORMAT);
       options.add(EventJsonOptions.OTHER_FIELD);
       return options;
   }
}
  2. The encoding side is simple and is implemented inline as an anonymous class; the decoding side is more involved, so it gets a class of its own.
package com.sht.formats.json;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EventJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {

    private List<String> metadataKeys;

    private final String otherField;

    private final boolean ignoreParseErrors;

    private final TimestampFormat timestampFormat;

    public EventJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat, String otherField) {
        this.ignoreParseErrors = ignoreParseErrors;
        this.timestampFormat = timestampFormat;
        this.otherField = otherField;
        this.metadataKeys = Collections.emptyList();
    }

    @Override
    public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType) {
        final List<EventJsonDecodingFormat.ReadableMetadata> readableMetadata =
            metadataKeys.stream()
                .map(
                    k ->
                        Stream.of(EventJsonDecodingFormat.ReadableMetadata.values())
                            .filter(rm -> rm.key.equals(k))
                            .findFirst()
                            .orElseThrow(IllegalStateException::new))
                .collect(Collectors.toList());
        final List<DataTypes.Field> metadataFields =
            readableMetadata.stream()
                .map(m -> DataTypes.FIELD(m.key, m.dataType))
                .collect(Collectors.toList());
        final DataType producedDataType =
            DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
        final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);
        return new EventJsonDeserializationSchema(producedDataType, producedTypeInfo, ignoreParseErrors, timestampFormat, otherField);
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
        Stream.of(EventJsonDecodingFormat.ReadableMetadata.values())
            .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys) {
        this.metadataKeys = metadataKeys;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    /** List of metadata that can be read with this format. */
    enum ReadableMetadata {
        OTHERS(
            "others",
            DataTypes.STRING().nullable(),
            DataTypes.FIELD("others", DataTypes.STRING()));

        final String key;

        final DataType dataType;

        final DataTypes.Field requiredJsonField;

        ReadableMetadata(
            String key,
            DataType dataType,
            DataTypes.Field requiredJsonField) {
            this.key = key;
            this.dataType = dataType;
            this.requiredJsonField = requiredJsonField;
        }
    }
}
  3. The option configuration class.
package com.sht.formats.json;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.formats.json.JsonOptions;

/** Option utils for event-json format. */
public class EventJsonOptions {

    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS;

    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;

    public static final ConfigOption<String> OTHER_FIELD =
            ConfigOptions.key("others")
                    .stringType()
                    .defaultValue("others")
                    .withDescription("Extension fields are stored in this column as a JSON string.");
}
  4. The serialization class, which serializes RowData into a byte array for output to the external system.
package com.sht.formats.json;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import java.util.List;

public class EventJsonSerializationSchema implements SerializationSchema<RowData> {

    private List<RowType.RowField> rowTypeFields;
    private String othersField;

    public EventJsonSerializationSchema(RowType rowType, TimestampFormat timestampFormat, String otherField) {
        this.rowTypeFields = rowType.getFields();
        this.othersField = otherField;
    }

    @Override
    public byte[] serialize(RowData rowData) {
        JSONObject jsonObject = new JSONObject();
        for (int i = 0; i < rowTypeFields.size(); i++) {
            final RowType.RowField rowField = rowTypeFields.get(i);
            // All columns are expected to be STRING; skip null fields to avoid an NPE.
            if (rowData.isNullAt(i)) {
                continue;
            }
            final String value = rowData.getString(i).toString();
            if (othersField.equals(rowField.getName())) {
                // Flatten the keys of the 'others' JSON blob back to the top level.
                jsonObject.putAll(JSONObject.parseObject(value));
            } else {
                jsonObject.put(rowField.getName(), value);
            }
        }
        return jsonObject.toJSONString().getBytes();
    }
}
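To make the serializer's behaviour concrete, here is a minimal, hypothetical usage sketch (not part of the original post; the schema and values are made up). It shows that the keys of the others column are merged back into the top level of the output JSON.

package com.sht.formats.json;

import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Hypothetical demo class, not part of the format itself.
public class EventJsonSerializationSchemaDemo {
    public static void main(String[] args) {
        // Consumed row type: the common fields plus the trailing 'others' column.
        RowType rowType = RowType.of(
            new LogicalType[] {new VarCharType(100), new VarCharType(100), new VarCharType(2000)},
            new String[] {"key", "uid", "others"});
        EventJsonSerializationSchema schema =
            new EventJsonSerializationSchema(rowType, TimestampFormat.SQL, "others");
        GenericRowData row = GenericRowData.of(
            StringData.fromString("login"),
            StringData.fromString("u_001"),
            StringData.fromString("{\"ip\":\"10.0.0.1\",\"os\":\"android\"}"));
        // Prints something like {"key":"login","uid":"u_001","ip":"10.0.0.1","os":"android"}
        // (key order may vary); the 'others' blob is merged into the top-level object.
        System.out.println(new String(schema.serialize(row)));
    }
}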
  5. The deserialization class, which converts the external data stream into the RowData structure that Flink SQL can recognize.
package com.sht.formats.json;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EventJsonDeserializationSchema implements DeserializationSchema<RowData> {

    private final TypeInformation<RowData> resultTypeInfo;
    private final List<RowType.RowField> rowTypeFields;
    private final String otherField;
    private final boolean ignoreParseErrors;

    public EventJsonDeserializationSchema(DataType dataType, TypeInformation<RowData> resultTypeInfo, boolean ignoreParseErrors,
                                          TimestampFormat timestampFormatOption, String otherField) {
        this.resultTypeInfo = resultTypeInfo;
        final RowType rowType = (RowType) dataType.getLogicalType();
        this.rowTypeFields = rowType.getFields();
        this.otherField = otherField;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    @Override
    public RowData deserialize(byte[] bytes) throws IOException {
        throw new RuntimeException(
            "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
    }

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        if (message == null || message.length == 0) {
            return;
        }
        String line = new String(message);

        final JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(line);
        } catch (Exception e) {
            // Honor the 'ignore-parse-errors' option instead of always failing the job.
            if (ignoreParseErrors) {
                return;
            }
            throw new IOException("Failed to deserialize JSON '" + line + "'.", e);
        }
        GenericRowData rowData = new GenericRowData(rowTypeFields.size());
        JSONObject others = new JSONObject();
        List<String> existField = new ArrayList<>();
        // Fill the declared fields first; everything else is collected into 'others'.
        for (int i = 0; i < rowTypeFields.size(); i++) {
            final RowType.RowField rowField = rowTypeFields.get(i);
            if (jsonObject.containsKey(rowField.getName())) {
                existField.add(rowField.getName());
                rowData.setField(i, new BinaryStringData(jsonObject.getString(rowField.getName())));
            }
        }
        for (String key : jsonObject.keySet()) {
            if (!existField.contains(key)) {
                others.put(key, jsonObject.get(key));
            }
        }
        // The 'others' metadata column is appended as the last field by the decoding format.
        rowData.setField(rowTypeFields.size() - 1, new BinaryStringData(others.toJSONString()));
        out.collect(rowData);
    }

    @Override
    public boolean isEndOfStream(RowData rowData) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }
}
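Correspondingly, a small hand-rolled sketch (again hypothetical, not part of the original post) that feeds one event into the deserializer and collects the produced row; the undeclared ip and os fields end up in the trailing others column.

package com.sht.formats.json;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;

// Hypothetical demo class, not part of the format itself.
public class EventJsonDeserializationSchemaDemo {
    public static void main(String[] args) throws Exception {
        // Produced type: the physical columns plus the appended 'others' metadata column,
        // mirroring what EventJsonDecodingFormat#createRuntimeDecoder builds.
        DataType producedDataType = DataTypes.ROW(
            DataTypes.FIELD("key", DataTypes.STRING()),
            DataTypes.FIELD("uid", DataTypes.STRING()),
            DataTypes.FIELD("others", DataTypes.STRING()));
        EventJsonDeserializationSchema schema = new EventJsonDeserializationSchema(
            producedDataType, TypeInformation.of(RowData.class), false, TimestampFormat.SQL, "others");

        List<RowData> collected = new ArrayList<>();
        Collector<RowData> out = new Collector<RowData>() {
            @Override
            public void collect(RowData record) {
                collected.add(record);
            }

            @Override
            public void close() {
            }
        };
        String event = "{\"key\":\"login\",\"uid\":\"u_001\",\"ip\":\"10.0.0.1\",\"os\":\"android\"}";
        schema.deserialize(event.getBytes(), out);
        // Prints something like +I(login,u_001,{"ip":"10.0.0.1","os":"android"}).
        System.out.println(collected.get(0));
    }
}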
  6. SPI configuration. Under the resources source directory, create the two-level META-INF/services folder, then create a file named org.apache.flink.table.factories.Factory with the following content:
com.sht.formats.json.EventJsonFormatFactory
  7. Test case.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EventJsonFormatTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql(" " +
            " CREATE TABLE sourceTable ( " +
            "  others STRING METADATA FROM 'value.others', " +
            "  key STRING, " +
            "  uid STRING " +
            " ) WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'event', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'properties.enable.auto.commit' = 'false', " +
            "  'properties.session.timeout.ms' = '90000', " +
            "  'properties.request.timeout.ms' = '325000', " +
            "  'scan.startup.mode' = 'earliest-offset' , " +
            "  'value.format' = 'event-json', " +
            "  'value.event-json.others' = 'others' " +
            " ) "
        );

        tableEnvironment.executeSql(" " +
            " CREATE TABLE sinkTable ( " +
            "  others STRING, " +
            "  key STRING, " +
            "  uid STRING " +
            " ) WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'dwd_event', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'properties.enable.auto.commit' = 'false', " +
            "  'properties.session.timeout.ms' = '90000', " +
            "  'properties.request.timeout.ms' = '325000', " +
            "  'value.format' = 'event-json', " +
            "  'value.event-json.others' = 'others', " +
            "  'sink.partitioner' = 'round-robin', " +
            "  'sink.parallelism' = '4' " +
            " ) "
        );

//        tableEnvironment.executeSql("select * from sourceTable");

        tableEnvironment.executeSql("insert into sinkTable(key, uid, others) select key, uid, others from sourceTable");
    }
}

The Table Format is a standalone module within a connector, responsible solely for serializing and deserializing the data, and it can be shared by multiple connectors. Writing a custom Table Format is a good way to understand how Flink SQL converts external data into the internal RowData structure, which makes it much easier to pinpoint exactly where things go wrong when troubleshooting.
