2022-06-11-Flink-48 (Part 7)
1. Basic Table API
Adding the dependencies
<!-- flink-table bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- If we want to run Table programs locally inside the IDE, we also need the planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
Note: in a production environment the planner is already present in the lib directory by default, so only the bridge dependency is needed. If we want to use user-defined functions or connect to Kafka, we also need an SQL client, which is included in flink-table-common.
Creating and querying tables
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class tableDemo2 {
public static void main(String[] args) {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
//Create the table environment directly, without a stream execution environment
//Stream processing based on the blink planner
TableEnvironment tableEnv = TableEnvironment.create(setting);
/* Sample content of c.txt:
小米,./click,10000
小红,./click,10000
小牛,./click,10000
小黄,./click,10000*/
//Create the input table
String createDDL = "CREATE TABLE clickTable (" +
" user_name STRING, "+
" url STRING, " +
" ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
//Query with the Table API
Table table1 = tableEnv.from("clickTable").where($("user_name").isEqual("小米")).select($("user_name"), $("url"));
tableEnv.createTemporaryView("table1",table1);
//Create the output table
String createOutDDL = "CREATE TABLE outTable (" +
" user_name STRING, "+
" url STRING " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'output/2022-06-11', " +
" 'format' = 'csv' " +
")";
//Run SQL queries
Table table2 = tableEnv.sqlQuery("select user_name , url from clickTable");
Table table_view = tableEnv.sqlQuery("select user_name , url from table1");
tableEnv.executeSql(createOutDDL);
table_view.executeInsert("outTable");
table2.executeInsert("outTable");
}
}
There are two ways to query: directly through the Table API, or with SQL.
Bug: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encounte
Fix: user (or another reserved word) is being used as an identifier; wrap such identifiers in backticks ``.
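For example, a minimal sketch (assuming the same tableEnv and filesystem source as above) of a DDL that uses the reserved word user as a column name by escaping it with backticks:
String escapedDDL = "CREATE TABLE clickTable2 (" +
" `user` STRING, " +
" url STRING, " +
" ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(escapedDDL);
// The backticks are also required in queries: select `user`, url from clickTable2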
//Print connector: writes results to standard output
String createOutDDL = "CREATE TABLE outTable (" +
" user_name STRING, "+
" url STRING " +
") WITH (" +
" 'connector' = 'print' " +
")";
Converting a table into a stream
Changelog stream
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class tableDemo3 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//Create the input table
String createDDL = "CREATE TABLE clickTable (" +
" user_name STRING, "+
" url STRING, " +
" ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
//Register the table
tableEnv.executeSql(createDDL);
Table table = tableEnv.sqlQuery("select user_name , count(1) from clickTable where user_name = '小米' group by user_name");
tableEnv.toChangelogStream(table).print("agg");
env.execute();
}
}
Converting a stream into a table
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class tableDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
//1. Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2. Convert the stream into a table
Table table = tableEnv.fromDataStream(streamOperator);
//Conversion via SQL
//table here is not a registered table but a Table object; the bridge between the stream and SQL is an inline (virtual) table
Table table1 = tableEnv.sqlQuery("select user , url , `timestamp` from " + table);
//Conversion directly on the Table object
Table table2 = table.select($("user"), $("url"), $("timestamp")).where($("url").isEqual("./click"));
tableEnv.toDataStream(table1).print("table1");
tableEnv.toDataStream(table2).print("table2");
env.execute();
}
}
2. Special Concepts in Stream Processing
Differences between stream processing and relational algebra (tables and SQL)
Dynamic Tables
Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, a dynamic table changes over time. A dynamic table can be queried just like a static batch table; querying a dynamic table produces a Continuous Query. A continuous query never terminates and generates another dynamic table. The query keeps updating its dynamic result table to reflect the changes on its dynamic input table.
The continuous streaming query process
- The stream is converted into a dynamic table
- A continuous query is evaluated on the dynamic table, producing a new dynamic table
- The resulting dynamic table is converted back into a stream
Converting a dynamic table into a stream
Like a regular database table, a dynamic table is continuously modified by insert, update and delete changes. When converting a dynamic table into a stream, or writing it to an external system, these changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes of a dynamic table (a small sketch of how the retract encoding shows up in code follows this list):
- Append-only stream: contains only insert changes; no upserts of existing rows are involved.
- Retract stream: a stream with two kinds of messages, add messages and retract messages; an UPDATE is encoded as a retract message for the changed (previous) row followed by an add message for the updated (new) row.
- Upsert stream: requires a unique key; changes are encoded as upsert messages on that key.
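As a small sketch of the retract encoding (reusing the clickTable and tableEnv from the demos above): an aggregation result is an updating table, so it cannot be emitted with toDataStream; toChangelogStream encodes every update as a -U (retract the previous row) followed by a +U (add the new row):
// The aggregate below produces an updating (changelog) table
Table aggTable = tableEnv.sqlQuery("select user_name, count(1) as cnt from clickTable group by user_name");
// toDataStream would throw here because the table is not insert-only;
// the printed changelog shows +I for the first value of a key and -U/+U pairs for later updates
tableEnv.toChangelogStream(aggTable).print();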
3. Time and Windows
Event time
public class tableDemo4 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//Define the time attribute in the DDL
String createDDL = "CREATE TABLE clickTable (" +
" user_name STRING, "+
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
//Define the time attribute when converting a stream into a table
SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
table.printSchema();
env.execute();
}
}
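An alternative sketch (assuming Flink 1.13+, reusing streamOperator and tableEnv from the demo above; this variant is not part of the original demo) defines the event-time attribute with a Schema builder instead of positional expressions:
// requires org.apache.flink.table.api.Schema
Table tableWithSchema = tableEnv.fromDataStream(
streamOperator,
Schema.newBuilder()
.columnByExpression("et", "TO_TIMESTAMP_LTZ(`timestamp`, 3)") // computed rowtime column from epoch millis
.watermark("et", "et - INTERVAL '1' SECOND") // watermark on the rowtime column
.build());
tableWithSchema.printSchema();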
Processing time
proctime (computed column)
public class tableDemo4 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//Define the time attribute in the DDL
String createDDL = "CREATE TABLE clickTable (" +
" user_name STRING, "+
" url STRING, " +
" ts BIGINT, " +
" et as PROCTIME() " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
tableEnv.from("clickTable").printSchema();
//Define the time attribute when converting a stream into a table
DataStreamSource<Event> streamOperator = env.addSource(new clickSource());
Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").proctime());
table.printSchema();
}
}
I was confused for a moment: do we need watermarks under processing-time semantics? No, there is no need, which is why none are defined here.
Windows
4. Aggregation Queries
Group aggregation (old-style group windows)
public class tableDemo5 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, "+
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
Table table1 = tableEnv.sqlQuery("select user_name, count(1) from clickTable group by user_name");
Table table2 = tableEnv.sqlQuery("select user_name, count(1), TUMBLE_END(et, INTERVAL '10' SECOND ) from clickTable group by user_name , TUMBLE(et, INTERVAL '10' SECOND )");
tableEnv.toChangelogStream(table1).print("agg: ");
tableEnv.toDataStream(table2).print("window: ");
env.execute();
}
}
Window aggregation
Implemented with window TVFs (table-valued functions)
public class tableDemo6 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, "+
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/d.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
//Tumbling window: the window start is inclusive, the window end is exclusive
Table table3 = tableEnv.sqlQuery("select user_name, count(1),window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND )) group by user_name ,window_start ,window_end");
//Hopping (sliding) window
Table table4 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(HOP(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND )) group by user_name ,window_start ,window_end");
//Cumulate window: compare its output with the hop window below
Table table5 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(CUMULATE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND )) group by user_name ,window_start ,window_end");
//tableEnv.toDataStream(table3).print("TUMBLE: ");
tableEnv.toDataStream(table4).print("HOP: ");
tableEnv.toDataStream(table5).print("CUMULATE: ");
env.execute();
}
}
Understanding cumulate windows
With a step of 5 seconds and a maximum window size of 10 seconds, CUMULATE fires once per step, and each firing covers the data from the fixed window start up to the current step boundary; HOP instead slides a full 10-second window forward by 5 seconds each time. Input data and the resulting output:
小米,./click,1000
小米,./click,2000
小米,./click,5000
小米,./click,6000
小米,./click,9000
小米,./click,10000
小米,./click,10001
小米,./click,14001
小米,./click,15001
小米,./click,19001
HOP: > +I[小米, 2, 1970-01-01T07:59:55, 1970-01-01T08:00:05]
CUMULATE: > +I[小米, 2, 1970-01-01T08:00, 1970-01-01T08:00:05]
HOP: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
CUMULATE: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
CUMULATE: > +I[小米, 3, 1970-01-01T08:00:10, 1970-01-01T08:00:15]
CUMULATE: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
HOP: > +I[小米, 6, 1970-01-01T08:00:05, 1970-01-01T08:00:15]
HOP: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
HOP: > +I[小米, 2, 1970-01-01T08:00:15, 1970-01-01T08:00:25]
OVER window functions
Fix for "OVER windows' ordering in stream mode must be defined on a time attribute": the ORDER BY of an OVER window on a stream must use a time attribute (the event-time column et below).
//OVER window
Table table6 = tableEnv.sqlQuery("select user_name, COUNT(1) OVER w as cnt from clickTable WINDOW w as (PARTITION BY user_name order by et rows between unbounded preceding and current row) ");
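Besides the row-based OVER window above, a range-based OVER window over event time is also possible; a sketch against the same clickTable:
// Count each user's clicks within the last 10 seconds of event time, emitted per row
Table table7 = tableEnv.sqlQuery("select user_name, COUNT(1) OVER (" +
" PARTITION BY user_name ORDER BY et" +
" RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW" +
") as cnt from clickTable");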
Example: TOP-N
Plain TOP-N
During execution, Flink SQL sorts the input stream according to the sort key. Whenever the top N records of a partition change, the changed rows are emitted downstream as an update stream.
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
public class tableDemo1 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
Table query = tableEnv.sqlQuery("select user_name, rn , cnt from " +
"(select user_name,cnt, row_number() OVER(PARTITION BY user_name ORDER BY cnt DESC ) as rn from " +
"(select user_name , count(1) as cnt from clickTable group by user_name) ) " +
"where rn <= 2 ");
tableEnv.toChangelogStream(query).print();
env.execute();
}
}
Window TOP-N
public class tableDemo2 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
String sqlQuery = "select user_name , count(1) as cnt , window_start,window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND)) group by user_name,window_start,window_end";
Table query = tableEnv.sqlQuery("select user_name, rn , cnt,window_start,window_end from " +
"(select user_name,cnt,window_start,window_end, row_number() OVER(PARTITION BY window_start,window_end ORDER BY cnt DESC ) as rn from " +
"("+ sqlQuery +") ) " +
"where rn <= 2 ");
tableEnv.toDataStream(query).print();
env.execute();
}
}
5. Join Queries
Regular joins
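No code example was included here; as a rough sketch (assuming a hypothetical second table orderTable with user_name and amount columns, which is not defined anywhere in this post), a regular join against the clickTable from the demos above would be:
// Regular (unbounded) join: Flink has to keep the state of both inputs indefinitely
Table regularJoin = tableEnv.sqlQuery("select c.user_name, c.url, o.amount " +
"from clickTable c join orderTable o on c.user_name = o.user_name");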
Interval joins
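Again only a hedged sketch, additionally assuming that the hypothetical orderTable has an event-time attribute ot: an interval join only matches rows whose timestamps lie within a bounded interval of each other, so its state can be cleaned up as the watermark advances:
Table intervalJoin = tableEnv.sqlQuery("select c.user_name, c.url, o.amount " +
"from clickTable c, orderTable o " +
"where c.user_name = o.user_name " +
"and c.et between o.ot - INTERVAL '10' SECOND and o.ot + INTERVAL '10' SECOND");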
6. Functions
Built-in (system) functions
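No example appeared here; a couple of built-in functions run against the clickTable from the UDF demo below, just as illustration:
// UPPER, CHAR_LENGTH and DATE_FORMAT are built-in scalar functions
Table sysFuncTable = tableEnv.sqlQuery("select user_name, UPPER(user_name), CHAR_LENGTH(url), " +
"DATE_FORMAT(et, 'yyyy-MM-dd HH:mm:ss') from clickTable");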
User-defined functions (UDFs)
public class tableDemo4 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
//2. Register the custom scalar function
tableEnv.registerFunction("MyHash",new MyHashFunction());
//How do we use createTemporarySystemFunction? -- see the bug encountered below
//tableEnv.createTemporarySystemFunction("MyHash",MyHashFunction.class);
//3. Call the UDF in a query (select the user and the user's hash code)
Table resultTable = tableEnv.sqlQuery("select user_name,MyHash(user_name) from clickTable");
//4. Convert to a stream and print
tableEnv.toDataStream(resultTable).print();
env.execute();
}
//Custom scalar function extending ScalarFunction
public static class MyHashFunction extends ScalarFunction{
public int eval(String str){
return str.hashCode();
}
}
}
Bug: Unsupported class file major version 58
Fix: adjust the Java version (the class file was compiled with the wrong version; major version 58 is Java 14, so compile with a JDK version that Flink supports, such as 8 or 11).
public class tableDemo4 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, " +
" url STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '1' SECOND " +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'src/main/resources/c.txt', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
//2. Register the custom table function
tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
//createTemporarySystemFunction: how to use it (see the registration above)
//3. Call the table function in a query via LATERAL TABLE (split the url into words and their lengths)
Table resultTable = tableEnv.sqlQuery("select user_name,url, word,length from clickTable ,LATERAL TABLE(MySplit(url)) AS T(word,length)");
//4. Convert to a stream and print
tableEnv.toDataStream(resultTable).print();
env.execute();
}
//Custom table function extending TableFunction
@FunctionHint (output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class MyFunction extends TableFunction<Row> {
public void eval(String str){
String[] split = str.split(",");
for (String s : split) {
collect(Row.of(s,s.length()));
}
}
}
}
Bug: List of column aliases must have same degree as table; the returned table of function
Fix: when the custom class extends TableFunction under the legacy type system, getResultType is not implemented; since we return Row here, the method has to be overridden (only the old API needs this override):
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.INT);
}
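Besides scalar and table functions, UDFs also include aggregate functions; a minimal, untested sketch under the same setup (the class and field names below are made up for illustration):
// Custom aggregate function: counts the rows per group
public static class MyCountFunction extends AggregateFunction<Long, MyCountFunction.CountAcc> {
public static class CountAcc { public long count = 0L; }
@Override
public CountAcc createAccumulator() { return new CountAcc(); }
@Override
public Long getValue(CountAcc acc) { return acc.count; }
// accumulate is resolved by reflection; one parameter per function argument
public void accumulate(CountAcc acc, String url) { acc.count++; }
}
// Register and use it like the other UDFs:
// tableEnv.createTemporarySystemFunction("MyCount", MyCountFunction.class);
// tableEnv.sqlQuery("select user_name, MyCount(url) from clickTable group by user_name");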
7. SQL Client
8. Connecting to External Systems
Kafka
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11', -- required: valid connector versions are
-- "0.8", "0.9", "0.10", "0.11", and "universal"
'connector.topic' = 'topic_name', -- required: topic name from which the table is read
'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset",
-- "latest-offset", "group-offsets",
-- or "specific-offsets"
-- optional: used in case of startup mode with specific offsets
'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions
-- into Kafka's partitions valid are "fixed"
-- (each Flink partition ends up in at most one Kafka partition),
-- "round-robin" (a Flink partition is distributed to
-- Kafka partitions round-robin)
-- "custom" (use a custom FlinkKafkaPartitioner subclass)
-- optional: used in case of sink partitioner custom
'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
'format.type' = '...', -- required: Kafka connector requires to specify a format,
... -- the supported formats are 'csv', 'json' and 'avro'.
-- Please refer to Table Formats section for more details.
)
public class tableDemo6 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String ddlTable = "CREATE TABLE clickTable (" +
" user_name STRING, " +
" ts BIGINT, " +
" et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
" WATERMARK FOR et as et - INTERVAL '4' SECOND " +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'topic03', " +
" 'properties.bootstrap.servers' = '43.142.80.86:9093', " +
" 'scan.startup.mode' = 'earliest-offset', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(ddlTable);
Table table = tableEnv.sqlQuery("select * from clickTable");
tableEnv.toDataStream(table).print();
env.execute();
}
}
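Writing back to Kafka works the same way; a hedged sketch of a sink table (topic name and broker address are placeholders), keeping in mind that the plain kafka connector only accepts append-only results, while updating results need the upsert-kafka connector:
String kafkaSinkDDL = "CREATE TABLE kafkaOutTable (" +
" user_name STRING, " +
" ts BIGINT " +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'out_topic', " +
" 'properties.bootstrap.servers' = 'localhost:9092', " +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(kafkaSinkDDL);
tableEnv.sqlQuery("select user_name, ts from clickTable").executeInsert("kafkaOutTable");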