Flink demo 环境搭建

2020-04-20  本文已影响0人  百岁叶

# Flink demo 开发环境搭建
Flink 入门 demo（IDEA 工程）。Flink 提供了比较方便的创建 Flink 工程的方法：
1、Windows 环境下，打开 Git Bash，执行以下命令生成 Flink 1.10.0 的 Maven 快速入门工程：

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0
图片.png

2、batch demo

public class BatchJob {

    /**
     * Runs a word count over a few in-memory lines with the DataSet (batch) API and prints
     * the resulting {@code (word, count)} tuples to stdout.
     *
     * <p>To build your own plan, start from a source such as {@code env.readTextFile(path)},
     * transform it with operators like {@code filter}, {@code flatMap}, {@code join} or
     * {@code coGroup}, and see the DataSet API programming guide and examples at
     * https://flink.apache.org/docs/latest/apis/batch/index.html and
     * https://flink.apache.org/docs/latest/apis/batch/examples.html
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements(
                "Flink flink flink ",
                "spark spark spark",
                "Spark Spark Spark");

        // Emit (word, 1) per word, group by the word (field 0) and sum the counts (field 1).
        AggregateOperator<Tuple2<String, Integer>> sum = text.flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        // In the DataSet API, print() is an eager sink that triggers job execution itself.
        // An extra env.execute() afterwards would fail because no new sinks were defined,
        // so there is deliberately no execute() call here.
        sum.print();
    }

    /**
     * Splits a line into lower-case words and emits a {@code (word, 1)} tuple for each word.
     *
     * <p>Words are separated by runs of non-word characters ({@code \W+}: anything other than
     * ASCII letters, digits and underscore). {@code split} yields an empty leading token when a
     * line starts with a separator, and a single empty token for an empty line. Such empty
     * tokens are skipped so they are not counted as a "word".
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /** Word separator, compiled once instead of on every {@code String.split} call. */
        private static final java.util.regex.Pattern NON_WORD = java.util.regex.Pattern.compile("\\W+");

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // Locale.ROOT makes lower-casing independent of the JVM default locale
            // (e.g. 'I' would become a dotless 'ı' under a Turkish default locale).
            for (String token : NON_WORD.split(s.toLowerCase(java.util.Locale.ROOT))) {
                if (!token.isEmpty()) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

3、Table / SQL demo

public class SqlDemo {

    /**
     * Counts the words of a fixed sentence with Flink SQL on the batch Table API.
     *
     * <p>The words become a DataSet of {@link WC} records, which is registered as the view
     * {@code test} and aggregated with a GROUP BY query. The result is converted back into a
     * DataSet of {@link WC} and printed to stderr.
     *
     * @param args unused
     * @throws Exception if planning or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // One WC(word, 1) record per word of the sentence.
        ArrayList<WC> records = new ArrayList<>();
        String[] tokens = "hello to flink flink".split("\\W+");
        for (int i = 0; i < tokens.length; i++) {
            records.add(new WC(tokens[i], 1L));
        }

        // Expose the records as a table with columns "word" and "num", then register it.
        Table wordTable = tableEnv.fromDataSet(env.fromCollection(records), "word,num");
        wordTable.printSchema();
        tableEnv.createTemporaryView("test", wordTable);

        // Aggregate in SQL and convert the result rows into WC objects for printing.
        Table counts = tableEnv.sqlQuery("select word,sum(num) as num from test group by word");
        tableEnv.toDataSet(counts, WC.class).printToErr();
    }

    /**
     * Word/count holder used both as the input element type and as the SQL result type.
     * It has public fields and a public no-arg constructor.
     */
    public static class WC {
        public String word;
        public Long num;

        /** No-arg constructor; leaves both fields {@code null}. */
        public WC() {
        }

        /**
         * @param word the word
         * @param num  its count
         */
        public WC(String word, Long num) {
            this.word = word;
            this.num = num;
        }

        @Override
        public String toString() {
            return new StringBuilder("WC{")
                    .append("word='").append(word).append('\'')
                    .append(", num=").append(num)
                    .append('}')
                    .toString();
        }
    }
}

pom文件依赖

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- For cluster deployment these are normally <scope>provided</scope> so they are not packaged
             into the JAR file; the scope is commented out here so the demos can run directly in the IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!-- Table API / SQL. Use the same Scala suffix property as above so all Flink
             artifacts are guaranteed to share one Scala binary version. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Legacy planner: required by BatchTableEnvironment (DataSet <-> Table conversion) in the SQL demo. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Scala Table API bridge; the Java demos in this project do not use it directly. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
上一篇 下一篇

猜你喜欢

热点阅读