1. 程式人生 > >Akka(21): Stream:實時操控:人為中斷-KillSwitch

Akka(21): Stream:實時操控:人為中斷-KillSwitch

name hat 程序 註意 cond webkit val cal tdi

akka-stream是多線程non-blocking模式的,一般來說,運算任務提交到另外線程後這個線程就會在當前程序控制之外自由運行了。任何時候如果需要終止運行中的數據流就必須采用一種任務柄(handler)方式來控制在其它線程內運行的任務。這個handler可以在提交運算任務時獲取。akka-stream提供了KillSwitch trait來支持這項功能:

/**
 * A [[KillSwitch]] allows completion of [[Graph]]s from the outside by completing [[Graph]]s of [[FlowShape]] linked
 * to the switch. Depending on whether the [[KillSwitch]] is a [[UniqueKillSwitch]] or a [[SharedKillSwitch]] one or
 * multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of
 * this interface.
 
*/ //#kill-switch trait KillSwitch { /** * After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally. */ def shutdown(): Unit /** * After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed. */ def abort(ex: Throwable): Unit }
//#kill-switch

可以想象:我們必須把這個KillSwitch放在一個流圖中間,所以它是一種FlowShape的,這可以從KillSwitch的構建器代碼裏可以看得到:

object KillSwitches {

  /**
   * Creates a new [[SharedKillSwitch]] with the given name that can be used to control the completion of multiple
   * streams from the outside simultaneously.
   *
   * @see SharedKillSwitch
   
*/ def shared(name: String): SharedKillSwitch = new SharedKillSwitch(name) /** * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion * of that unique materialization. Different materializations result in different, independent switches. * * For a Bidi version see [[KillSwitch#singleBidi]] */ def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch] = UniqueKillSwitchStage.asInstanceOf[Graph[FlowShape[T, T], UniqueKillSwitch]] /** * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion * of that unique materialization. Different materializations result in different, independent switches. * * For a Flow version see [[KillSwitch#single]] */ def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch] = UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]] ...}

akka-stream提供了single,shared,singleBidi三種KillSwitch的構建方式,它們的形狀都是FlowShape。KillSwitches.single返回結果類型是Graph[FlowShape[T,T],UniqueKillSwitch]。因為我們需要獲取這個KillSwitch的控制柄,所以要用viaMat來可運算化(materialize)這個Graph,然後後選擇右邊的類型UniqueKillSwitch。這個類型可以控制一個可運算化FlowShape的Graph,如下:

  val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure)
  val sink = Sink.foreach(println)
  val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run()

  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  println("terminated!")
  actorSys.terminate()

當然,也可以用異常方式中斷運行:

killSwitch.abort(new RuntimeException("boom!"))

source是一個不停頓每秒發出一個數字的數據源。如上所述:必須把KillSwitch放在source和sink中間形成數據流完整鏈狀。運算這個數據流時返回了handle killSwitch,我們可以使用這個killSwitch來shutdown或abort數據流運算。

KillSwitches.shared構建了一個SharedKillSwitch類型。這個類型可以被用來控制多個FlowShape Graph的終止運算。SharedKillSwitch類型裏的flow方法可以返回終止運算的控制柄handler:

 /**
   * Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
   * [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this
   * switch will be stopped normally or failed.
   *
   * @tparam T Type of the elements the Flow will forward
   * @return   A reusable [[Graph]] that is linked with the switch. The materialized value provided is this switch itself.
   */
  def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch] = _flow.asInstanceOf[Graph[FlowShape[T, T], SharedKillSwitch]]

用flow構建的SharedKillSwitch實例就像immutable對象,我們可以在多個數據流中插入SharedKillSwitch,然後用這一個共享的handler去終止使用了這個SharedKillSwitch的數據流運算。下面是SharedKillSwitch的使用示範:

  val sharedKillSwitch = KillSwitches.shared("multi-ks")
  val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure)

  source2.via(sharedKillSwitch.flow).to(sink).run()
  source.via(sharedKillSwitch.flow).to(sink).run()

  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  sharedKillSwitch.shutdown()

註意:我們先構建了一個SharedKillSwitch實例,然後在source2,source數據通道中間加入了這個實例。因為我們已經獲取了sharedKillSwitch控制柄,所以不必理會返回結果,直接用via和to來連接上下遊節點(默認為Keep.left)。

還有一個KillSwitches.singleBidi類型,這種KillSwitch是用來終止雙流向數據流運算的。我們將在下篇討論裏介紹。

下面是本次示範的源代碼:

import akka.stream.scaladsl._
import akka.stream._
import akka.actor._
import scala.concurrent.duration._
object KillSwitchDemo extends App {
  implicit val actorSys = ActorSystem("sys")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSys)
      .withInputBuffer(16,16)
  )

  val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure)
  val sink = Sink.foreach(println)
  val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run()

  val sharedKillSwitch = KillSwitches.shared("multi-ks")
  val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure)

  source2.via(sharedKillSwitch.flow).to(sink).run()
  source.via(sharedKillSwitch.flow).to(sink).run()


  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  sharedKillSwitch.shutdown()
  println("terminated!")
  actorSys.terminate()
  

}

Akka(21): Stream:實時操控:人為中斷-KillSwitch