数客联盟

快速构建Flink Streaming SQL测试用例

2019-12-30  本文已影响0人  Woople

根据官网Structure of Table API and SQL Programs的样例介绍,如果要测试一个streaming SQL,需要注册一个table source,然后执行一个SQL,最后在注册一个table sink并将结果插入。table source和table sink有很多种,本文将介绍基于内存的table source和sink方便进行测试sql。代码传送门

测试用例以及Table source

public class StreamingBaseSqlDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(2);

        DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new DataSource());
        tableEnv.registerDataStream("Orders", ds, "user, product, amount");
        Table result = tableEnv.sqlQuery("SELECT user, product, amount FROM Orders");

        String[] fieldNames = {"user", "product", "amount"};
        TypeInformation[] fieldTypes = {Types.LONG, Types.STRING, Types.INT};

        TableSink sink = new MemoryAppendStreamTableSink(fieldNames, fieldTypes);
        tableEnv.registerTableSink("output", sink);
        result.insertInto("output");

        env.execute();
    }

    private static class DataSource extends RichParallelSourceFunction<Tuple3<Long, String, Integer>> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
            String[] products = new String[]{"iPhoneX", "iPhone11", "iPhone11 Pro Max"};

            final long numElements = 20;
            int i = 0;
            while (running && i < numElements) {
                Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                Tuple3 data = new Tuple3<Long, String, Integer>(RandomUtils.nextLong(1, 100), products[RandomUtils.nextInt(0, 3)],RandomUtils.nextInt(10000, 20000));
                ctx.collect(data);
                System.out.println("sand data:" + data);
                i++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Table sink

public class MemoryAppendStreamTableSink implements AppendStreamTableSink<Row> {
    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    public MemoryAppendStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new DataSink()).setParallelism(dataStream.getParallelism());
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(getFieldTypes(), getFieldNames());
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }

    private class DataSink extends RichSinkFunction<Row> {
        public DataSink() {
        }

        @Override
        public void invoke(Row value, Context context) throws Exception {
            System.out.println("Result:" + value);
        }
    }
}

运行结果

例如:

sand data:(5,iPhone11,10598)
Result:41,iPhone11,13471
sand data:(41,iPhone11,13471)
Result:31,iPhone11 Pro Max,19726
sand data:(31,iPhone11 Pro Max,19726)
Result:16,iPhoneX,12939
sand data:(16,iPhoneX,12939)
Result:83,iPhone11,18861
上一篇下一篇

猜你喜欢

热点阅读