flink分布式缓存实现示例

2022-05-14  本文已影响0人  万州客

生产场景可能是用大数据的文件系统,本机学习,就使用本地文件吧。

一,代码

package org.bbk.flink

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration


object Demo {
  /**
   * Demonstrates Flink's distributed cache with the batch (DataSet) API.
   *
   * A local file is registered under a logical name; each parallel subtask
   * retrieves it in `open()` before any records are processed. In production
   * the registered path would typically live on a shared filesystem (HDFS/S3)
   * so every TaskManager can fetch it.
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Register the file under the logical name "advert"; subtasks look it
    // up by this name via the runtime context's distributed cache.
    env.registerCachedFile("D:\\tmp\\count.txt", "advert")
    val data = env.fromElements("hello", "flink", "spark", "dataset")
    val result = data.map(new RichMapFunction[String, String] {
      // open() runs once per parallel task instance before any map() call,
      // so with setParallelism(2) the cached file is read twice in total.
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val myFile = getRuntimeContext.getDistributedCache.getFile("advert")
        // Fix: the charset-less FileUtils.readLines overload is deprecated
        // and depends on the JVM's default encoding — pin it to UTF-8.
        val lines = FileUtils.readLines(myFile, "UTF-8")
        val it = lines.iterator()
        while (it.hasNext) {
          val line = it.next()
          println("line: " + line)
        }
      }
      // Identity mapping: this example only exercises the cache in open().
      override def map(value: String): String = value
    }).setParallelism(2)
    // Fix: in the DataSet API, print() is an eager sink that triggers job
    // execution itself. Calling env.execute() afterwards would fail with
    // "No new data sinks have been defined since the last execution".
    result.print()
  }
}

二,样例数据

三,输出

上一篇 下一篇

猜你喜欢

热点阅读