Scala Actor实现异步WordCount

2020-09-06  本文已影响0人  FredricZhu

ActorWordCountDemo.scala

import java.io.File

import com.sensetime.{ReceiveWord, SendFile, StopMessage}

import scala.actors.Actor
import scala.collection.mutable.ListBuffer
import scala.io.Source

object CountActor extends Actor {

  override def act(): Unit = {
    // 计算线程,注意需要使用while循环阻塞接收消息
    while(true) {
      receive {
            // 收到解析请求
        case SendFile(file) => {
          // 获取文件并做单个文件的WordCount
          val lines = Source.fromFile(file, "utf-8").getLines().toList
          val words = lines.flatMap(_.split(" "))
          val result = words.map((_, 1)).groupBy(_._1).mapValues(_.size)
          // 返回WordCount结果给WordActor线程
          sender ! ReceiveWord(result)
        }
          // 收到停止消息,退出线程
        case StopMessage => {
          println("Parse Stopped")
          return
        }
      }
    }
  }

}


class WordActor extends Actor {
  val countActor = CountActor

  var results: ListBuffer[Map[String, Int]] = ListBuffer[Map[String, Int]]()
  var files: List[File] = _

  def this(dirName: String) {
    this()
    val dir = new File(dirName)
    // 列出目录下所有文件
    files = dir.listFiles.filter(_.isFile).toList
  }

  override def act(): Unit = {
    // 发送各个文件给countActor进行处理
    for(file <- files) {
      // 发送各个文件
      countActor ! SendFile(file)
    }

    // 获取文件长度
    val len = files.length
    var i = 0
    // 逐个文件进行处理
    while(i<=len) {
      // 若处理完毕了
      if(i==len) {
        // 发送停止消息,等待计算线程停止
        countActor ! StopMessage
        // 计算最终结果并打印
        val stringToInt = results.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))
        println(s"最终结果: $stringToInt")
        // 退出程序
        return
      }

      // 接收单个计算结果,并自增计数
      receive {
        case ReceiveWord(result) => {
          i +=1
          results += result
        }
      }
    }

  }
}

object WordActor  {
  def apply(dirName: String): WordActor = new WordActor(dirName)
}

object ActorWordCountDemo {
  def main(args: Array[String]): Unit = {
    WordActor("D:/wordcount").start()
    CountActor.start()
  }
}

Messages.scala

package com.sensetime

import java.io.File

case class SendFile(file: File)

case class ReceiveWord(result: Map[String, Int])

case class StopMessage

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>actor-word-count</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-actors</artifactId>
            <version>2.10.5</version>
        </dependency>
    </dependencies>
</project>

程序输出如下,


图片.png
上一篇 下一篇

猜你喜欢

热点阅读