工作生活

Flink的分布式缓存

2019-07-03  本文已影响0人  JasonLee实时计算

Flink提供了一个分布式缓存,类似于Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。
缓存的工作原理如下。程序在其作为缓存文件的特定名称下注册本地或远程文件系统(如HDFS或S3)的文件或目录ExecutionEnvironment。执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。用户函数可以查找指定名称下的文件或目录,并从worker的本地文件系统访问它。
其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件.
分布式缓存使用如下:
注册中的文件或目录ExecutionEnvironment。

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

访问用户函数中的缓存文件或目录(此处为a MapFunction)。该函数必须扩展RichFunction类,因为它需要访问RuntimeContext。


// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

下面给出一个完整的demo,计算存在于缓存文件中的单词出现的次数,看下面的代码


package flink.cache

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source
import org.apache.flink.api.scala._

object FlinkCacheDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    // 注册缓存的文件,里面有数据hello jason
    env.registerCachedFile("D:/test.txt", "testfile")
    val stream = env.fromElements("hello", "jason", "hello", "jim")
    val result = stream
      .flatMap(_.split(","))
      .map(new RichMapFunction[String, String] {
        var list: List[(String)] = _
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          // 获取缓存的数据
          val file = getRuntimeContext.getDistributedCache.getFile("testfile")
          val lines = Source.fromFile(file.getAbsoluteFile).getLines()
          list = lines.toList
        }
        override def map(value: String): String = {
          var middle: String = ""
          if(list(0).contains(value)) {
            middle = value
          }
          middle
        }
      })
      .map((_,1L))
      .filter(_._1.nonEmpty)
      .groupBy(0)
      .sum(1)
      .print()
  }
}

运行代码输出的结果是,因为jim不在缓存的文件中,被过滤掉了
(hello,2)
(jason,1)

如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,更多的Flink和spark的干货可以加入下面的星球


image.png
上一篇下一篇

猜你喜欢

热点阅读