2022-06-11-Flink-48(七)

2022-06-11  本文已影响0人  冰菓_

1. 基本Table API

引入依赖
   <!-- flink-table桥接器 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink-如果我们希望在IDEA中允许table则要引入如下依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

说明: 如果是生产环境,lib 目录下默认已经有了 planner,就只需要有 bridge 就可以了。当然,如果想使用用户自定义函数,或是跟 kafka 做连接,需要有一个 SQL client,这个包含在 flink-table-common 里

表创建和查询

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo2 {

     public static void main(String[] args) {
          EnvironmentSettings setting = EnvironmentSettings.newInstance()
                  .inStreamingMode()
                  .useBlinkPlanner()
                  .build();
          
          //不通过流环境创建table环境
          //基于blink版本planner进行流处理
          TableEnvironment tableEnv = TableEnvironment.create(setting);
          
          /* 小米,./click,10000
          小红,./click,10000
          小牛,./click,10000
          小黄,./click,10000*/

          //创建一张输入的表
          String createDDL = "CREATE TABLE clickTable (" +
          " user_name STRING, "+
          " url STRING, " +
          " ts BIGINT " +
          ") WITH (" +
          " 'connector' = 'filesystem', " +
          " 'path' = 'src/main/resources/c.txt', " +
          " 'format' = 'csv'" +
          ")";

          tableEnv.executeSql(createDDL);

          //table-api查询
          Table table1 = tableEnv.from("clickTable").where($("user_name").isEqual("小米")).select($("user_name"), $("url"));
          tableEnv.createTemporaryView("table1",table1);



          //创建一张输出的表
          String createOutDDL = "CREATE TABLE outTable (" +
                  " user_name STRING, "+
                  " url STRING " +
                  ") WITH (" +
                  " 'connector' = 'filesystem', " +
                  " 'path' = 'output/2022-06-11', " +
                  " 'format' = 'csv' " +
                  ")";

          //执行SQL查询
          Table table2 = tableEnv.sqlQuery("select user_name , url from clickTable");

          Table table_view = tableEnv.sqlQuery("select user_name , url from table1");
          tableEnv.executeSql(createOutDDL);

          table_view.executeInsert("outTable");
          table2.executeInsert("outTable");

     }
}

查询有两种:一个是直接table-api查,另一种是SQL查询

bug:Exception in thread “main“ org.apache.flink.table.api.SqlParserException: SQL parse failed. Encounte
解决:user是关键字,或其他关键字问题,使用``框起来

//print打印
          String createOutDDL = "CREATE TABLE outTable (" +
                  " user_name STRING, "+
                  " url STRING " +
                  ") WITH (" +
                  " 'connector' = 'print' " +
                  ")";
表转换成流

更新日志流

import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class tableDemo3 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //创建一张输入的表
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";

        //注册表
        tableEnv.executeSql(createDDL);

        Table table = tableEnv.sqlQuery("select user_name , count(1) from clickTable where user_name = '小米' group by  user_name");


        tableEnv.toChangelogStream(table).print("agg");

        env.execute();
    }
}
流转换成表

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo1 {

     public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
          env.setParallelism(1);
          SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
               @Override
               public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
               }
          }));

          //1. 创建表环境
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

          //2. 将stream转换成table
          Table table = tableEnv.fromDataStream(streamOperator);

          //写SQL转换
          //这里的table不是表而是一个对象,实现流对象和表之间转换的媒介是虚拟表
          Table table1 = tableEnv.sqlQuery("select user , url , `timestamp` from " + table);

          //基于Table直接转换
          Table table2 = table.select($("user"), $("url"), $("timestamp")).where($("url").isEqual("./click"));

          tableEnv.toDataStream(table1).print("table1");
          tableEnv.toDataStream(table2).print("table2");

          env.execute();

     }

}

2. 流处理中的特殊概念

流处理和关系代数(表,及 SQL)的区别
区别
动态表(Dynamic Tables)

动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)。连续查询永远不会终止,并会生成另一个动态表。查询(Query)会不断更新其动态结果表,以反映其动态输入表上的更改。

流式持续查询的过程
流式持续查询过程
  1. 流被转换为动态表
  2. 对动态表计算连续查询,生成新的动态表
  3. 生成的动态表被转换回流
动态表转换成流

与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改。将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink 的 Table API 和 SQL 支持三种方式对动态表的更改进行编码:

  1. 仅追加(Append-only)流
    只有插入操作,不涉及数据的Upsert
  2. 撤回(Retract)流
    Retract 流是包含两类消息的流,添加(Add)消息和撤回(Retract)消息
  3. Upsert(更新插入)流
    要有唯一的key,撤回(Retract)流:UPDATE 编码为被更改行(前一行)的 retract 消息和更新后行(新行)的 add 消息

3. 时间和窗口

事件时间

public class tableDemo4 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //在DDL中定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(createDDL);

        //流转换表中定义时间属性
        SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());

        table.printSchema();
        env.execute();

    }
}
处理时间

proctime(计算列)


public class tableDemo4 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //在DDL中定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as PROCTIME() " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(createDDL);

        tableEnv.from("clickTable").printSchema();

        //流转换表中定义时间属性
        DataStreamSource<Event> streamOperator = env.addSource(new clickSource());

        Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").proctime());

        table.printSchema();

    }
}

我蒙了???处理时间语义要用水位线吗,没必要把!!!

窗口(window)

4. 聚合查询

分组聚合(老版本)

public class tableDemo5 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        Table table1 = tableEnv.sqlQuery("select user_name, count(1) from clickTable group by user_name");

        Table table2 = tableEnv.sqlQuery("select user_name, count(1), TUMBLE_END(et, INTERVAL '10' SECOND )  from clickTable group by user_name , TUMBLE(et, INTERVAL '10' SECOND )");

        tableEnv.toChangelogStream(table1).print("agg: ");
        tableEnv.toDataStream(table2).print("window: ");

        env.execute();


    }
}

窗口聚合

使用窗口TVF实现


public class tableDemo6 {


    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/d.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        //滚动窗口,含头不含尾
        Table table3 = tableEnv.sqlQuery("select user_name, count(1),window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");

        //滑动窗口
        Table table4 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(HOP(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");

        //累计窗口与滑动窗口的区别
        Table table5 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(CUMULATE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");

        //tableEnv.toDataStream(table3).print("TUMBLE: ");
        tableEnv.toDataStream(table4).print("HOP: ");
        tableEnv.toDataStream(table5).print("CUMULATE: ");
        env.execute();


    }
}

理解累计窗口

小米,./click,1000
小米,./click,2000
小米,./click,5000
小米,./click,6000
小米,./click,9000
小米,./click,10000
小米,./click,10001
小米,./click,14001
小米,./click,15001
小米,./click,19001


HOP: > +I[小米, 2, 1970-01-01T07:59:55, 1970-01-01T08:00:05]
CUMULATE: > +I[小米, 2, 1970-01-01T08:00, 1970-01-01T08:00:05]
HOP: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
CUMULATE: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
CUMULATE: > +I[小米, 3, 1970-01-01T08:00:10, 1970-01-01T08:00:15]
CUMULATE: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
HOP: > +I[小米, 6, 1970-01-01T08:00:05, 1970-01-01T08:00:15]
HOP: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
HOP: > +I[小米, 2, 1970-01-01T08:00:15, 1970-01-01T08:00:25]
开窗函数

解决: OVER windows‘ ordering in stream mode must be defined on a time attribute

        //over 窗口
        Table table6 =  tableEnv.sqlQuery("select user_name, COUNT(1) OVER w as cnt from clickTable WINDOW w as (PARTITION BY user_name order by et rows between unbounded preceding and current row) ");
应用实例TOP_N案例

普通TOP_N

在执行过程中,Flink SQL会对输入的数据流根据排序键进行排序。如果某个分区的前N条记录发生了改变,则会将改变的那几条数据以更新流的形式发给下游

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
    ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]

public class tableDemo1 {

    public static void main(String[] args) throws Exception {


        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);


        Table query = tableEnv.sqlQuery("select user_name, rn , cnt from " +
                "(select user_name,cnt, row_number() OVER(PARTITION BY user_name ORDER BY  cnt DESC ) as rn from " +
                "(select user_name , count(1) as cnt from clickTable group by user_name) ) " +
                "where rn <= 2 ");

        tableEnv.toChangelogStream(query).print();

        env.execute();
    }
}

窗口TOP_N


public class tableDemo2 {


    public static void main(String[] args) throws Exception {


        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(ddlTable);

        String sqlQuery = "select user_name , count(1) as cnt , window_start,window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND)) group by user_name,window_start,window_end";

        Table query = tableEnv.sqlQuery("select user_name, rn , cnt,window_start,window_end from " +
                "(select user_name,cnt,window_start,window_end, row_number() OVER(PARTITION BY window_start,window_end ORDER BY  cnt DESC ) as rn from " +
                "("+ sqlQuery +") ) " +
                "where rn <= 2 ");

        tableEnv.toDataStream(query).print();

        env.execute();
    }
}

5. Join查询

常规联结查询
间隔联结查询

6. 函数

系统函数
UDF函数
public class tableDemo4 {

    public static void main(String[] args) throws Exception {

        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        //2.注册自定义标量函数
        tableEnv.registerFunction("MyHash",new MyHashFunction());
        //createTemporarySystemFunction??怎么用 -- 见下面遇到的bug
        //tableEnv.createTemporarySystemFunction("MyHash",MyHashFunction.class);
        //3.调用UDF进行查询转换 (查询当前user以及user的hashcode)
        Table resultTable = tableEnv.sqlQuery("select user_name,MyHash(user_name) from clickTable");

        //4.转换成流打印
        tableEnv.toDataStream(resultTable).print();

        env.execute();
    }

    //自定义实现ScalarFunction
    public static class MyHashFunction extends ScalarFunction{
        public int eval(String str){
            return str.hashCode();
        }
    }
}

bug:Unsupported class file major version 58
调整java版本:类文件具有错误的版本

调整java版本,14不支持...查了好久

public class tableDemo4 {

    public static void main(String[] args) throws Exception {

        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        //2.注册自定义标量函数
        tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
        //createTemporarySystemFunction??怎么用
        //3.调用UDF进行查询转换 (查询当前user以及user的hashcode)
        Table resultTable = tableEnv.sqlQuery("select user_name,url, word,length from clickTable ,LATERAL TABLE(MySplit(url)) AS T(word,length)");

        //4.转换成流打印
        tableEnv.toDataStream(resultTable).print();

        env.execute();
    }

    //自定义实现ScalarFunction
    @FunctionHint (output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class MyFunction extends TableFunction<Row> {

        public void eval(String str){
            String[] split = str.split(",");
            for (String s : split) {
                 collect(Row.of(s,s.length()));
            }
        }
    }
}

bug:List of column aliases must have same degree as table; the returned table of function
自定义类集成TableFunction时没有实现getResultType方法如我们此时输出的是Row类型,我们就需要重写此方法(旧API需要重新)

@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.INT);
}

7. SQL客户端

8. 连接到外部系统

kafka

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'kafka',       

  'connector.version' = '0.11',     -- required: valid connector versions are
                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
                                                   -- "latest-offset", "group-offsets", 
                                                   -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions 
                                         -- into Kafka's partitions valid are "fixed" 
                                         -- (each Flink partition ends up in at most one Kafka partition),
                                         -- "round-robin" (a Flink partition is distributed to 
                                         -- Kafka partitions round-robin)
                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
  -- optional: used in case of sink partitioner custom
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  
  'format.type' = '...',                 -- required: Kafka connector requires to specify a format,
  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
                                         -- Please refer to Table Formats section for more details.
)
public class tableDemo6 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '4' SECOND " +
                ") WITH (" +
                " 'connector' = 'kafka', " +
                " 'topic' = 'topic03', " +
                " 'properties.bootstrap.servers' = '43.142.80.86:9093', " +
                " 'scan.startup.mode' = 'earliest-offset', " +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(ddlTable);

        Table table = tableEnv.sqlQuery("select * from clickTable");

        tableEnv.toDataStream(table).print();
        env.execute();

    }
}
上一篇下一篇

猜你喜欢

热点阅读