Flink 之 DataSet 连接操作

2022-05-11  本文已影响0人  万州客

简单,输出都不用看~

一,代码

package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ListBuffer


/**
 * Small demo of the outer-join operators of the Flink DataSet API.
 *
 * Two in-memory data sets keyed by an integer id (id -> name and id -> city)
 * are joined on tuple field 0 with `leftOuterJoin`, `rightOuterJoin` and
 * `fullOuterJoin`. Each join emits `(id, name, city)`; when one side has no
 * match, the missing value is replaced by the literal string "null".
 */
object Demo {

  /**
   * Builds the two sample data sets, runs the three outer joins and prints
   * each result to standard output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Brings the Scala API implicits (type information for tuples) into scope.
    import org.apache.flink.api.scala._

    // id -> name
    val data1 = List[Tuple2[Int, String]](
      (1, "zs"),
      (2, "ls"),
      (3, "ww")
    )

    // id -> city
    val data2 = List[Tuple2[Int, String]](
      (1, "beijing"),
      (2, "shanghai"),
      (3, "guangzhou")
    )

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    println("=================leftOuterJoin=======================")
    // Left outer join: every left row is kept, so only `second` can be null.
    text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("=================rightOuterJoin=======================")
    // Right outer join: every right row is kept, so it is `first` (the left
    // side) that can be null. The id must then be taken from `second`.
    text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("=================fullOuterJoin=======================")
    // Full outer join: either side (but not both) can be null.
    text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }
}


上一篇下一篇

猜你喜欢

热点阅读