Akka sharding示例

2018-05-08  本文已影响0人  mango_knight

这个示例来自Lightbend的 Lightbend Project Starter里的Cluster Sharding Scala。
是一个简单的分片集群用做记录温度的例子:

Device是Entity/Actor在每个分片上运行多个,用来记录温度并计算平均值。

Device接收消息时默认(初始)情况下执行 counting(Nil)创建一个空的List。

通过模式匹配RecordTemperature(id,temp)类型消息

消息在Device的伴生对象中定义,可以在外部调用Device.RecordTemperature()

将温度存入List 并改变自己的下一步行为 become(counting(temperature))
这样每当该actor收到消息就会递归地增加List

become()unbecome()方法是将运算行为压入/弹出行为栈。第一次become(counting(temperature))是将在栈顶端的初始receive方法counting(Nil)换成counting(temperature)
因为counting()是PartialFunction偏函数,偏函数也是对象,也有它的状态。temperatures可以保存起来。

    import akka.actor._

    object Device {
      case class RecordTemperature(deviceId: Int, temperature: Double)
    }
    class Device extends Actor with ActorLogging {
      import Device._

      override def receive = counting(Nil)

      def counting(values: List[Double]): Receive = {
        case RecordTemperature(id, temp) =>
          val temperatures = temp :: values
          log.info(s"Recording temperature $temp for device $id, 
    average is ${temperatures.sum / temperatures.size} after ${temperatures.size} readings");
          context.become(counting(temperatures))
      }
    }

Devices

    object Devices {
      // Update a random device
      case object UpdateDevice
    }

    class Devices extends Actor with ActorLogging {
      import Devices._

      private val extractEntityId: ShardRegion.ExtractEntityId = {
        case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
      }

      private val numberOfShards = 100

      private val extractShardId: ShardRegion.ExtractShardId = {
        case Device.RecordTemperature(id, _) => (id % numberOfShards).toString
        // Needed if you want to use 'remember entities':
        //case ShardRegion.StartEntity(id) => (id.toLong % numberOfShards).toString
      }

      val deviceRegion: ActorRef = ClusterSharding(context.system).start(
          typeName = "Device",
          entityProps = Props[Device],
          settings = ClusterShardingSettings(context.system),
          extractEntityId = extractEntityId,
          extractShardId = extractShardId)

      val random = new Random()
      val numberOfDevices = 50

      implicit val ec: ExecutionContext = context.dispatcher
      context.system.scheduler.schedule(10.seconds, 1.second, self, UpdateDevice)

      def receive = {
        case UpdateDevice =>
          val deviceId = random.nextInt(numberOfDevices)
          val temperature = 5 + 30 * random.nextDouble()
          val msg = Device.RecordTemperature(deviceId, temperature)
          log.info(s"Sending $msg");
          deviceRegion ! msg
      }
    }

在idea的scala插件中导入:




使用sbt构建完成后运行sample.sharding.ShardingApp

上一篇 下一篇

猜你喜欢

热点阅读