1. 程式人生 > >Akka Stream之Graph

Akka Stream之Graph

最近在專案中需要實現圖的一些操作,因此,初步考慮使用Akka Stream的Graph實現。從而學習了下:

一、介紹

我們知道在Akka Stream中有三種簡單的線性資料流操作:Source/Flow/Sink。但是當我們需要使用一些複雜的操作,例如扇入和扇出時,可能就需要使用圖相關的流操作了。因此,我們可以這樣認為,Akka Stream的Graph是一種運算方案,他可能是簡單的線性資料流,也可以由基礎的流圖組合而成的複雜的資料流程。因為Graph只是對資料流運算的簡單描述,所以它是可以重複利用的。

二、依賴

要使用Akka Stream的Graph,我們需要新增下面的依賴:

<dependency>
  <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.12</artifactId> <version>2.5.18</version> </dependency>

三、構建Graph

Graph是由簡單的Flow組成的,這些Flow用作圖形中的線性連線以及用作Flow的扇入和扇出點的連線點。Akka Stream目前提供了下面這些連線點:

1、扇出:

(1)Broadcast[T]:(1輸入,N輸出)給定輸入元件發射到每個輸出

(2)Balance[T]:(1輸入,N輸出)給定輸入元件發射到其輸出埠之一

(3)UnzipWith[In,A,B,...]:(1個輸入,N個輸出)採用1個輸入的函式,給定每個輸入的值發出N個輸出元素(其中N <= 20)

(4)UnZip[A,B]:(1個輸入,2個輸出)將元組流(A,B)拆分為兩個流,一個是型別A,另一個是型別B

2、扇入:

(1)Merge[In]:(N個輸入,1個輸出)從輸入中隨機選取將它們逐個推入其輸出

(2)MergePreferred[In]Merge但是如果元素在最受歡迎的埠上可用,它會從中選擇,否則從中隨機從其他埠上選

(3)MergePrioritized[In]Merge但是如果元素在所有輸入埠上都可用,它會根據它們的優先順序隨機選擇它們

(4)MergeLatest[In]:(N個輸入,1個輸出)發出List[In],當第i個輸入流發出元素時,發出的列表中的第i個元素被更新

(5)ZipWith[A,B,...,Out]:(N個輸入,1個輸出),其取N個輸入的函式,給出每個輸入的值,發出1個輸出元素

(6)Zip[A,B]:(2個輸入,1個輸出)是一個ZipWith專用於壓縮和解的輸入流AB成元組流(A,B)

(7)Concat[A]:(2個輸入,1個輸出)連線兩個流(首先消耗一個,然後消耗第二個)

四、例子 

現在假設我們需要實現如下圖所示的一個Graph

我們可以用akka-stream提供的GraphDSL來構建Graph。GraphDSL繼承了GraphApply的create方法,GraphDSL.create(...)就是構建Graph的方法,因此,我們可以使用如下程式碼建立上圖所示的Graph:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => import GraphDSL.Implicits._ val in = Source(1 to 10) val out = Sink.ignore val bcast = builder.add(Broadcast[Int](2)) val merge = builder.add(Merge[Int](2)) val f1, f2, f3, f4 = Flow[Int].map(_ + 10) in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out bcast ~> f4 ~> merge ClosedShape })

注意:在這個裡面我們需要引入import GraphDSL.Implicits._。是為了將~>(讀作邊緣,通過或者到),以及他的相反操作<~引入到程式碼的範圍內。