Akka Stream之流中的错误处理
当流中的某个阶段失败时, 通常会导致整个流被拆掉。此时,每个阶段的下游得到关于失败通知和上游得到关于取消通知。
在许多情况下, 您可能希望避免完全的流失败, 这可以通过几种不同的方法完成:
-
recover
发出最终的元素, 然后在上游故障上正常完成流 -
recoverWithRetries
创建一个新的上游并从失败开始处理 - 在后退后重新启动流的部分
- 对支持监督策略的阶段使用监督策略
除了这些内置的用于错误处理的工具之外, 一个常见的模式是将流包装到一个actor中, 并让actor在失败时重新启动整个流。
Recover
recover
允许你注入一个最终元素,然后在上游失败时完成流。通过一个偏函数,决定哪些异常这样恢复。如果有个异常不匹配,流将失败。
如果您希望在失败时优雅地完成流, 而让下游知道出现了故障, 则recover
可能很有用。
Source(0 to 6).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")).recover {
case _: RuntimeException => "stream truncated"
}.runForeach(println)
则输出可能是:
0
1
2
3
4
stream truncated
recoverWithRetries
recoverWithRetries
允许你在失败的地方放入一个新的上游,在失败到指定的最大次数后恢复流。
通过一个偏函数,决定哪些异常这样恢复。如果有个异常不匹配,流将失败。
val planB = Source(List("five", "six", "seven", "eight"))
Source(0 to 10).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")).recoverWithRetries(attempts = 1, {
case _: RuntimeException => planB
}).runForeach(println)
输出将是
0
1
2
3
4
five
six
seven
eight
正如Akka为actor提供回退监督模式一样, Akka stream也提供了一个RestartSource
、RestartSink
和 RestartFlow
, 用于实施所谓指数回退监控策略, 在某个阶段失败时再次启动它, 每次重新启动的延迟时间越来越长。
当某个阶段因为外部资源是否可用而失败或完成时,而且需要一些时间重新启动,这种模式有用。一个主要的例子是当一个WebSocket连接因为HTTP服务器运行正在下降(可能因为超负荷)而失败时。通过使用指数回退,避免进行紧密的重新连接,这样既可以让HTTP服务器恢复一段时间,又避免在客户端使用不必要的资源。
以下代码段显示了如何使用akka.stream.scaladsl.RestartSource
创建一个回退监管,它将监督给定的Source。本例中,Source是一个服务器发送事件(SSE),由akka-http提供。如果此处流失败,将再次发送请求,以3,6,12,24和最终30秒的间隔增加(此处,由于 maxBackoff 参数,它将保持上限)。
val restartSource = RestartSource.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
) { () =>
// Create a source from a future of a source
Source.fromFutureSource {
// Make a single request with akka-http
Http().singleRequest(HttpRequest(
uri = "http://example.com/eventstream"))
// Unmarshall it as a source of server sent events
.flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
}
}
强烈建议使用 randomFactor 为回退间隔添加一点额外的方差, 以避免在完全相同的时间点重新启动多个流, 例如, 因为它们由于共享资源 (如相同的服务器下线,并在相同间隔后重启) 而停止。通过在重新启动间隔中增加额外的随机性, 这些流将在时间上稍有不同的点开始, 从而避免大量的通信量冲击恢复的服务器或他们都需要联系的其他资源。
上述 RestartSource 将永远不会终止, 除非Sink被送入取消。将它与 KillSwitch 结合使用通常会很方便, 以便在需要时可以终止它:
val killSwitch = restartSource
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
.run()
doSomethingElse()
killSwitch.shutdown()
Sink和flow也可以被监管,使用akka.stream.scaladsl.RestartSink
和akka.stream.scaladsl.RestartFlow
。RestartSink 在取消时重新启动, 而在输入端口取消、输出端口完成或输出端口发送错误时重新启动 RestartFlow。
监管策略
注意
支持监管策略的各个阶段都有明文规定, 如果一个阶段的文档中没有说明它遵守监管策略, 就意味着它失败, 而不是采用监管。
错误处理策略受actor监管策略的启发, 但语义已经适应了流处理的领域。最重要的区别是, 监管不是自动应用到流阶段, 而是每个阶段必须显式实现的东西。
在许多阶段, 实现对监管策略的支持可能甚至没有意义, 对于连接到外部技术的阶段尤其如此, 例如, 失败的连接如果立即尝试新连接, 可能仍然会失败。
对于实现监管的阶段, 在通过使用属性物化流时, 可以选择处理流元素的异常处理策略。
有三种方法可以处理应用程序代码中的异常:
- Stop - 流以失败完成。
- Resume - 元素被丢弃,流继续执行
- Restart - 元素被丢弃,且流在重启该阶段后继续执行。重新启动阶段意味着任何累积状态都被清除。 这通常通过创建阶段的新实例来执行。
默认情况下, 停止策略用于所有异常, 即在抛出异常时, 流将以失败完成。
implicit val materializer = ActorMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)
可以在materializer的设置中定义流的默认监管策略。
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
在这里你可以看到, 所有的 ArithmeticException 将恢复处理, 即导致除以零的元素被丢弃了。
注意
请注意, 丢弃元素可能会导致具有循环的图中出现死锁。
还可以为flow的所有操作定义监管策略。
implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.filter(100 / _ < 50).map(elem => 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)
val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
重新启动的工作方式与恢复类似,除了故障处理阶段的累加状态(如果有的话)将被重置。
implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
case _: IllegalArgumentException => Supervision.Restart
case _ => Supervision.Stop
}
val flow = Flow[Int]
.scan(0) { (acc, elem) =>
if (elem < 0) throw new IllegalArgumentException("negative not allowed")
else acc + elem
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.limit(1000).runWith(Sink.seq)
// 负数元素导致scan阶段重启
// 即再次从0开始
// 结果将是以Success(Vector(0, 1, 4, 0, 5, 12))完成的Future
来自mapAsync错误
流监管也可以应用于mapAsync
和mapAsyncUnordered
的future,即使这些错误发生于future而不是在阶段自身。
假设我们使用外部服务来查找电子邮件地址,我们希望丢弃那些无法找到的地址。
我们开始于推文的作者流:
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
假设我们可以使用以下方式查找其电子邮件地址:
def lookupEmail(handle: String): Future[String] =
当电子邮件没有找到时,Future
以Failure
完成。
通过使用lookupEmail
服务以及使用mapAsync
, 可以将作者流转换为电子邮件地址流, 并使用Supervision.resumingDecider
丢弃未知电子邮件地址:
import ActorAttributes.supervisionStrategy
import Supervision.resumingDecider
val emailAddresses: Source[String, NotUsed] =
authors.via(
Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.withAttributes(supervisionStrategy(resumingDecider)))
如果不使用Resume
而是默认的停止策略,那么流将在第一个带有Failure完成的Future时,以失败完成流。