设计匠艺

运用Aggregator模式实现MapReduce

2016-09-27  本文已影响652人  _张逸_

MapReduce是更好地利用并行计算资源来提升数据处理能力的重要算法,如今已被主流的大数据分析平台实现,成为了大数据批量处理的主力军。利用前面介绍的Actor特性,其实我们也可以实现一个简易的MapReduce。

利用AKKA Actor来实现MapReduce,天生就支持并行计算(利用远程Actor)与异步操作。为了简便起见,本例使用了本地的Actor实现了大数据世界的Hello World,即WordCounter。

在编写字数统计器的MapReduce之前,我们需要先分辨职责,包括:

考虑到本文的中心主题是介绍响应式编程与Actor模型,所以我们降低了案例难度,读取的网页内容均为英文,并简单地以空格作为分词的标志。由于我们需要接受客户端的字数统计分析请求,那么要完成前面提到的职责,至少需要四个Actor:

为了尽可能地提升性能,对于获取网页内容以及统计内容字数的统计工作,我们都需要多个Actor同时执行。然而,由于每个Actor处理消息都是以异步形式进行,我们该怎样才能知道并发处理的请求都得到了处理?针对字数统计器的案例而言,我们还需要将每个Actor统计获得的字数再进行reduce,同样也需要知道是否每条消息都已经处理完毕,并获得处理的结果。

AKKA通过Aggregator特性实现了Aggregator模式,可以很好地解决刚才提到的问题。它通过引入一个单独的聚合器Actor,用以聚合多个Actor产生的数据,并根据这些Actor对消息的Response更新状态

假定ContentWordCounter分析后的结果如下代码所示:

case class AnalysisResult(wordToCount: Seq[(String, Long)])

那么,Aggregator就可以通过在其内部维持一个分析结果集(即前面所谓的状态,代码中的analysisResults),每收到一个Actor的Response,就将结果塞入到这个结果集(更新状态)中,并判断结果集的长度是否等于要处理的网页数,以此作为消息是否处理完毕的条件。整个Aggregator的实现如下:

class WordCounterAggregator extends Actor with Aggregator {  expectOnce {
    case StartAggregation(target, urls) =>
      new Handler(target, urls, sender)
    case _ =>
      sender ! BadCommand
      context stop self
  }
  class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {
    var analysisResults = Set.empty[AnalysisResult]
    context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)
    expect {
      case Timeout =>
        respondIfDone(respondAnyway = true)
    }
    urls.foreach { uri =>
      target ! FetchPageContent(uri)
      expectOnce {
        case result: AnalysisResult =>
          analysisResults += result
          respondIfDone()
      }
    }
    def respondIfDone(respondAnyway: Boolean = false) = {
      import MapSeqImplicits._
      if (respondAnyway || analysisResults.size == urls.size) {
        val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)
        originalSender ! AggregatedAnalysisResult(wordToCounts)
        context stop self
      }
    }
  }
}

WordCounterAggregator继承了Aggregator特性,这个特性已经对Actor的receive进行了处理,使得继承该特性的Actor不需要重写receive方法。Aggregator特性提供了expectexpectOnceunexpect,用以接收期待处理的消息。

在Aggregator内部,其实维持了一个expectList,用以存放expect等函数所接收的偏函数。expectexpectOnce都是将偏函数放入到这个列表中,只是后者只留存一次(通过permanent标志来判定),一旦匹配了,就会将该偏函数移除,而expect则不会;至于unexpect,就是expect的反操作,用于将偏函数从列表中移除。

自定义的respondIfDone方法会在满足聚合条件时,对分析结果进行reduce运算。Scala的集合库自身并没有提供reduceByKey()函数,是我模仿Spark的RDD自行编写的隐式转换方法:

object MapSeqImplicits {
  implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {
    def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {
      wordToCount.groupBy(_._1).map {
        case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))      
      }.toSeq
    }
 }
}

因为引入了一个Aggregator,消息的处理以及Actor之间的协作就变得相对复杂。要进行响应式编程,其中一个关键就是要理清楚数据(或消息)的流动方向,并分辨每个数据处理器的职责。我们可以借助类似状态图之类的可视化工具帮助我们分析数据流动模型。下图是本例的一个消息处理模型,它同时还表达了Actor之间的协作关系。

Actor之间的协作

执行字数统计的流程如下所示:

由于Aggregator需要协调多个Fetcher与Counter的Actor,以支持异步并行计算(本例实则是并发计算)的需要,我为其引入了AKKA提供的Router Actor。通过Router可以创建一个容器Actor,内部管理多个worker rootees,并提供了RoundRobin、Random、Boardcast等多种路由形式,用户可以根据Actor的负载情况选择不同的路由方式。

这里,我选择使用RoundRobin以硬编码的形式创建了Router Actor:

val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)), "PageContentAnalyst") 
val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")

整体来看,PageContentFetcher与ContentWordCounter其实扮演的是map角色,并通过Router Actor来实现map工作的异步并发处理;而WordCounterAggregator则扮演了reduce角色,它负责将收到的多个分析结果进行reduce运算。

由于缺乏对MapReduce算法必要的封装,用AKKA Actor实现的MapReduce显得比较复杂,但却较好地体现了响应式编程的异步数据流本质。

当我们在使用Actor来处理异步消息传递时,当业务渐趋复杂后,我们常常会迷失在复杂的消息传递网中而无法自拔。为了保持清醒的头脑,需要时刻谨记Actor的职责。以我的经验,我们应该考虑:

要完成多个网页的字数统计功能,除了使用稍显复杂的Actor模式之外,我们也可以直接使用scala提供的并行集合来完成,代码更为精简:

val words = for {
 url <- urls.par
 line <- scala.io.Source.fromURL(url).getLines()
 word <- line.split(" ")
} yield (word)
val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)

在业务相对简单,并不需要非阻塞消息处理,也没有可伸缩性需求的时候,若能恰当运用scala自身提供的par集合会是好的选择。

事实上,为了实现字数统计的功能,采用AKKA提供的Aggregator确乎有些过度。它更擅长于通过将职责分治与合理运用基于消息的Actor模式来完成更为复杂的响应式系统。WordCounter的例子不外乎是我为了更好地解释Aggregator模式而给出的一个Demo罢了。


本文以及《利用Actor实现管道过滤器模式》两篇文章给出的源代码,可以在我的github上获得:https://github.com/agiledon/reactiveprogramming.git

上一篇下一篇

猜你喜欢

热点阅读