flink学习

Flink1.10迁移到flink1.11的“坑”(变化)

2020-08-06  本文已影响0人  FishMAN__

第一坑:使用Stream 转table api 中的fromDataStream的java和scala写法(需要引入flink的$符号)

1.下面是官网的scala和java写法


官网的scala写法.png 官网的java写法.png

2、我引入了import org.apache.flink.table.api.Expressions.$ ,上面是1.10写法,使用官网1.11版本的scala编程还是报错:


采用官网scala写法报错

3、最终scala的编程引用java的方式可以了!

import org.apache.flink.table.api.Expressions.$ 
val dspbidTable: Table = tEnv.fromDataStream(dspbidStream, $("bversionv"), $("bplanid"), $("clickPrice"), $("impressPrice"),
      $("planCostType"), $("sspSettle"), $("bgid"), $("sspUid"), $("ip"), $("ua"), $("androidid"),
      $("imei"), $("budid"), $("bvid"), $("aps"), $("btimestamp").rowtime())

第二坑:

1、table创建视图方式

过期:
tEnv.registerTable("dspClick", dspClickTable)
现在:
tEnv.createTemporaryView("dspClick", dspClickTable)

2、sql转Stream的方式

过期:
tEnv.registerTable("TemporalJoin", tEnv.sqlQuery(sqlJoin))
val joinSqlStream: DataStream[Row] = tEnv.scan("TemporalJoin").toAppendStream[Row]

现在:(更简洁)  
val table: Table = tEnv.sqlQuery(sqlJoin)
val joinSqlStream: DataStream[Row] = tEnv.toAppendStream[Row](table)
上一篇下一篇

猜你喜欢

热点阅读