Akka(23): Stream: Custom stream components - Custom defined stream processing stages

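At the top level, akka-stream is built from three framework-level stream components: the data Source, intermediate Flow nodes, and the data Sink. Source and Sink are the two independent endpoints of a stream. The Flow between them may consist of several pipe-like nodes, each of which transforms or processes stream elements, and the order in which these nodes are connected represents the overall processing pipeline. A complete (runnable) stream must be closed: seen from the outside, one end must be connected to a Source and the other to a Sink. The simplest runnable stream is obtained by connecting a Sink directly to a Source: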

  Source(1 to 10).runWith(Sink.foreach(println))
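The closed-graph rule is easier to see in a slightly larger sketch. It assumes Akka 2.5.x with the ActorMaterializer API (the same API as this article's full source); `SimpleStreamDemo` and `runDemo` are hypothetical names. A Flow sits between the Source and the Sink, and `Keep.right` keeps the Sink's materialized `Future` so the result can be awaited.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SimpleStreamDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("simpleStream")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source(1 to 5)         // open end: one output port
      val double = Flow[Int].map(_ * 2)   // pipe: one input, one output
      val sink   = Sink.seq[Int]          // open end: one input port
      // Connecting both open ends yields a closed, runnable graph.
      // Keep.right keeps the Sink's materialized Future[Seq[Int]].
      val runnable = source.via(double).toMat(sink)(Keep.right)
      Await.result(runnable.run(), 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```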

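From another angle, akka-stream consists of two parts: the stream Graph and the Materializer. A Graph describes the computation as a blueprint; the Materializer prepares the environment and places the Graph into the actor system, where it actually runs, produces its effects and yields its results. Every akka-stream program therefore needs a Graph that describes its function and flow. A Graph can itself be composed of sub-graphs that represent smaller functions. A runnable stream must be represented by a closed graph, and that closed graph is in turn composed of sub-graphs that perform different data transformations. Customizing stream functionality means defining custom Graphs as needed.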
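To illustrate the split between blueprint and Materializer, here is a minimal sketch, again assuming Akka 2.5.x; `BlueprintDemo` is a hypothetical name. The same `RunnableGraph` value is run twice, and each `run()` produces an independent execution with its own result.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlueprintDemo {
  def runDemo(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("blueprint")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Building the graph only describes the computation; nothing runs yet.
      val blueprint: RunnableGraph[Future[Int]] =
        Source(1 to 4).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)
      // Each run() asks the Materializer for a fresh, independent execution.
      val first  = blueprint.run()
      val second = blueprint.run()
      (Await.result(first, 3.seconds), Await.result(second, 3.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```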
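A Graph can be described in two parts, its shape and its GraphStage. The shape (called GraphShape here) describes how many input and output ports the Graph has, and the GraphStage describes how data is transformed as it passes through. Let's first look at shapes; their base class is Shape: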

/**
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
 * philosophy that a Graph is a freely reusable blueprint, everything that
 * matters from the outside are the connections that can be made with it,
 * otherwise it is just a black box.
 */
abstract class Shape {
  /**
   * Scala API: get a list of all input ports
   */
  def inlets: immutable.Seq[Inlet[_]]
 
  /**
   * Scala API: get a list of all output ports
   */
  def outlets: immutable.Seq[Outlet[_]]
 
  /**
   * Create a copy of this Shape object, returning the same type as the
   * original; this constraint can unfortunately not be expressed in the
   * type system.
   */
  def deepCopy(): Shape
...}

Subclasses of Shape must implement the three abstract members above. akka-stream predefines some basic shapes, including SourceShape, FlowShape and SinkShape:
/**
 * A Source [[Shape]] has exactly one output and no inputs, it models a source
 * of data.
 */
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil
 
  override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy())
}
object SourceShape {
  /** Java API */
  def of[T](outlet: Outlet[T @uncheckedVariance]): SourceShape[T] =
    SourceShape(outlet)
}
 
/**
 * A Flow [[Shape]] has exactly one input and one output, it looks from the
 * outside like a pipe (but it can be a complex topology of streams within of
 * course).
 */
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = in :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil
 
  override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy())
}
object FlowShape {
  /** Java API */
  def of[I, O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]): FlowShape[I, O] =
    FlowShape(inlet, outlet)
}

There is also the somewhat more complex bidirectional shape, BidiShape:
//#bidi-shape
/**
 * A bidirectional flow of elements that consequently has two inputs and two
 * outputs, arranged like this:
 *
 * {{{
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 * }}}
 */
final case class BidiShape[-In1, +Out1, -In2, +Out2](
  in1:  Inlet[In1 @uncheckedVariance],
  out1: Outlet[Out1 @uncheckedVariance],
  in2:  Inlet[In2 @uncheckedVariance],
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {
  //#implementation-details-elided
  override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: Nil
  /**
   * Java API for creating from a pair of unidirectional flows.
   */
  def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out)
  override def deepCopy(): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy())
  //#implementation-details-elided
}
//#bidi-shape
object BidiShape {
  def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
    BidiShape(top.in, top.out, bottom.in, bottom.out)
  /** Java API */
  def of[In1, Out1, In2, Out2](
    in1:  Inlet[In1 @uncheckedVariance],
    out1: Outlet[Out1 @uncheckedVariance],
    in2:  Inlet[In2 @uncheckedVariance],
    out2: Outlet[Out2 @uncheckedVariance]): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1, out1, in2, out2)
}
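A BidiShape is easiest to picture with a small example. This sketch assumes Akka 2.5.x and uses `BidiFlow.fromFunctions`, which builds a bidirectional stage from one function per direction; the codec itself and `BidiDemo` are hypothetical. `join` plugs a `Flow` between `out1` and `in2`, which leaves a plain `Flow` whose ports are `in1` and `out2`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BidiDemo {
  def runDemo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("bidi")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Top direction In1 ~> Out1: String => Int; bottom In2 ~> Out2: Int => String.
      val codec: BidiFlow[String, Int, Int, String, NotUsed] =
        BidiFlow.fromFunctions((s: String) => s.length, (n: Int) => s"len=$n")
      // The joined Flow[Int, Int] connects Out1 to In2, closing that side.
      val pipeline: Flow[String, String, NotUsed] = codec.join(Flow[Int].map(_ * 10))
      Await.result(Source(List("a", "abc")).via(pipeline).runWith(Sink.seq), 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```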

There are also the one-to-many UniformFanOutShape and the many-to-one UniformFanInShape. Below is a custom many-to-many Shape of our own:
  case class TwoThreeShape[I, I2, O, O2, O3](
                                              in1: Inlet[I],
                                              in2: Inlet[I2],
                                              out1: Outlet[O],
                                              out2: Outlet[O2],
                                              out3: Outlet[O3]) extends Shape {
 
    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
 
    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil
 
    override def deepCopy(): TwoThreeShape[I, I2, O, O2, O3] = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }

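This is a shape with two inputs and three outputs. We only need to implement the three members inlets, outlets and deepCopy.
A GraphStage describes the behavior of a stream component: it defines the component's concrete function through the way elements flow into and out of it and how they are transformed along the way. Here is the type definition of GraphStage: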
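What deepCopy is expected to do can be checked without running a stream. The sketch below (assuming Akka 2.5.x, where `Inlet`/`Outlet` expose their name as `s`) declares deepCopy with the precise return type, as Akka's built-in shapes do; `ShapeCopyDemo` is a hypothetical name. The copy keeps the port names but consists of new port objects.

```scala
import akka.stream.{Inlet, Outlet, Shape}
import scala.collection.immutable

final case class TwoThreeShape[I, I2, O, O2, O3](
    in1: Inlet[I], in2: Inlet[I2],
    out1: Outlet[O], out2: Outlet[O2], out3: Outlet[O3]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil
  // carbonCopy() creates a fresh port with the same name.
  override def deepCopy(): TwoThreeShape[I, I2, O, O2, O3] =
    TwoThreeShape(in1.carbonCopy(), in2.carbonCopy(),
      out1.carbonCopy(), out2.carbonCopy(), out3.carbonCopy())
}

object ShapeCopyDemo {
  val original = TwoThreeShape(
    Inlet[Int]("in1"), Inlet[String]("in2"),
    Outlet[Int]("out1"), Outlet[String]("out2"), Outlet[Double]("out3"))
  val copy = original.deepCopy()

  def main(args: Array[String]): Unit = {
    println(copy.inlets.map(_.s))      // port names are preserved
    println(copy.in1 eq original.in1)  // but the port objects are new
  }
}
```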

/**
 * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
 * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
 * logic that ties the ports together.
 */
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)
  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}
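Since a GraphStage is essentially a Shape plus a createLogic factory, the smallest useful custom Flow stage can be sketched as follows. This assumes Akka 2.5.x; `MapStage` and `MapStageDemo` are hypothetical names, and Akka already ships `Flow.map`, so the stage is for illustration only.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// One element in, one transformed element out.
final class MapStage[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet[A]("MapStage.in")
  val out: Outlet[B] = Outlet[B]("MapStage.out")
  override val shape: FlowShape[A, B] = FlowShape(in, out)

  // createLogic is invoked once per materialization of the stage.
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, f(grab(in)))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object MapStageDemo {
  def runDemo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("mapStage")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 3).via(new MapStage[Int, String](i => s"#$i")).runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```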

Every stage implements createLogic to build the GraphStageLogic its function requires. GraphStageLogic is defined as follows:
/**
 * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a
 * collection of the following parts:
 *  * A set of [[InHandler]] and [[OutHandler]] instances and their assignments to the [[Inlet]]s and [[Outlet]]s
 *    of the enclosing [[GraphStage]]
 *  * Possible mutable state, accessible from the [[InHandler]] and [[OutHandler]] callbacks, but not from anywhere
 *    else (as such access would not be thread-safe)
 *  * The lifecycle hooks [[preStart()]] and [[postStop()]]
 *  * Methods for performing stream processing actions, like pulling or pushing elements
 *
 * The stage logic is completed once all its input and output ports have been closed. This can be changed by
 * setting `setKeepGoing` to true.
 *
 * The `postStop` lifecycle hook on the logic itself is called once all ports are closed. This is the only tear down
 * callback that is guaranteed to happen, if the actor system or the materializer is terminated the handlers may never
 * see any callbacks to `onUpstreamFailure`, `onUpstreamFinish` or `onDownstreamFinish`. Therefore stage resource
 * cleanup should always be done in `postStop`.
 */
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {...}
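The Scaladoc says mutable state belongs inside the logic and cleanup belongs in `postStop`. A minimal sketch of that rule, assuming Akka 2.5.x (`RunningSum` and `RunningSumDemo` are hypothetical names): because createLogic runs per materialization, running the same graph twice gives each run its own counter.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

final class RunningSum extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet[Int]("RunningSum.in")
  val out: Outlet[Int] = Outlet[Int]("RunningSum.out")
  override val shape: FlowShape[Int, Int] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // State lives in the logic: one copy per materialization,
      // touched only from the handler callbacks.
      private var sum = 0
      override def onPush(): Unit = {
        sum += grab(in)
        push(out, sum)
      }
      override def onPull(): Unit = pull(in)
      // Resources would be released here; this sketch holds none.
      override def postStop(): Unit = ()
      setHandlers(in, out, this)
    }
}

object RunningSumDemo {
  def runDemo(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("runningSum")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val graph = Source(1 to 4).via(Flow.fromGraph(new RunningSum)).toMat(Sink.seq)(Keep.right)
      val first = Await.result(graph.run(), 3.seconds)
      val second = Await.result(graph.run(), 3.seconds)
      (first, second)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```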

GraphStageLogic responds to port events through InHandler and OutHandler, and controls how elements are transformed and how they move across the ports:

/**
 * Collection of callbacks for an input port of a [[GraphStage]]
 */
trait InHandler {
  /**
   * Called when the input port has a new element available. The actual element can be retrieved via the
   * [[GraphStageLogic.grab()]] method.
   */
  @throws(classOf[Exception])
  def onPush(): Unit
 
  /**
   * Called when the input port is finished. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()
 
  /**
   * Called when the input port has failed. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}
 
/**
 * Collection of callbacks for an output port of a [[GraphStage]]
 */
trait OutHandler {
  /**
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
   * is now allowed to be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit
 
  /**
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
   * be called for this port.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit = {
    GraphInterpreter
      .currentInterpreter
      .activeStage
      .completeStage()
  }
}

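As we can see, we need to implement InHandler.onPush() and OutHandler.onPull(). akka-stream implements the Reactive Streams specification at every stage of a stream, so an input port's InHandler must respond to the upstream push signal onPush, and an output port's OutHandler must respond to the downstream demand signal onPull. The stage itself, in turn, requests elements from its input with pull(in) and sends elements to its output with push(out, elem).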
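The push/pull protocol matters most when a stage does not answer every pull with exactly one element. In the filter sketch below (assuming Akka 2.5.x; `FilterStage` and `FilterDemo` are hypothetical names), a dropped element leaves downstream's pull unanswered, so the stage must pull upstream again or the stream stalls.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

final class FilterStage[T](p: T => Boolean) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("FilterStage.in")
  val out: Outlet[T] = Outlet[T]("FilterStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        // Dropping an element leaves downstream demand open: pull again.
        if (p(elem)) push(out, elem) else pull(in)
      }
      // Downstream demand is forwarded to upstream.
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object FilterDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filterStage")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 10).via(new FilterStage[Int](_ % 3 == 0)).runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```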

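Next we design a Source that cyclically emits a given sequence of strings. A Source has only one output port, and we only need to watch it for demand signals from downstream, so in this case we only need to provide an OutHandler: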

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)
 
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport,new OutHandler {
        override def onPull(): Unit = {
          push(outport,chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}
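AlphaSource never completes. A finite variant can close its outlet with `complete(out)` once it has nothing left to emit; closing the only port also stops the stage. This is a sketch assuming Akka 2.5.x, and `CountdownSource` and `CountdownDemo` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Emits n, n-1, ..., 1 and then completes.
final class CountdownSource(n: Int) extends GraphStage[SourceShape[Int]] {
  require(n > 0, "n must be positive")
  val out: Outlet[Int] = Outlet[Int]("CountdownSource.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private var remaining = n
      override def onPull(): Unit = {
        push(out, remaining)
        remaining -= 1
        // Closing the only port finishes the stage; downstream sees completion.
        if (remaining == 0) complete(out)
      }
      setHandler(out, this)
    }
}

object CountdownDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("countdown")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source.fromGraph(new CountdownSource(3)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```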


The GraphStage class is a subclass of Graph:

abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {...}
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {...}
So we can treat AlphaSource as a Graph and build a Source component from it with Source.fromGraph:

  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("A","B","C","D"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  alphaSource.runWith(Sink.foreach(println))

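Likewise for a Sink: we only need to react to upstream push signals and read the data (the very first demand is signalled by calling pull in preStart):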
class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)
 
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
 
      override def preStart(): Unit = pull(inport)
 
      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }
 
      setHandler(inport,this)
 
    }
}
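UppercaseSink only produces side effects. A Sink that returns a result has to extend `GraphStageWithMaterializedValue` (the parent of GraphStage) and hand back its materialized value from `createLogicAndMaterializedValue`. A minimal sketch assuming Akka 2.5.x; `CollectingSink` and `CollectingSinkDemo` are hypothetical names, and Akka's own `Sink.seq` already does this job.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import scala.collection.immutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

final class CollectingSink[T]
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
  val in: Inlet[T] = Inlet[T]("CollectingSink.in")
  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[immutable.Seq[T]]) = {
    val promise = Promise[immutable.Seq[T]]()
    val logic = new GraphStageLogic(shape) with InHandler {
      private val buffer = Vector.newBuilder[T]
      // A Sink has no downstream to create demand, so it pulls first itself.
      override def preStart(): Unit = pull(in)
      override def onPush(): Unit = {
        buffer += grab(in)
        pull(in)
      }
      override def onUpstreamFinish(): Unit = {
        promise.success(buffer.result())
        completeStage()
      }
      override def onUpstreamFailure(ex: Throwable): Unit = {
        promise.failure(ex)
        failStage(ex)
      }
      // postStop is the only guaranteed teardown hook: never leave the Future pending.
      override def postStop(): Unit = {
        promise.tryFailure(new IllegalStateException("CollectingSink stopped early"))
        ()
      }
      setHandler(in, this)
    }
    (logic, promise.future)
  }
}

object CollectingSinkDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("collectingSink")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).runWith(Sink.fromGraph(new CollectingSink[Int])), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```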

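With AlphaSource and UppercaseSink we have had a first taste of controlling how stream elements flow. The control is essentially a passive reaction to changes in port state, operating on the ports with push and pull. Below are some commonly used port events and operations.

State changes of an output port are captured by the callbacks of its OutHandler, which is registered with setHandler(out, outHandler). The operations on an output port are:

1. push(out,elem): pushes an element to the port. This is only allowed after downstream has requested data with a pull, and it may not be called again until downstream pulls again

2. complete(out): closes the port normally

3. fail(out,exception): closes the port with a failure

Output port events:

1. onPull(): downstream is ready to receive data; push(out,elem) may now be used to send an element to the output port

2. onDownstreamFinish(): downstream has stopped reading; no further onPull events will arrive

The following functions return the current state of an output port:

1. isAvailable(out): true means push(out,elem) may be called

2. isClosed(out): true means the output port is closed; it receives no more events and nothing more can be pushed to it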

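Similarly, input port state changes are captured by the callbacks of the InHandler registered with setHandler(in, inHandler). The operations on an input port are:

1. pull(in): requests an element from upstream. It may only be called when no request is outstanding, that is, initially or after upstream has pushed the previously requested element; calling it twice without an intervening push is not allowed

2. grab(in): reads the current element from the port. This is only possible after upstream has pushed (onPush), and it cannot be called again until the next push

3. cancel(in): closes the input port manually

Input port events:

1. onPush(): upstream has pushed an element to the input port; grab(in) reads it, and pull(in) requests the next one

2. onUpstreamFinish(): upstream has stopped sending; no more onPush events will arrive, and pull(in) must no longer be used

3. onUpstreamFailure(ex): upstream has terminated with a failure

Methods for querying the state of an input port:

1. isAvailable(in): true means grab(in) can be used now to read the current element

2. hasBeenPulled(in): true means pull(in) has been called and the request is still outstanding; pull(in) may not be called again in this state

3. isClosed(in): true means the port is closed; pull(in) can no longer be used and no onPush events will arrive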
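Closing ports from inside a stage can be sketched with a take-like stage (assuming Akka 2.5.x; `TakeN` and `TakeNDemo` are hypothetical names, and Akka's built-in `take` covers this case). After the n-th element, `completeStage()` cancels the input and completes the output, so even an infinite upstream is stopped.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

final class TakeN[T](n: Int) extends GraphStage[FlowShape[T, T]] {
  require(n > 0, "n must be positive")
  val in: Inlet[T] = Inlet[T]("TakeN.in")
  val out: Outlet[T] = Outlet[T]("TakeN.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var taken = 0
      override def onPush(): Unit = {
        push(out, grab(in))
        taken += 1
        // Cancels `in` and completes `out`: no further onPush will arrive.
        if (taken == n) completeStage()
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object TakeNDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("takeN")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val infinite = Source.fromIterator(() => Iterator.from(1))
      Await.result(infinite.via(new TakeN[Int](3)).runWith(Sink.seq), 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```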

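The descriptions of pull(in) and push(out,elem) above show that the two are strictly interdependent and alternate with each other: an input port cannot be pulled again until upstream has pushed the element that was requested, and an output port cannot be pushed to until downstream has pulled. This is easy to understand: akka-stream is a Reactive Streams implementation, in which upstream and downstream communicate through a combination of push and pull. It is, however, inconvenient for some use cases, such as flow control. For these, akka-stream also provides a simpler API that lets users operate on ports more flexibly. It includes the following functions:

1. emit(out,elem): pushes elem to the port, temporarily replacing the OutHandler if the port cannot accept it yet, and restores the original OutHandler afterwards

2. emitMultiple(out,Iterable(e1,e2,e3...)): like emit, but sends a sequence of elements

3. read(in)(andThen,onClose): temporarily replaces the InHandler to read one element from the port, then restores the original InHandler

4. readN(in,n)(andThen,onClose): temporarily replaces the InHandler to read n elements from the port, then restores the original InHandler

5. abortEmitting(out): cancels any unfinished emit on the output port

6. abortReading(in): cancels any unfinished read on the input port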
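A typical use of emit is sending one last element when upstream finishes, at a moment when downstream may not have pulled yet. The sketch below assumes Akka 2.5.x, where `emit(out, elem, andThen)` exists; `AppendOnFinish` and `AppendDemo` are hypothetical names. If downstream is waiting, emit pushes at once; otherwise it waits for the next onPull, and `completeStage()` runs only after the element has gone out.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

final class AppendOnFinish[T](last: T) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("AppendOnFinish.in")
  val out: Outlet[T] = Outlet[T]("AppendOnFinish.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)
      // Instead of completing immediately, emit a trailing element first.
      override def onUpstreamFinish(): Unit = emit(out, last, () => completeStage())
      setHandlers(in, out, this)
    }
}

object AppendDemo {
  def runDemo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("appendOnFinish")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(List("a", "b")).via(new AppendOnFinish("END")).runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```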

This API still honors Reactive Streams back-pressure, as the source of emitMultiple shows:

 /**
   * Emit a sequence of elements through the given outlet and continue with the given thunk
   * afterwards, suspending execution if necessary.
   * This action replaces the [[OutHandler]] for the given outlet if suspension
   * is needed and reinstalls the current handler upon receiving an `onPull()`
   * signal (before invoking the `andThen` function).
   */
  final protected def emitMultiple[T](out: Outlet[T], elems: Iterator[T], andThen: () ⇒ Unit): Unit =
    if (elems.hasNext) {
      if (isAvailable(out)) {
        push(out, elems.next())
        if (elems.hasNext)
          setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
        else andThen()
      } else {
        setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
      }
    } else andThen()
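The behavior of emitMultiple shown above (push one element now, park the rest in an EmittingIterator until further pulls) can be exercised with a duplicating stage. This is a sketch assuming Akka 2.5.x; `Duplicator` and `DuplicatorDemo` are hypothetical names. Only the first copy is pushed in onPush; the second waits for the next downstream pull, so back-pressure is respected.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

final class Duplicator[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("Duplicator.in")
  val out: Outlet[T] = Outlet[T]("Duplicator.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        // First copy goes out now; the second is emitted on the next onPull,
        // after which the original OutHandler is reinstated.
        emitMultiple(out, List(elem, elem))
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object DuplicatorDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("duplicator")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(new Duplicator[Int]).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```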

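Next we build a custom Flow GraphStage that uses emitMultiple so that a user-defined function controls how stream elements flow and which ones are filtered out. A Flow must watch both the push state of upstream on its input port and the demand state of downstream on its output port: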

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move
 
class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport,outport)
 
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          case Next(rows) if rows.nonEmpty => emitMultiple(outport,rows)
          case _ => pull(inport) // Stand or an empty Next: request the next element, otherwise the stream stalls
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport,outport,this)
    }
}

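The FlowValve type above exists to apply a user-defined function, controller. Based on the content of each element pushed from upstream, controller decides either to skip the current element (Stand) or to send one or more elements downstream (Next(...)). When downstream is ready for data and issues a pull, FlowValve passes the request straight through to upstream. Here is an example of a user-defined function: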
  case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row
 
  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>
 
        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
      case _ => Stand // any other Row (e.g. a Burger) is skipped instead of throwing a MatchError
    }
  }
 
 
  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
                    ,Order("plain",1),Order("beef",2))
 
  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()

A test run prints:
Burger(cheeze 1 of 2)
Burger(cheeze 2 of 2)
Burger(beef 1 of 3)
Burger(beef 2 of 3)
Burger(beef 3 of 3)
Burger(pepper 1 of 1)
Burger(plain 1 of 1)
Burger(beef 1 of 2)
Burger(beef 2 of 2)

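This is exactly the result we expected. For fan-out (one-to-many) and fan-in (many-to-one) components, akka-stream provides the UniformFanOutShape and UniformFanInShape shapes, which are used by built-in GraphStages such as Broadcast and Merge. Combining the two lets us build many-to-many components, so the predefined GraphStages are generally sufficient.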
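Combining the built-in fan-out and fan-in stages can be sketched with GraphDSL: a Broadcast (UniformFanOutShape) splits each element into two branches and a Merge (UniformFanInShape) joins them again, and the whole sub-graph is exposed as an ordinary Flow. This assumes Akka 2.5.x; `FanOutFanInDemo` is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FanOutFanInDemo {
  def runDemo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("fanOutFanIn")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val fanned = Flow.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        val bcast = b.add(Broadcast[Int](2)) // one in, two out
        val merge = b.add(Merge[Int](2))     // two in, one out
        bcast.out(0) ~> Flow[Int].map(_ * 10)  ~> merge.in(0)
        bcast.out(1) ~> Flow[Int].map(_ * 100) ~> merge.in(1)
        // Expose the remaining open ports as a single FlowShape.
        FlowShape(bcast.in, merge.out)
      })
      // Merge interleaves its inputs nondeterministically, so sort the result.
      Await.result(Source(1 to 3).via(fanned).runWith(Sink.seq), 3.seconds).sorted
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runDemo())
}
```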

Here is the complete source code for this demonstration:

import akka.NotUsed
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
import scala.concurrent.duration._
import scala.collection.immutable.Iterable
 
class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)
 
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport,new OutHandler {
        override def onPull(): Unit = {
          push(outport,chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}
class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)
 
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
 
      override def preStart(): Unit = pull(inport)
 
      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }
 
      setHandler(inport,this)
 
    }
}
 
trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move
 
class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport,outport)
 
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          case Next(rows) if rows.nonEmpty => emitMultiple(outport,rows)
          case _ => pull(inport) // Stand or an empty Next: request the next element, otherwise the stream stalls
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport,outport,this)
    }
}
 
 
object GraphStages extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )
 
  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("a","b","c","d"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  // alphaSource.runWith(Sink.foreach(println))
 
  val sinkGraph: Graph[SinkShape[String],NotUsed] = new UppercaseSink
  val upperSink = Sink.fromGraph(sinkGraph)
  alphaSource.runWith(upperSink)
 
  case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row
 
  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>
 
        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
      case _ => Stand // any other Row (e.g. a Burger) is skipped instead of throwing a MatchError
    }
  }
 
 
  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
                    ,Order("plain",1),Order("beef",2))
 
  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()
 
 
  // Source(1 to 10).runWith(Sink.foreach(println))
 
  scala.io.StdIn.readLine()
  sys.terminate()
 
}
--------------------- 
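Author: TIGER_XC
Source: CSDN
Original: https://blog.csdn.net/TIGER_XC/article/details/77841179
Copyright notice: this is an original article by the blogger; please include a link to the original post when reposting.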