FlinkSql读取Map类型数据

2021-07-31  本文已影响0人  毛驴胡子
家楼顶

需求

源kafka消息数据中有个字段是Map类型, 我希望读取该字段并且写入clickhouse中, 这是个Map<String, Object>类型的数据, 还有可能是嵌套结构。就像这样

{
    "name":"hello",
    "info": {
        "age": 18,
        "gender": "male",
        "other": {
            "car": "川A8888888",
            "what": 100
        }
    }
}

问题

flinksql原生支持Map类型, 但是必须制定key和value的类型, 无法满足需求。所以打算以字符串的方式写入clickhouse, 查询的时候再解析, 于是我在sql定义中将map类型的数据类型写成String。就像这样

CREATE TABLE long_long_ago (
    name STRING,
    info STRING
) WITH (…)

可事情没有向预计的方向发展, 程序不报错, 可是写入到clickhouse中的info字段为空, 一脸懵逼, 开始进入正题。

方案

为什么将Map类型的数据定义成String后解析出来的数据是空呢?直接看代码, 不一会儿就定位到所在代码的位置了。

所在包:flink-json
所在类:org.apache.flink.formats.json.JsonRowDataDeserializationSchema
所在方法:convertToString
原因: 
  因为flink解析出来的info本质还是jsonNode, 即使我们在sql中定义其为String。     
  而jsonNode的asText方法是没实现的(也就是空)。
  所以适当的修改一下代码就可以了, jsonNode实现了toString类。

修改如下

private StringData convertToString(JsonNode jsonNode) {
   if (jsonNode.asText() == "") {
      return StringData.fromString(jsonNode.toString());
   } else {
      return StringData.fromString(jsonNode.asText());
   }
}

修改完上面的方法后, 打包flink-json包, 替换jar包。

好了, 现在flink程序就可以将Map对象转成String, 然后落地到clickhouse了

上一篇 下一篇

猜你喜欢

热点阅读