flink之dataset连接操作
2022-05-11 本文已影响0人
万州客
简单,输出都不用看~
一,代码
package org.bbk.flink
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer
object Demo {
def main(args:Array[String]):Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zs"))
data1.append((2, "ls"))
data1.append((3, "ww"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((3, "guangzhou"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
println("=================leftOuterJoin=======================")
text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second) =>{
if (second == null) {
(first._1,first._2, "null")
} else {
(first._1, first._2, second._2)
}
}).print()
println("=================rightOuterJoin=======================")
text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first,second) =>{
if (second == null) {
(second._1, "null", second._2)
} else {
(first._1, first._2, second._2)
}
}).print()
println("=================fullOuterJoin=======================")
text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first,second) =>{
if (first == null) {
(second._1, "null",second._2)
} else if(second == null) {
(first._1,first._2, "null")
} else {
(first._1,first._2, second._2)
}
}).print()
}
}