Flink demo

2019-04-21  良人与我

Download Flink

https://flink.apache.org/downloads.html#apache-flink-180

1.  Download a binary from the [downloads page](http://flink.apache.org/downloads.html). 
You can pick any Hadoop/Scala combination you like. 
If you plan to just use the local file system, any Hadoop version will work fine.
2.  Go to the download directory.
3.  Unpack the downloaded archive.

$ cd ~/Downloads        # Go to download directory
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.8.0

Start a Local Flink Cluster

$ ./bin/start-cluster.sh  # Start Flink

Check the Dispatcher’s web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.
If the page loads and looks like the screenshot below, the cluster has started successfully.

image.png
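
The same check can be scripted instead of done in a browser. The sketch below assumes that the web frontend port also serves Flink's monitoring REST API and that `GET /overview` returns a JSON summary of the cluster. The class `ClusterCheck` and the helper `fetchOverview` are names made up for this example, not part of Flink.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink: asks the REST endpoint for the cluster overview.
class ClusterCheck {

    // Returns the raw JSON body of GET /overview, or null if the cluster is
    // unreachable or answers with a non-200 status.
    static String fetchOverview(String host, int port) {
        HttpURLConnection conn = null;
        try {
            URL url = new URL("http://" + host + ":" + port + "/overview");
            conn = (HttpURLConnection) url.openConnection();
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            if (conn.getResponseCode() != 200) {
                return null;
            }
            StringBuilder body = new StringBuilder();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
            }
            return body.toString();
        } catch (IOException e) {
            return null; // nothing listening, or the connection failed
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        String overview = fetchOverview("localhost", 8081);
        if (overview == null) {
            System.out.println("Flink is not reachable on http://localhost:8081");
        } else {
            // Print the JSON as-is; the TaskManager count can be read from it.
            System.out.println(overview);
        }
    }
}
```

For the local setup above, the printed JSON should report one TaskManager, matching what the web page shows.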

Run the demo


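package com.river;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Counts the words received from a socket over a sliding time window.
 *
 * @author river
 * @date 2019/4/18 15:57
 **/
public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {

        // the port to connect to; falls back to 9000 when --port is not given
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port", 9000);
        } catch (Exception e) {
            System.err.println("Invalid port. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }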

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
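
To make the flatMap / keyBy / reduce chain concrete, here is a plain-Java sketch of what it computes for the lines that land in one window. It does not use Flink: the class `WindowCountSketch` and the method `countWords` are invented for illustration, and a `Map` stands in for Flink's per-key state.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mimics the job's per-window logic without Flink.
class WindowCountSketch {

    // lines: all socket lines that fell into one window.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // flatMap step: one (word, 1) record per token, same split as the job
            for (String word : line.split("\\s")) {
                // keyBy("word") + reduce step: add up the counts of records sharing a word
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                countWords(Arrays.asList("hello lucy how are you", "hello"));
        // Same "word : count" format as WordWithCount.toString()
        counts.forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

If both lines arrive inside the same window, `hello` is reported with a count of 2 and every other word with a count of 1.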

Listen on port 9000 (start this before submitting the job, which connects to this socket)

[river@s201 ~]$ nc -l 9000
hello lucy how are you
hello
how are you
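
On systems without `nc` (Windows, for instance), a few lines of Java can play the same role. The following is a minimal sketch, not a netcat replacement: it accepts a single client on the port and forwards each line typed on standard input. `LineServer` and `serve` are names made up here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Illustration only: a stand-in for `nc -l 9000`.
class LineServer {

    // Waits for one client, then sends it every input line terminated by "\n"
    // (the delimiter the job passes to socketTextStream).
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line + "\n");
                out.flush(); // push each line out immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(9000);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port 9000; start the Flink job, then type lines.");
            serve(server, stdin);
        }
    }
}
```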

Submit the job with Flink

[river@s201 flink-1.8.0]$ ./bin/flink run -c com.river.SocketWindowWordCount ~/Downloads/flink-demo-1.0-SNAPSHOT.jar 
Starting execution of program

Check the output log

[river@s201 ~]$ tail -f /soft/flink-1.8.0/log/flink-river-taskexecutor-0-s201.out 
how : 1
hello : 1
lucy : 1
are : 1
you : 1
hello : 1
hello : 1
hello : 1
hello : 1
hello : 1
how : 1
are : 1
you : 1
are : 1
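
The repeated `hello : 1` lines are what a sliding window produces: `timeWindow(Time.seconds(5), Time.seconds(1))` creates 5-second windows that start every second, so each word belongs to five overlapping windows and is reported once as each of them fires. The sketch below computes, without Flink, which windows contain a given timestamp. `SlidingWindowSketch` and `windowStarts` are invented names, and the windows are assumed to be aligned to multiples of the slide with non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: which sliding windows does one element fall into?
class SlidingWindowSketch {

    // Returns the start times (ms) of every window [start, start + size) that
    // contains the timestamp, newest window first.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 7500 ms, with size = 5 s and slide = 1 s
        System.out.println(windowStarts(7_500L, 5_000L, 1_000L));
        // prints [7000, 6000, 5000, 4000, 3000]
    }
}
```

Five window starts for one element means up to five output lines for a word that was typed only once.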

The job's execution status is also visible in the web frontend


image.png

References
https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/local_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/flink_on_windows.html
