2022-03-22-Flink-43(二)

2022-03-22  本文已影响0人  冰菓_
环境配置
    <!-- Build settings shared by all examples below: Java language level and the Flink/Scala versions
         that are substituted into the dependency coordinates. -->
    <properties>
        <maven.compiler.source>14</maven.compiler.source>
        <maven.compiler.target>14</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <!-- Used as the "_2.12" suffix of the Scala-dependent Flink artifact ids below. -->
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- Flink Java API (provides the DataSet ExecutionEnvironment used by the batch example). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- DataStream API (StreamExecutionEnvironment) used by the streaming examples. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client module; presumably needed so the jobs can be executed locally from the IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
批处理:DataSet API 实现一个word count案例

DataSet API 将逐步被淘汰,Flink 的核心是流处理。如果想用 DataStream API 实现批处理,只需在提交作业时加一个参数,把执行模式指定为 BATCH,例如:bin/flink run -Dexecution.runtime-mode=BATCH ...

/**
 * Batch word count implemented with the DataSet API.
 *
 * <p>Reads {@code src/main/resources/a.txt}, splits every line on whitespace, and prints the
 * number of occurrences of each word.
 */
public class woodcut {

    /**
     * Builds and runs the batch word-count job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment.
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = executionEnvironment.readTextFile("src/main/resources/a.txt");

        FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator = source
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        // split() yields "" for a blank line or a line with leading whitespace;
                        // an empty token is not a word and must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Group by the word (field 0) and sum the counts (field 1).
        // No explicit execute() call follows: print() is the terminal operation here.
        flatMapOperator.groupBy(0).sum(1).print();
    }
}
流处理:有界流
/**
 * Streaming word count over a bounded source (a text file) using the DataStream API.
 *
 * <p>Reads {@code src/main/resources/a.txt}, splits every line on whitespace, and prints a
 * running count per word.
 */
public class bindedwordcount {

    /**
     * Builds and runs the bounded-stream word-count job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment ex = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = ex.readTextFile("src/main/resources/a.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> flapWord = streamSource
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        // split() yields "" for a blank line or a line with leading whitespace;
                        // an empty token is not a word and must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word via a key selector (rather than the positional keyBy(0)),
        // then sum the counts (field 1) and print each update.
        flapWord.keyBy(data -> data.f0).sum(1).print();

        // The pipeline above is only a definition; execute() actually runs it.
        ex.execute();
    }
}
流处理:无界流

为了模拟无界流,需要先安装 netcat(nc,本文使用 1.11 版本)来监听端口,例如在 Linux 下执行 nc -lk 6666,然后再启动下面的程序,并在 nc 窗口中输入文本进行测试。

/**
 * Streaming word count over an unbounded source: text lines read from a socket on
 * {@code localhost:6666}.
 *
 * <p>Each received line is split on whitespace and a running count per word is printed.
 */
public class unbindedwordcount {

    /**
     * Builds and runs the socket word-count job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = environment.socketTextStream("localhost", 6666);

        SingleOutputStreamOperator<Tuple2<String, Integer>> flapWord = streamSource
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        // split() yields "" for a blank line or a line with leading whitespace;
                        // an empty token is not a word and must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word via a key selector (rather than the positional keyBy(0)),
        // then sum the counts (field 1) and print each update.
        flapWord.keyBy(data -> data.f0).sum(1).print();

        // The pipeline above is only a definition; execute() actually runs it.
        environment.execute();
    }

}

从args中获取参数
使用flink提供的ParameterTool.fromArgs(args);
注意参数的写法:键和值之间用空格分隔,不带冒号,例如:--host localhost --port 6666

/**
 * Streaming word count over an unbounded socket source whose address is taken from the
 * command line, e.g. {@code --host localhost --port 6666}.
 *
 * <p>Each received line is split on whitespace and a running count per word is printed.
 */
public class unbindedwordcount {

    /**
     * Builds and runs the socket word-count job.
     *
     * @param args command-line arguments; must contain {@code --host <name>} and
     *             {@code --port <number>}
     * @throws Exception if a required argument is missing or the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        // Fail fast with a descriptive error when --host is absent, instead of handing a
        // null host name to the socket source.
        String host = fromArgs.getRequired("host");
        int port = fromArgs.getInt("port");
        DataStreamSource<String> streamSource = environment.socketTextStream(host, port);

        SingleOutputStreamOperator<Tuple2<String, Integer>> flapWord = streamSource
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        // split() yields "" for a blank line or a line with leading whitespace;
                        // an empty token is not a word and must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word via a key selector (rather than the positional keyBy(0)),
        // then sum the counts (field 1) and print each update.
        flapWord.keyBy(data -> data.f0).sum(1).print();

        // The pipeline above is only a definition; execute() actually runs it.
        environment.execute();
    }

}
上一篇 下一篇

猜你喜欢

热点阅读