squbsScala In Action

squbs-20. 流的生命周期

2017-02-03  本文已影响26人  吕亦行

原文地址:Streams Lifecycle
Akka Streams/Reactive流需要和服务的Runtime Lifecycle 集成。为了这个,一个自动化的或半自动话的集成通过 PerpetualStream实现。为了直接对流源的细粒度控制, LifecycleManaged提供一个包装,可以控制任何源组件的可能发生的停止或关闭,以便流可以优雅的启动/关闭。

永久流(PerpetualStream)

PerpetualStream是一个特性(triat),允许声明一个可以启动的流,当服务优雅的启动和关闭,不会在服务关闭的时候丢失消息。

使用 PerpetualStream的流将符合以下要求,将允许PerpetualStream 中的钩子通过最小自定义重写数无缝的工作:

  1. killSwitch.flow作为在source之后的流的第一个阶段。killSwitch 是一个标准的Akka SharedKillSwitch,通过PerpetualStream特性(trait)提供。
  2. stream实现 FutureProduct通过它们最后的元素。 ProductTuple, List和其他的超类。Sink物化Future是很自然的。如果更多的物化值需要,它通常来自某种形式的 ProductSink成为流最后的元素,也通常物化Product的最后一个元素。
  3. Future呈现流的完结(物化值或最后的物化值)。换句话说,流结束时future完成。

如果满足以上所有要求,没有其他自定义重写用于PerpetualStream函数。下面的代码符合PerpetualStream

class WellBehavedStream extends PerpetualStream[Future[Done]] {

  def generator = Iterator.iterate(0) { p => 
    if (p == Int.MaxValue) 0 else p + 1 
  }

  val source = Source.fromIterator(generator _)

  val ignoreSink = Sink.ignore
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(ignoreSink) {
    implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        source ~> killSwitch.flow[Int] ~> sink
        ClosedShape
  })
}

这就是。这个流行为良好,因为它物化 sink物化值,即Future[Done]

关闭重写

有时不可能定义良好的流。举个例子,Sink可能不会物化 Future或你需要在关闭时做更多的清理。因为这个原因,可以通过重写shutdown如下:

  override def shutdown(): Future[Done] = {
    // Do all your cleanup
    // For safety, call super
    super.shutdown()
    // The Future from super.shutdown may not mean anything.
    // Feel free to create your own future that identifies the
    // stream being done. Return your Future instead.
  }

shutdown需要使用如下:

  1. 初始化流的关闭
  2. 做其他清理
  3. 当流结束处理,返回future

注意:建议任何情况下调用 super.shutdown。调用是无害的或有其他副作用。

备用关闭机制

相比于使用killSwitchsource 可以提供一个更好方式来正确的关闭。在这种情况下,仅使用source的关闭机制和重写 shutdown来发起source的关闭。killSwitch 依然未使用。

Kill Switch 重写

如果killSwitch需要跨多流共享,你可以重写 killSwitch来反射共享实例

  override lazy val killSwitch = mySharedKillSwitch

接收消息并将其转发到流

一些流从actor消息中获取输入。虽然可能一些流配置可以物化source中的ActorRef,然而很难调用这个actor。因为PerpetualStream自身是个actor,他可以拥有一个公开的地址/路径并且转发消息至流source。这样做,我们需要重写receive 如下:

  override def receive = {
    case msg: MyStreamMessage =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }

处理流错误

PerpetualStream默认从错误中恢复不会被流stage捕获。这个消息引起忽略异常。如果需要一个不同的行为的话,重写 decider

  override def decider: Supervision.Decider = { t => 
    log.error("Uncaught error {} from stream", t)
    t.printStackTrace()
    Restart
  }

Restart将重启有错误的stage,而不会重启stream。查看Supervision Strategies获得可能的策略。

结合一下

下面的例子尽可能重写上面讨论的内容:

class MsgReceivingStream extends PerpetualStream[(ActorRef, Future[Done])] {

  val actorSource = Source.actorPublisher[MyStreamMsg](Props[MyPublisher])
  val ignoreSink = Sink.ignore[MyStreamMsg]
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(actorSource, ignoreSink)((_, _)) {
    implicit builder =>
      (source, sink) =>
        import GraphDSL.Implicits._
        source ~> sink
        ClosedShape
  })
  
  // Just forward the message to the stream source
  override def receive = {
    case msg: MyStreamMsg =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }
  
  override def shutdown() = {
    val (sourceActorRef, _) = matValue
    sourceActorRef ! cancelStream
    // Sink materialization conforms
    // so super.shutdown() will give the right future
    super.shutdown()
  }
}

制作一个Lifecycle-Sensitive source

如果你期望拥有一个source,完全链接squbs的生命周期的动作,你可以将source包裹 LifecycleManaged:

Scala

val inSource = <your-original-source>
val aggregatedSource = LifecycleManaged().source(inSource)

Java

final Source inSource = <your-original-source>
final Source aggregatedSource = new LifecycleManaged(system).source(inSource);

这个结果source将集成source实例化成一个(T, ActorRef)TinSource 的实例化类型, ActorRef 是 trigger actor的实例化类型(从Unicomplex接收事件,squbs的容器)

这个集成source直到生命周期状态变成Active才开始从源source中发出,并且在生命周期状态成为 Stopping之后停止发出元素和关闭流。

个性化集成Triggered Source

如果你想要你的flow启动/停用个性化的事件,你可以整合一个个性化的trigger source,元素true将会启用,元素false将会停用。

注意 Trigger有一个参数eagerComplete默认为false在scala中,而在JAVA中需要传递。如果eagerComplete设置为false,trigger source 的结束或终止将脱离这个触发。如果设置为true,这个终止会完成这个流。

Scala

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = <your-custom-trigger-source>.collect {
  case 0 => DISABLE
  case 1 => ENABLE
}

val aggregatedSource = new Trigger().source(inSource, trigger)

Java

import static org.squbs.stream.TriggerEvent.DISABLE;
import static org.squbs.stream.TriggerEvent.ENABLE;

final Source<?, ?> inSource = <your-original-source>;
final Source<?, ?> trigger = <your-custom-trigger-source>.collect(new PFBuilder<Integer, TriggerEvent>()
    .match(Integer.class, p -> p == 1, p -> ENABLE)
    .match(Integer.class, p -> p == 0, p -> DISABLE)
    .build());

final Source aggregatedSource = new Trigger(false).source(inSource, trigger);

个性化的生命周期事件触发

如果你想要在ActiveStopping之外响应更多生命周期,举个例子,你想要Failed来同时关闭flow,你可以修改生命周期事件映射。

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = Source.actorPublisher[LifecycleState](Props.create(classOf[UnicomplexActorPublisher]))
  .collect {
    case Active => ENABLE
    case Stopping | Failed => DISABLE
  }

val aggregatedSource = new Trigger().source(inSource, trigger)
上一篇下一篇

猜你喜欢

热点阅读