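Akka(18): Stream: Composing data flows, components (Graph components)
An akka-stream data flow can be assembled from components. These components are collectively called graphs (Graph); a graph describes the direction in which data flows and the processing steps along the way. Source, Flow, and Sink are the most basic graphs, and basic graphs can be combined into more complex composite graphs. A graph whose ports (inlets and outlets) are all connected is a closed graph, a RunnableGraph; otherwise it is an open, partial graph. A complete, runnable data flow is a RunnableGraph. The inlets and outlets of a graph are described by its Shape: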
/**
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
 * philosophy that a Graph is a freely reusable blueprint, everything that
 * matters from the outside are the connections that can be made with it,
 * otherwise it is just a black box.
 */
abstract class Shape {
  /**
   * Scala API: get a list of all input ports
   */
  def inlets: immutable.Seq[Inlet[_]]
  /**
   * Scala API: get a list of all output ports
   */
  def outlets: immutable.Seq[Outlet[_]]
  ...
The abstract methods inlets and outlets of Shape represent the input and output ports of a graph's shape. Below are several of the ready-made shapes that akka-stream provides:
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {...}
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {...}
final case class SinkShape[-T](in: Inlet[T @uncheckedVariance]) extends Shape {...}
sealed abstract class ClosedShape extends Shape
/**
 * A bidirectional flow of elements that consequently has two inputs and two
 * outputs, arranged like this:
 *
 * {{{
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 * }}}
 */
final case class BidiShape[-In1, +Out1, -In2, +Out2](
  in1: Inlet[In1 @uncheckedVariance],
  out1: Outlet[Out1 @uncheckedVariance],
  in2: Inlet[In2 @uncheckedVariance],
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {...}
object UniformFanInShape {
  def apply[I, O](outlet: Outlet[O], inlets: Inlet[I]*): UniformFanInShape[I, O] =
    new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList))
}
object UniformFanOutShape {
  def apply[I, O](inlet: Inlet[I], outlets: Outlet[O]*): UniformFanOutShape[I, O] =
    new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList))
}
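As a small illustration of the shapes listed above, the following sketch inspects the ports of the three basic graphs. It assumes only the akka-stream scaladsl on the classpath; the object name ShapeInspection and the helper portCounts are made up for this example. Since a graph is just a blueprint, no ActorSystem is needed to look at its shape.

```scala
import akka.stream.scaladsl.{Flow, Sink, Source}

// ShapeInspection and portCounts are hypothetical names used only for this sketch.
object ShapeInspection {
  // Blueprints only: nothing is materialized here.
  val sourceShape = Source(1 to 3).shape       // SourceShape[Int]: no inlet, one outlet
  val flowShape   = Flow[Int].map(_ + 1).shape // FlowShape[Int, Int]: one inlet, one outlet
  val sinkShape   = Sink.ignore.shape          // SinkShape[Any]: one inlet, no outlet

  // (number of inlets, number of outlets) for each basic shape
  def portCounts: List[(Int, Int)] =
    List(sourceShape, flowShape, sinkShape).map(s => (s.inlets.size, s.outlets.size))

  def main(args: Array[String]): Unit =
    println(portCounts) // List((0,1), (1,1), (1,0))
}
```

The port counts are exactly what distinguishes a SourceShape, a FlowShape, and a SinkShape from the outside.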
Shape is a type parameter of Graph:
trait Graph[+S <: Shape, +M] {
  /**
   * Type-level accessor for the shape parameter of this graph.
   */
  type Shape = S @uncheckedVariance
  /**
   * The shape of a graph is all that is externally visible: its inlets and outlets.
   */
  def shape: S
  ...
The Shape of a RunnableGraph is ClosedShape:
/**
 * Flow with attached input and output, can be executed.
 */
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
  override def shape = ClosedShape
  /**
   * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: Mat => Mat2): RunnableGraph[Mat2] =
    copy(traversalBuilder.transformMat(f.asInstanceOf[Any => Any]))
  /**
   * Run this flow and return the materialized instance from the flow.
   */
  def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
  ...
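To make run() and mapMaterializedValue concrete, here is a minimal sketch. It assumes the same Akka version as the rest of this article, where ActorMaterializer() is the way to obtain a materializer (newer Akka releases deprecate it). The object name RunnableGraphDemo and the system name are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// RunnableGraphDemo is a hypothetical name used only for this sketch.
object RunnableGraphDemo {
  // A closed blueprint: every port is connected, but nothing runs yet.
  val sumGraph: RunnableGraph[Future[Int]] =
    Source(1 to 5).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("runnableGraphDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for Future.map

    // Only the materialized value is transformed; the stream itself is unchanged.
    val described: RunnableGraph[Future[String]] =
      sumGraph.mapMaterializedValue(_.map(total => s"total = $total"))

    try Await.result(described.run(), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // total = 15
}
```

The same blueprint sumGraph could be run any number of times; each run() materializes a fresh stream and a fresh Future.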
We can build graphs with the GraphDSL that akka-stream provides. GraphDSL inherits the create methods from GraphApply, and GraphDSL.create(...) is how a Graph is built:
object GraphDSL extends GraphApply {...}
trait GraphApply {
  /**
   * Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function.
   */
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = {
    val builder = new GraphDSL.Builder
    val s = buildBlock(builder)
    createGraph(s, builder)
  }
  ...
  def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat] = {...}
  def create[S <: Shape, Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(combineMat: (M1, M2) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat] = {...}
  ...
  def create[S <: Shape, Mat, M1, M2, M3, M4, M5](g1: Graph[Shape, M1], g2: Graph[Shape, M2], g3: Graph[Shape, M3], g4: Graph[Shape, M4], g5: Graph[Shape, M5])(combineMat: (M1, M2, M3, M4, M5) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape, g3.Shape, g4.Shape, g5.Shape) => S): Graph[S, Mat] = {
  ...}
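The buildBlock function has the type buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape, ..., g5.Shape) => S, where g1 through g5 stand for the open graphs that are imported and combined. Here are a few of the most basic graph-building examples: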
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
object SimpleGraphs extends App {
  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()
  val source = Source(1 to 10)
  val flow = Flow[Int].map(_ * 2)
  val sink = Sink.foreach(println)
  // A Source graph: the filtered source feeds a pass-through Flow whose outlet is exposed.
  val sourceGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val src = source.filter(_ % 2 == 0)
    val pipe = builder.add(Flow[Int])
    src ~> pipe.in
    SourceShape(pipe.out)
  }
  Source.fromGraph(sourceGraph).runWith(sink).andThen { case _ => } // sys.terminate()
  // A Flow graph: both ports of the pass-through Flow are exposed.
  val flowGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val pipe = builder.add(Flow[Int])
    FlowShape(pipe.in, pipe.out)
  }
  val (_, fut) = Flow.fromGraph(flowGraph).runWith(source, sink)
  fut.andThen { case _ => } // sys.terminate()
  // A Sink graph: the outlet is wired to a Sink, only the inlet is exposed.
  val sinkGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val pipe = builder.add(Flow[Int])
    pipe.out.map(_ * 3) ~> Sink.foreach(println)
    SinkShape(pipe.in)
  }
  val fut1 = Sink.fromGraph(sinkGraph).runWith(source)
  Thread.sleep(1000)
  sys.terminate()
}
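The create overloads that take g1 ... g5 were only listed earlier, not used. The following sketch shows the one-graph overload, under the assumption that we want the sink's materialized value (a Future[Int]) to become the materialized value of the whole graph. The object name CreateWithSinkDemo is made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// CreateWithSinkDemo is a hypothetical name used only for this sketch.
object CreateWithSinkDemo {
  val foldSink = Sink.fold[Int, Int](0)(_ + _)

  // Passing foldSink to create(...) imports it into the builder; buildBlock
  // receives its shape, and its Future[Int] becomes the graph's materialized value.
  val graph: RunnableGraph[Future[Int]] = RunnableGraph.fromGraph(
    GraphDSL.create(foldSink) { implicit builder => sink =>
      import GraphDSL.Implicits._
      Source(1 to 10) ~> Flow[Int].map(_ * 2) ~> sink
      ClosedShape
    })

  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("createWithSinkDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(graph.run(), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // 110
}
```

With the zero-argument create() the materialized value would be NotUsed, so the result of the fold would not be reachable from the caller.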
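The SimpleGraphs examples above show how to write Source, Flow, and Sink graphs, using Flow[Int] as the common building block. As we know, an akka-stream graph can be composed from simpler partial graphs, and every graph is ultimately assembled from core graphs such as Source, Flow, and Sink. In those examples, builder.add(...) adds a Flow graph to an empty graph template and returns the shape pipe, which exposes the inlets and outlets of the added graph. We then connect the ports of pipe as the target graph requires, and the design of the data flow graph is complete. Running the examples confirms that the graphs behave as expected.
Next we can try defining a similar Pipe-type graph of our own to understand graph composition in more detail. Every core graph must define a Shape that describes its inlets and outlets, and a GraphStageLogic inside a GraphStage that describes exactly how stream elements are read and written.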
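import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.ActorAttributes._   // SupervisionStrategy, used by Pipe below
import akka.stream.stage._             // GraphStage, GraphStageLogic, InHandler, OutHandler
import scala.collection.immutable
import scala.util.control.NonFatal     // used by Pipe below
case class PipeShape[In, Out](
  in: Inlet[In],
  out: Outlet[Out]) extends Shape {
  override def inlets: immutable.Seq[Inlet[_]] = in :: Nil
  override def outlets: immutable.Seq[Outlet[_]] = out :: Nil
  override def deepCopy(): Shape =
    PipeShape(
      in = in.carbonCopy(),
      out = out.carbonCopy()
    )
}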
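PipeShape has one inlet and one outlet. Because it extends Shape, it must implement Shape's abstract methods. Suppose we want to design a graph that applies a user-supplied function to each incoming element, for example: source.via(ApplyPipe(myFunc)).runWith(sink). We could of course simply write source.map(myFunc).runWith(sink), but the point is that ApplyPipe may bundle a good deal of preset, shared functionality, with myFunc being only one part of the code. With map(...) the user would have to supply all of that code. The shape of ApplyPipe is PipeShape; its GraphStage is designed as follows: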
class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] {
  val in = Inlet[In]("Pipe.in")
  val out = Outlet[Out]("Pipe.out")
  override def shape = PipeShape(in, out)
  override def initialAttributes: Attributes = Attributes.none
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private def decider =
        inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
      override def onPull(): Unit = pull(in)
      override def onPush(): Unit = {
        try {
          push(out, f(grab(in)))
        }
        catch {
          case NonFatal(ex) => decider(ex) match {
            case Supervision.Stop => failStage(ex)
            case _ => pull(in)
          }
        }
      }
      setHandlers(in, out, this)
    }
}
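This Pipe GraphStage definition first declares the ports in and out, then uses createLogic to define the GraphStageLogic together with its InHandler and OutHandler. InHandler and OutHandler define how the stage reacts to element activity on the inlet and the outlet respectively: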
/**
 * Collection of callbacks for an input port of a [[GraphStage]]
 */
trait InHandler {
  /**
   * Called when the input port has a new element available. The actual element can be retrieved via the
   * [[GraphStageLogic.grab()]] method.
   */
  @throws(classOf[Exception])
  def onPush(): Unit
  /**
   * Called when the input port is finished. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()
  /**
   * Called when the input port has failed. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}
/**
 * Collection of callbacks for an output port of a [[GraphStage]]
 */
trait OutHandler {
  /**
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
   * is now allowed to be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit
  /**
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
   * be called for this port.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit = {
    GraphInterpreter
      .currentInterpreter
      .activeStage
      .completeStage()
  }
}
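The input and output handling of akka-stream graphs implements the Reactive Streams protocol, so it is best to implement the abstract methods onPull and onPush with the ready-made pull and push that akka-stream provides. setHandlers then registers the inlet and outlet of the GraphStage together with their handler: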
/**
 * Assign callbacks for linear stage for both [[Inlet]] and [[Outlet]]
 */
final protected def setHandlers(in: Inlet[_], out: Outlet[_], handler: InHandler with OutHandler): Unit = {
  setHandler(in, handler)
  setHandler(out, handler)
}
/**
 * Assigns callbacks for the events for an [[Inlet]]
 */
final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
  handlers(in.id) = handler
  if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
}
/**
 * Assigns callbacks for the events for an [[Outlet]]
 */
final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
  handlers(out.id + inCount) = handler
  if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
}
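The pull/push handshake may be easier to follow in a stage that does not emit every element. The sketch below registers the two handlers separately with setHandler instead of setHandlers. The stage name KeepIf and the demo object are made up for this example (the stage behaves like the built-in filter operator), and it uses the standard FlowShape rather than a custom shape.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// KeepIf is a hypothetical stage name used only for this sketch.
class KeepIf[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("KeepIf.in")
  val out: Outlet[A] = Outlet[A]("KeepIf.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem) // satisfies the pending downstream demand
          else pull(in)                // element dropped: ask upstream for another one
        }
      })
      setHandler(out, new OutHandler {
        // Downstream demand is forwarded upstream; this is how back-pressure propagates.
        override def onPull(): Unit = pull(in)
      })
    }
}

object KeepIfDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("keepIfDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val evens = Flow.fromGraph(new KeepIf[Int](_ % 2 == 0))
    try Await.result(Source(1 to 10).via(evens).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(2, 4, 6, 8, 10)
}
```

Each onPull results in exactly one push downstream, possibly after several pull(in) calls, so the stage never emits more than was requested.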
With the Shape and the GraphStage in place, we can build a Graph:
def applyPipe[In, Out](f: In => Out) = GraphDSL.create() { implicit builder =>
  val pipe = builder.add(new Pipe(f))
  FlowShape(pipe.in, pipe.out)
}
The stage can also be used directly to assemble a composite Graph:
RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val source = Source(1 to 10)
    val sink = Sink.foreach(println)
    val f: Int => Int = _ * 3
    val pipeShape = builder.add(new Pipe[Int, Int](f))
    source ~> pipeShape.in
    pipeShape.out ~> sink
    ClosedShape
  }
).run()
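Composite graphs become more interesting once junctions are involved. As a hedged sketch that goes slightly beyond the linear example above, the following closed graph uses the built-in Broadcast and Merge stages, whose shapes are the UniformFanOutShape and UniformFanInShape listed earlier. The object name FanOutFanInDemo is made up; the result is sorted because Merge does not guarantee any ordering between its inputs.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// FanOutFanInDemo is a hypothetical name used only for this sketch.
object FanOutFanInDemo {
  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(Sink.seq[Int]) { implicit builder => out =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Int](2)) // UniformFanOutShape[Int, Int]
      val merge = builder.add(Merge[Int](2))     // UniformFanInShape[Int, Int]

      Source(1 to 3) ~> bcast
      bcast ~> Flow[Int].map(_ * 10)  ~> merge
      bcast ~> Flow[Int].map(_ * 100) ~> merge
      merge ~> out
      ClosedShape // every port is connected, so the graph is runnable
    })

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("fanOutFanInDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    // Sorted because the interleaving of the two branches is not deterministic.
    try Await.result(graph.run(), 3.seconds).sorted
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(10, 20, 30, 100, 200, 300)
}
```

If any port of bcast or merge were left unconnected, building the graph would fail, which is the closed-versus-partial distinction from the beginning of the article in practice.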
The complete source code of the example follows:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.ActorAttributes._
import akka.stream.stage._
import scala.collection.immutable
import scala.util.control.NonFatal
object PipeOps {
  case class PipeShape[In, Out](
    in: Inlet[In],
    out: Outlet[Out]) extends Shape {
    override def inlets: immutable.Seq[Inlet[_]] = in :: Nil
    override def outlets: immutable.Seq[Outlet[_]] = out :: Nil
    override def deepCopy(): Shape =
      PipeShape(
        in = in.carbonCopy(),
        out = out.carbonCopy()
      )
  }
  class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] {
    val in = Inlet[In]("Pipe.in")
    val out = Outlet[Out]("Pipe.out")
    override def shape = PipeShape(in, out)
    override def initialAttributes: Attributes = Attributes.none
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        private def decider =
          inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
        override def onPull(): Unit = pull(in)
        override def onPush(): Unit = {
          try {
            push(out, f(grab(in)))
          }
          catch {
            case NonFatal(ex) => decider(ex) match {
              case Supervision.Stop => failStage(ex)
              case _ => pull(in)
            }
          }
        }
        setHandlers(in, out, this)
      }
  }
  def applyPipe[In, Out](f: In => Out) = GraphDSL.create() { implicit builder =>
    val pipe = builder.add(new Pipe(f))
    FlowShape(pipe.in, pipe.out)
  }
}
object ShapeDemo1 extends App {
  import PipeOps._
  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()
  RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val source = Source(1 to 10)
      val sink = Sink.foreach(println)
      val f: Int => Int = _ * 3
      val pipeShape = builder.add(new Pipe[Int, Int](f))
      source ~> pipeShape.in
      pipeShape.out ~> sink
      ClosedShape
    }
  ).run()
  val fut = Source(1 to 10).via(applyPipe[Int, Int](_ * 2)).runForeach(println)
  scala.io.StdIn.readLine()
  sys.terminate()
}
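The decider inside Pipe only matters when the user function throws, which the demo above never exercises. The sketch below shows that path under a few assumptions: it uses a stage named SafeMap (a made-up name) with the same onPush/onPull logic as Pipe but the standard FlowShape, so that it stays self-contained, and it attaches Supervision.resumingDecider through withAttributes.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal

// SafeMap is a hypothetical stand-in for Pipe, using FlowShape to keep the sketch short.
class SafeMap[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet[A]("SafeMap.in")
  val out: Outlet[B] = Outlet[B]("SafeMap.out")
  override val shape: FlowShape[A, B] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Falls back to stopping the stream when no strategy was configured.
      private def decider =
        inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
      override def onPull(): Unit = pull(in)
      override def onPush(): Unit =
        try push(out, f(grab(in)))
        catch {
          case NonFatal(ex) => decider(ex) match {
            case Supervision.Stop => failStage(ex)
            case _                => pull(in) // skip the failing element and continue
          }
        }
      setHandlers(in, out, this)
    }
}

object SupervisionDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervisionDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val divide = Flow.fromGraph(new SafeMap[Int, Int](100 / _))
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    // 100 / 0 throws ArithmeticException; with the resuming decider that element is dropped.
    try Await.result(Source(List(1, 0, 5)).via(divide).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(100, 20)
}
```

Without the withAttributes call the stage would fall back to Supervision.stoppingDecider and the stream would fail on the element 0.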