
Akka(22): Stream: Real-Time Control: Dynamic Pipeline Connections with MergeHub, BroadcastHub and PartitionHub


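In practice we often run into scenarios like these: there is a fixed data Source and we want to attach any number of downstream subscribers depending on program state, or we need to push several data streams into one fixed Sink at runtime. Both call for junctions, merging (Merge) or fanning out (Broadcast), that can be connected dynamically. According to the akka-stream documentation, complex one-to-many, many-to-one or many-to-many stream components must be designed with GraphDSL, producing a Graph. As mentioned in earlier posts, a Graph is a blueprint of a computation in which every step must be specified up front, so dynamic pipeline connections should not be possible. akka-stream nevertheless provides MergeHub, BroadcastHub and PartitionHub to support exactly this requirement.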

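1. MergeHub: many-to-one merge. Supports a dynamic set of upstream publishers.

2. BroadcastHub: one-to-many fan-out. Supports a dynamic set of downstream subscribers.

3. PartitionHub: also one-to-many fan-out, but a function selects the destination of each element.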

The MergeHub object has a source function:

 /**
   * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
   * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
   * arbitrary many times and each of the materializations will feed the elements into the original [[Source]].
   *
   * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own
   * [[Sink]] for feeding that materialization.
   *
   * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered
   * elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed
   * and any new producers using the [[Sink]] will be cancelled.
   *
   * @param perProducerBufferSize Buffer space used per producer. Default value is 16.
   */
  def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] =
    Source.fromGraph(new MergeHub[T](perProducerBufferSize))

MergeHub.source returns a Source[T, Sink[T, NotUsed]]. In essence a MergeHub is a shared Sink, as shown below:

  val fixedSink = Sink.foreach(println)
  val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] =
    MergeHub.source(perProducerBufferSize = 16).to(fixedSink)
  val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input

  //now connect any number of source
  val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

  val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

  val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  killSwitch2.shutdown()
  killSwitch3.shutdown()
  actorSys.terminate()
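To check the "shared Sink" behaviour without watching console output, here is a minimal sketch that collects what the hub receives. It assumes Akka 2.5.x (hence ActorMaterializer, as in the listings in this post); the object name MergeHubSketch and its run() helper are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MergeHubSketch {
  // Hypothetical helper: returns everything the hub collected, sorted.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("merge-hub-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // take(6) completes the consuming side once six elements have arrived.
      val (inGate, collected) =
        MergeHub.source[Int](perProducerBufferSize = 16)
          .take(6)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // Producers are attached after the hub is already running.
      Source(1 to 3).runWith(inGate)
      Source(101 to 103).runWith(inGate)

      // Interleaving between producers is not deterministic, so sort the result.
      Await.result(collected, 5.seconds).sorted
    } finally system.terminate()
  }
}
```

Calling MergeHubSketch.run() should return the six elements from both producers. Only the ordering within a single producer is preserved by the hub, which is why the sketch sorts before returning.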

Likewise, a BroadcastHub is a shared Source that any number of downstream subscribers can connect to. Here is the definition of BroadcastHub.sink:

  /**
   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
   * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
   * broadcast elements from the original [[Sink]].
   *
   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
   * [[Source]] for consuming the [[Sink]] of that materialization.
   *
   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
   * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later
   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
   * cancelled are simply removed from the dynamic set of consumers.
   *
   * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
   *                   concurrent consumers can be in terms of element. If this buffer is full, the producer
   *                   is backpressured. Must be a power of two and less than 4096.
   */
  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))

BroadcastHub.sink returns a Sink[T, Source[T, NotUsed]], that is, a shared Source that any number of downstream consumers can attach to:

  val killAll = KillSwitches.shared("terminator")
  val fixedSource=Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure)
  val sourceGraph = fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async
  val outPort = sourceGraph.run()  //shared source
  //now connect any number of sink to outPort
  outPort.to(Sink.foreach{c =>println(s"A: $c")}).run()
  outPort.to(Sink.foreach{c =>println(s"B: $c")}).run()
  outPort.to(Sink.foreach{c =>println(s"C: $c")}).run()
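A consumer that attaches late may miss elements that earlier consumers have already moved past. The sketch below, which assumes Akka 2.5.x, holds back the first element with Source.maybe until both consumers have had time to register, so that both see the same sequence. The object name BroadcastHubSketch, the run() helper and the 200 ms sleep are illustrative assumptions; the sleep is a crude way to wait for registration, not a guarantee.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BroadcastHubSketch {
  // Hypothetical helper: returns what each of the two consumers received.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-hub-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Source.maybe emits nothing until its promise is completed,
      // which lets us delay the first element.
      val (gate, outPort) =
        Source.maybe[Int]
          .concat(Source(2 to 10))
          .toMat(BroadcastHub.sink[Int](bufferSize = 8))(Keep.both)
          .run()

      // Attach two consumers to the shared Source.
      val a = outPort.runWith(Sink.seq)
      val b = outPort.runWith(Sink.seq)

      // Give both consumers time to register, then release element 1.
      Thread.sleep(200)
      gate.success(Some(1))

      (Await.result(a, 5.seconds), Await.result(b, 5.seconds))
    } finally system.terminate()
  }
}
```

If both consumers registered before the gate opened, each one should receive the elements 1 to 10 in order.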

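Another approach is to connect a MergeHub and a BroadcastHub back to back to form a many-to-many shape. In theory this works as a distribution hub that accepts any number of upstream publishers and downstream subscribers. We first connect the two to obtain a Sink and a Source: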

val (sink, source)  = MergeHub.source[Int](perProducerBufferSize = 16)
           .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run()

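In theory we can now connect to sink and source freely. There is one special case, though: while there are no downstream subscribers, none of the upstream producers can send any data. This is caused by backpressure: the combined node throttles upstream publishing whenever downstream cannot keep up. We can install a drain to keep upstream publishers flowing: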

  source.runWith(Sink.ignore)

This way upstream producers keep running normally even when there are no downstream subscribers.

Now we can use Flow.fromSinkAndSource(sink, source) to build a Flow[I,O,?]:

  def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
    fromSinkAndSourceMat(sink, source)(Keep.none)

We can also put the KillSwitches.singleBidi from the previous post to use:

 val channel: Flow[Int, Int, UniqueKillSwitch] =
    Flow.fromSinkAndSource(sink, source)
      .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right)
      .backpressureTimeout(3.seconds)

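The backpressureTimeout above ensures that a downstream subscriber that blocks for longer than the timeout is forcibly terminated: the stream is failed once the timeout elapses. Its definition: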

  /**
   * If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
   * the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically,
   * so the resolution of the check is one period (equals to timeout value).
   *
   * '''Emits when''' upstream emits an element
   *
   * '''Backpressures when''' downstream backpressures
   *
   * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
   *
   * '''Cancels when''' downstream cancels
   */
  def backpressureTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.BackpressureTimeout[Out](timeout))
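The effect of backpressureTimeout can be observed in isolation. The sketch below assumes Akka 2.5.x and simulates a slow downstream with mapAsync(1) plus akka.pattern.after, so no new demand is signalled for about a second after each element. The object name, the run() helper and the concrete durations are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.util.Failure

object BackpressureTimeoutSketch {
  // Hypothetical helper: true if the stream failed with a TimeoutException.
  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("backpressure-timeout-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val done = Source(1 to 3)
        // Fail if downstream does not ask for the next element within 200 ms.
        .backpressureTimeout(200.millis)
        // Slow consumer: each element takes one second, one at a time.
        .mapAsync(1)(n => after(1.second, system.scheduler)(Future.successful(n)))
        .runWith(Sink.ignore)

      // Wait for the stream to finish, then inspect how it ended.
      Await.ready(done, 5.seconds).value match {
        case Some(Failure(_: TimeoutException)) => true
        case _                                  => false
      }
    } finally system.terminate()
  }
}
```

With these durations the stream is expected to fail shortly after the first element is emitted, because the slow stage does not pull again in time.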

Now we can use channel as a Flow:

  val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run()
  val killChannel2 = Source.repeat(888)
        .delay(2.second,DelayOverflowStrategy.backpressure)
        .viaMat(channel)(Keep.right).to(fixedSink).run()

As mentioned above, PartitionHub is a special kind of BroadcastHub: it fans out, but uses a function to select the downstream subscriber. The signature of PartitionHub.sink shows this:

 def sink[T](partitioner: (Int, T) ⇒ Int, startAfterNrOfConsumers: Int,
              bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] =
    statefulSink(() ⇒ (info, elem) ⇒ info.consumerIdByIdx(partitioner(info.size, elem)), startAfterNrOfConsumers, bufferSize)

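As you can see, the partitioner is a typical state-transition style function. In fact, sink simply calls statefulSink, wrapping the stateless partitioner so that the index it returns is mapped to a consumer id: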

  /**
   * This `statefulSink` should be used when there is a need to keep mutable state in the partition function,
   * e.g. for implementing round-robin or sticky session kind of routing. If state is not needed the [[#sink]] can
   * be more convenient to use.
   *
   * @param partitioner Function that decides where to route an element. It is a factory of a function
   *   to be able to hold stateful variables that are unique for each materialization. The function
   *   takes two parameters; the first is information about active consumers, including an array of consumer
   *   identifiers and the second is the stream element. The function should return the selected consumer
   *   identifier for the given element. The function will never be called when there are no active consumers,
   *   i.e. there is always at least one element in the array of identifiers.
   * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
   *   This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
   *   been removed (canceled).
   * @param bufferSize Total number of elements that can be buffered. If this buffer is full, the producer
   *   is backpressured.
   */
  @ApiMayChange def statefulSink[T](partitioner: () ⇒ (ConsumerInfo, T) ⇒ Long, startAfterNrOfConsumers: Int,
                                    bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] =
    Sink.fromGraph(new PartitionHub[T](partitioner, startAfterNrOfConsumers, bufferSize))

As with BroadcastHub, we first build a shared producer, then connect it to a PartitionHub to form a channel to the downstream side that any subscriber can attach to:

  //interrupted termination
  val killAll = KillSwitches.shared("terminator")
  //fix a producer
  val fixedSource = Source.tick(1.second, 1.second, "message")
    .zipWith(Source(1 to 100))((a, b) => s"$a-$b")
  //connect to PartitionHub which uses function to select sink
  val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink(
    (size, elem) => math.abs(elem.hashCode) % size,
    startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
  //materialize the source
  val fromSource = sourceGraph.run()
  //connect to fixedSource freely
  fromSource.runForeach(msg => println("subs1: " + msg))
  fromSource.runForeach(msg => println("subs2: " + msg))
  
  scala.io.StdIn.readLine()
  killAll.shutdown()
  actorSys.terminate()
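The hash-based partitioner can be examined without running a stream. The sketch below is plain Scala with no Akka dependency; it reuses the same function shape, (consumer count, element) => consumer index, and the helper route is a made-up name that groups elements by the index the partitioner would choose.

```scala
object HashPartitionerSketch {
  // Same shape as the function passed to PartitionHub.sink:
  // (number of consumers, element) => index of the chosen consumer.
  val partitioner: (Int, String) => Int =
    (size, elem) => math.abs(elem.hashCode) % size

  // Hypothetical helper: groups elements by the index the partitioner picks.
  def route(elems: Seq[String], consumers: Int): Map[Int, Seq[String]] =
    elems.groupBy(elem => partitioner(consumers, elem))
}
```

For example, HashPartitionerSketch.route(Seq("a", "b", "c", "d"), 2) sends "a" and "c" to index 1 and "b" and "d" to index 0, since the hash code of a one-character string is its character code. The same element always lands on the same index as long as the number of consumers does not change.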

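As you can see, the partitioner function decides which of the downstream subscribers each upstream element goes to. In that sense a PartitionHub is also a kind of Router. The following example implements a Router-style RoundRobin strategy: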

  //partitioner function
  def roundRobin(): (PartitionHub.ConsumerInfo, String) ⇒ Long = {
    var i = -1L

    (info, elem) => {
      i += 1
      info.consumerIdByIdx((i % info.size).toInt)
    }
  }
  val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink(
    () => roundRobin(),startAfterNrOfConsumers = 2,bufferSize = 256)
  )(Keep.right)
  val roundRobinSource = roundRobinGraph.run()

  roundRobinSource.runForeach(msg => println("roundRobin1: " + msg))
  roundRobinSource.runForeach(msg => println("roundRobin2: " + msg))

In the example above the routing of elements is determined by the roundRobin function.
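To see the alternation concretely, the following sketch collects what each consumer receives instead of printing it. It assumes Akka 2.5.x; the object name, the run() helper and the element range are illustrative assumptions. The source emits more elements (10) than bufferSize (8) so that it cannot complete before both consumers have attached.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{PartitionHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object RoundRobinSketch {
  // Same stateful partitioner as above, specialised to Int elements.
  def roundRobin(): (PartitionHub.ConsumerInfo, Int) => Long = {
    var i = -1L
    (info, _) => {
      i += 1
      info.consumerIdByIdx((i % info.size).toInt)
    }
  }

  // Hypothetical helper: returns the two sequences seen by the two consumers.
  def run(): Set[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("round-robin-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val hub = Source(1 to 10).runWith(
        PartitionHub.statefulSink[Int](
          () => roundRobin(), startAfterNrOfConsumers = 2, bufferSize = 8))

      // Routing starts only after both consumers are connected.
      val first = hub.runWith(Sink.seq)
      val second = hub.runWith(Sink.seq)

      // Which consumer gets index 0 is not fixed, so compare as a set.
      Set(Await.result(first, 5.seconds), Await.result(second, 5.seconds))
    } finally system.terminate()
  }
}
```

One consumer should end up with the odd numbers and the other with the even numbers, each in order.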

In the next example, elements flow to the fastest subscriber:

  val producer = Source(0 until 100)

  // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
  // Note that this is a moving target since the elements are consumed concurrently.
  val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  producer.via(killAll.flow).toMat(PartitionHub.statefulSink(
    () => (info, elem) ⇒ info.consumerIds.minBy(id ⇒ info.queueSize(id)),
    startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)

  val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

  fromProducer.runForeach(msg => println("fast1: " + msg))
  fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
    .runForeach(msg => println("fast2: " + msg))

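In this example the partitioner picks the subscriber based on the number of elements buffered for each downstream consumer (queueSize); the larger the queueSize, the slower the consumer.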

Below is the source code for the MergeHub and BroadcastHub demonstrations above:

import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream._
import akka.actor._

import scala.concurrent.duration._
object HubsDemo extends App {
  implicit val actorSys = ActorSystem("sys")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSys)
      .withInputBuffer(16,16)
  )

  val fixedSink = Sink.foreach(println)
  val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink).async
  val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input

  //now connect any number of source
  val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure)
      .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

  val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

  val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()


  val killAll = KillSwitches.shared("terminator")
  val fixedSource=Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure)
  val sourceGraph = fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async
  val outPort = sourceGraph.run()  //shared source
  //now connect any number of sink to outPort
  outPort.to(Sink.foreach{c =>println(s"A: $c")}).run()
  outPort.to(Sink.foreach{c =>println(s"B: $c")}).run()
  outPort.to(Sink.foreach{c =>println(s"C: $c")}).run()


  val (sink, source)  = MergeHub.source[Int](perProducerBufferSize = 16)
           .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run()

  source.runWith(Sink.ignore)

  val channel: Flow[Int, Int, UniqueKillSwitch] =
    Flow.fromSinkAndSource(sink, source)
      .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right)
      .backpressureTimeout(3.seconds)

  val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run()
  val killChannel2 = Source.repeat(888)
        .delay(2.second,DelayOverflowStrategy.backpressure)
        .viaMat(channel)(Keep.right).to(fixedSink).run()


  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  killSwitch2.shutdown()
  killSwitch3.shutdown()
  killAll.shutdown()
  killChannel1.shutdown()
  killChannel2.shutdown()
  scala.io.StdIn.readLine()
  actorSys.terminate()


}

Below is the source code for the PartitionHub demonstration:

import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream._
import akka.actor._

import scala.concurrent.duration._
object PartitionHubDemo extends App {
  implicit val actorSys = ActorSystem("sys")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSys)
      .withInputBuffer(16,16)
  )

  //interrupted termination
  val killAll = KillSwitches.shared("terminator")
  //fix a producer
  val fixedSource = Source.tick(1.second, 1.second, "message")
    .zipWith(Source(1 to 100))((a, b) => s"$a-$b")
  //connect to PartitionHub which uses function to select sink
  val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink(
    (size, elem) => math.abs(elem.hashCode) % size,
    startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
  //materialize the source
  val fromSource = sourceGraph.run()
  //connect to fixedSource freely
  fromSource.runForeach(msg => println("subs1: " + msg))
  fromSource.runForeach(msg => println("subs2: " + msg))

  //partitioner function
  def roundRobin(): (PartitionHub.ConsumerInfo, String) ⇒ Long = {
    var i = -1L

    (info, elem) => {
      i += 1
      info.consumerIdByIdx((i % info.size).toInt)
    }
  }
  val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink(
    () => roundRobin(),startAfterNrOfConsumers = 2,bufferSize = 256)
  )(Keep.right)
  val roundRobinSource = roundRobinGraph.run()

  roundRobinSource.runForeach(msg => println("roundRobin1: " + msg))
  roundRobinSource.runForeach(msg => println("roundRobin2: " + msg))


  val producer = Source(0 until 100)

  // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
  // Note that this is a moving target since the elements are consumed concurrently.
  val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  producer.via(killAll.flow).toMat(PartitionHub.statefulSink(
    () => (info, elem) ⇒ info.consumerIds.minBy(id ⇒ info.queueSize(id)),
    startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)

  val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

  fromProducer.runForeach(msg => println("fast1: " + msg))
  fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
    .runForeach(msg => println("fast2: " + msg))


  scala.io.StdIn.readLine()
  killAll.shutdown()
  actorSys.terminate()


}
