Flink CEP SQL案例

2021-05-17  本文已影响0人  Anthons

描述:现在的任务是找出一个单一股票价格不断下降的时期。

一、Maven项目pom.xml

二、测试数据

三、代码

public class CepExample4 {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2.创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        URL resource = CepExample1.class.getResource("/ticker.txt");
        DataStream<String> inputStream = env.readTextFile(resource.getPath());

        DataStream<Ticker> dataStream = inputStream.map(line -> {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String[] fields = line.trim().split(",");
            Long timestamp = sdf.parse(fields[1]).getTime();
            return new Ticker(fields[0], timestamp, new Long(fields[2]), new Long(fields[3]));
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Ticker>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((Ticker, timestamp) -> Ticker.getTimestamp()));


        // dataStream转化成Table
        tableEnv.createTemporaryView("Ticker", dataStream, $("symbol"), $("timestamp").rowtime().as("rt"), $("price"), $("tax"));
        tableEnv.executeSql("SELECT * FROM Ticker").print();

        tableEnv.executeSql("SELECT * " +
                "FROM Ticker " +
                "MATCH_RECOGNIZE (" +
                "ORDER BY rt " +
                "MEASURES " + // 定义子句的输出。
                "START_ROW.rt AS start_tstamp, " +
                "LAST(PRICE_DOWN.rt) AS bottom_tstamp, " +
                "LAST(PRICE_UP.rt) AS end_tstamp " +
                "ONE ROW PER MATCH " + // 对于每一次成功的匹配,只产生一个输出事件。
                "AFTER MATCH SKIP TO LAST PRICE_UP " + // 匹配成功后,从匹配成功的事件序列中最后一个对应于patternItem的事件开始进行下一次匹配。
                "PATTERN (START_ROW PRICE_DOWN+ PRICE_UP) " + // 尽管不存在 START_ROW 模式变量,但它具有一个始终被评估为 TRUE 隐式条件。
                "DEFINE " +
                "PRICE_DOWN AS " +
                "(LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR " + // LAST(PRICE_DOWN, 1)表示PATTERN PRICE_DOWN所对应的事件序列中的倒数第1个事件。
                "PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1), " +
                "PRICE_UP AS " +
                "PRICE_UP.price > LAST(PRICE_DOWN.price, 1) " +
                ") ").print();

        env.execute();
    }
}

四、结果

上一篇 下一篇

猜你喜欢

热点阅读