Flink之Connect和CoMap

2022-04-11  本文已影响0人  万州客

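connect 和 CoMap 一般是成对使用的:先用 connect 把两条元素类型可以不同的流连接成一个 ConnectedStreams,再用 CoMapFunction 的 map1 / map2 分别处理来自第一条流和第二条流的元素,最终输出同一种类型的流。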

代码:

package com.intsmaze.flink.streaming.helloworld;

import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.co.CoMapFunction;


import java.util.ArrayList;
import java.util.List;

public class WorldCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Long> listLong = new ArrayList<Long>();
        listLong.add(1L);
        listLong.add(2L);
        List<String> listStr = new ArrayList<String>();
        listStr.add("www cnblogs com sky");
        listStr.add("hello sky");
        listStr.add("hello flink");
        listStr.add("hello java");
        DataStream<Long> longDataStream = env.fromCollection(listLong);
        DataStream<String> stringDataStream = env.fromCollection(listStr);

        ConnectedStreams<Long, String> connectedStreams = longDataStream.connect(stringDataStream);

        DataStream<String> connectedMap = connectedStreams.map(new CoMapFunction<Long, String, String>() {
            public String map1(Long aLong) throws Exception {
                return "数据来自元素类型为Long的流: " + aLong;
            }

            public String map2(String s) throws Exception {
                return "数据来自元素类型为String的流: " + s;
            }
        });

        connectedMap.print("CoMapFunction输出结果: ");

        env.execute("CoMapFunction Template.");
    }
}

输出:

D:\jdk1.8.0_271\bin\java.exe "-javaagent:D:\JetBrains\IntelliJ IDEA Community Edition 2020.2.3\lib\idea_rt.jar=64082:D:\JetBrains\IntelliJ IDEA Community Edition 2020.2.3\bin" -Dfile.encoding=UTF-8 -classpath D:\jdk1.8.0_271\jre\lib\charsets.jar;D:\jdk1.8.0_271\jre\lib\deploy.jar;D:\jdk1.8.0_271\jre\lib\ext\access-bridge-64.jar;D:\jdk1.8.0_271\jre\lib\ext\cldrdata.jar;D:\jdk1.8.0_271\jre\lib\ext\dnsns.jar;D:\jdk1.8.0_271\jre\lib\ext\jaccess.jar;D:\jdk1.8.0_271\jre\lib\ext\jfxrt.jar;D:\jdk1.8.0_271\jre\lib\ext\localedata.jar;D:\jdk1.8.0_271\jre\lib\ext\nashorn.jar;D:\jdk1.8.0_271\jre\lib\ext\sunec.jar;D:\jdk1.8.0_271\jre\lib\ext\sunjce_provider.jar;D:\jdk1.8.0_271\jre\lib\ext\sunmscapi.jar;D:\jdk1.8.0_271\jre\lib\ext\sunpkcs11.jar;D:\jdk1.8.0_271\jre\lib\ext\zipfs.jar;D:\jdk1.8.0_271\jre\lib\javaws.jar;D:\jdk1.8.0_271\jre\lib\jce.jar;D:\jdk1.8.0_271\jre\lib\jfr.jar;D:\jdk1.8.0_271\jre\lib\jfxswt.jar;D:\jdk1.8.0_271\jre\lib\jsse.jar;D:\jdk1.8.0_271\jre\lib\management-agent.jar;D:\jdk1.8.0_271\jre\lib\plugin.jar;D:\jdk1.8.0_271\jre\lib\resources.jar;D:\jdk1.8.0_271\jre\lib\rt.jar;D:\Code\helloworld\target\classes;C:\Users\ccc\.m2\repository\org\apache\flink\flink-java\1.9.2\flink-java-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-core\1.9.2\flink-core-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-annotations\1.9.2\flink-annotations-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-metrics-core\1.9.2\flink-metrics-core-1.9.2.jar;C:\Users\ccc\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\ccc\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\ccc\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\ccc\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-asm-6\6.2.1-7.0
\flink-shaded-asm-6-6.2.1-7.0.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\ccc\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\ccc\.m2\repository\org\apache\flink\force-shading\1.9.2\force-shading-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-streaming-java_2.12\1.9.2\flink-streaming-java_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-runtime_2.12\1.9.2\flink-runtime_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.9.2\flink-queryable-state-client-java-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-hadoop-fs\1.9.2\flink-hadoop-fs-1.9.2.jar;C:\Users\ccc\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.32.Final-7.0\flink-shaded-netty-4.1.32.Final-7.0.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-jackson\2.10.1-9.0\flink-shaded-jackson-2.10.1-9.0.jar;C:\Users\ccc\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\ccc\.m2\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;C:\Users\ccc\.m2\repository\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:\Users\ccc\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\ccc\.m2\repository\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;C:\Users\ccc\.m2\repository\org\scala-lang\
modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;C:\Users\ccc\.m2\repository\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;C:\Users\ccc\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\ccc\.m2\repository\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;C:\Users\ccc\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-clients_2.12\1.9.2\flink-clients_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-optimizer_2.12\1.9.2\flink-optimizer_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-7.0\flink-shaded-guava-18.0-7.0.jar;C:\Users\ccc\.m2\repository\org\slf4j\slf4j-api\1.7.21\slf4j-api-1.7.21.jar;C:\Users\ccc\.m2\repository\org\slf4j\slf4j-log4j12\1.7.21\slf4j-log4j12-1.7.21.jar;C:\Users\ccc\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar com.intsmaze.flink.streaming.helloworld.WorldCount
CoMapFunction输出结果: :8> 数据来自元素类型为String的流: hello flink
CoMapFunction输出结果: :6> 数据来自元素类型为String的流: www cnblogs com sky
CoMapFunction输出结果: :9> 数据来自元素类型为String的流: hello java
CoMapFunction输出结果: :12> 数据来自元素类型为Long的流: 1
CoMapFunction输出结果: :1> 数据来自元素类型为Long的流: 2
CoMapFunction输出结果: :7> 数据来自元素类型为String的流: hello sky

Process finished with exit code 0
上一篇下一篇

猜你喜欢

热点阅读