Flink SQL 入门指北

2020-11-02  本文已影响0人  xiaoc024

1. Overview

本文主要来自官网,旨在整理处 Flink SQL 的基本语法和使用,基础向。

2. API 调用

2.1 Old Planner VS Blink Planner

Flink 1.11 已经默认使用 Blink Planner。

2.2 基本程序结构

1.创建 TableEnvironment ( old/blink planner + stream/batch )
2.创建表( tableEnv.connect 外部数据源 或者 tableEnv.fromDataStream )
3.查询表( Table API 或者 SQL )
4.输出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])

2.3 创建 TableEnvironment

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2.4 创建表

// 1.通过外部数据源创建
//数据格式:sensor_1,1547718225,22.8
tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sensor")
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")


// 2.通过 datastream 转换
val table1: Table = tableEnv.fromDataStream(stream)

2.5 查询表

//table api
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")

// SQL
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, temperature
|from inputTable
|where id = 'sensor_1'
""".stripMargin)

2.6 表转流的三种输出模式

DataStream 只支持 Append 和 Retract 模式。(toRetractStream[T] & toAppendStream[T] )
外部文件系统的流支持哪种模式取决于具体实现,比如 Kakfa 只支持 Append 模式。

2.7 输出表

tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    // 转换操作
    val sensorTable: Table = tableEnv.from("inputTable")
    // 简单转换
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    //  聚合转换
    val aggTable: Table = sensorTable
      .groupBy('id)    
      .select('id, 'id.count as 'count)

    // 输出到外部文件系统或者 DataStream
    val outputPath = "..."

    // 注册输出表
    tableEnv.connect(new FileSystem().path(outputPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("outputTable")

    //aggTable.insertInto("outputTable")  aggTable 因为有修改操作,CsvTableSink 只实现了 AppendStreamTableSink,所以无法输出到文件。
    resultTable.insertInto("outputTable")

    resultTable.toAppendStream[(String, Double)].print("result")
    // aggTable 因为有修改操作不能使用 append,需要使用 Retract
    aggTable.toRetractStream[Row].print("agg")

3 动态表

3.1 DataStream 上的关系查询

关系代数 / SQL 流处理
关系(或表)是有界(多)元组集合。 流是一个无限元组序列。
对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 流式查询在启动时不能访问所有数据,必须“等待”数据流入。
批处理查询在产生固定大小的结果后终止。 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。

尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。高级关系数据库系统提供了一个称为 物化视图(Materialized Views) 的特性。物化视图被定义为一条 SQL 查询,缓存查询的结果。缓存的一个常见难题是防止缓存结果过期。当其定义查询的基表被修改时,物化视图将过期。 即时视图维护(Eager View Maintenance) 是一种一旦更新了物化视图的基表就立即更新视图的技术。

3.2 动态表 & 连续查询( Continuous Query )

动态表查询流程
  1. 将流转换为动态表。
  2. 在动态表上计算一个连续查询,生成一个新的动态表。
  3. 生成的动态表被转换回流。
流转为动态表
连续查询并生成新动态表
动态表转换回流(Retract模式)

4. 窗口和时间语义

关于窗口和时间语义的介绍可以参考这篇文章。之前是在流上进行讨论的。Flink 在表上同样支持相应的逻辑。

4.1 时间语义

可以通过 DDL 方式创建两种时间语义,但是比较晦涩,这里不做举例,感兴趣可以到官网查看。

4.1.1 processing time

注意处理时间属性一定不能定义在一个已有字段上

4.1.2 event time

4.2 窗口操作

窗口操作相当于对数据进行分组时,除了按照字段以外,增加了新的维度进行分组,一般是时间或者数据数量。

4.2.1 Group Windows

根据时间或者行数间隔,将行聚集在有限的组中,并对每个组的数据执行一次聚合函数。最终每个组得出一个结果,类似于传统对 group by 操作

// 基本使用结构
val table = input
        .window([w:GroupWindow] as "w") //定义窗口和别名 w
        .groupBy($"w",$"a")  //以属性 a 和窗口 w 作为分组的key
        .select($"a",$"b".sum)  //聚合字段b的值,求和

tumbling window
    - .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - .window( Tumble over 10.rows on $"a_proctime" as "w")
    - sql: tumble(ts, interval '10' second)

sliding windows 
    - .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
    - .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
    - sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二个是步长,第三个是窗口长度

session windows
    - .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - sql: session(ts,interval '10' second)

sql 辅助函数,xx = {tumble,hop,session}:
    - xx_start(ts, interval '10' second)
    - xx_end(ts, interval '10' second)
    - xx_rowtime(ts, interval '10' second)
    - xx_proctime(ts, interval '10' second)

4.2.2 Over Windows

针对每个输入行,进行开窗,增加一列表示结果,每个行都有自己所在窗口的结果。类似于传统的 over 操作

// 基本使用结构
val table = input
        .window([w:OverWindow] as "w") 
        .select($"a",$"b".sum over $"w", $"c".min over $"w")  

无界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")

有界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")
上一篇下一篇

猜你喜欢

热点阅读