Akka (17): Stream: Basic Stream Components - An Introduction to Source, Flow, and Sink

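In today's world of big-data applications, many programs face the same difficulty: their input is effectively unbounded, and its arrival time is unpredictable. The usual answer is callback functions, but that approach easily leads to "callback hell", a form of "goto hell": control jumps around and is hard to follow, and variables mutated inside callbacks can produce unexpected results. Streams are an effective programming model for this problem. A stream is an abstraction that hides how data arrives, along with other details, and lets us describe the processing declaratively, which makes the overall program logic easier to understand and trace. The price is less control over some low-level details of execution. We covered scalaz-stream in earlier posts; it differs from akka-stream mainly in two ways: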

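1. scalaz-stream is pull-based, whereas akka-stream is push-based. The drawback of the pull model is the efficiency of receiving data, because the program has to poll the input port repeatedly to see whether data has arrived. In the push model, data is pushed to the input port and enters the program directly; but if the source produces faster than the program can process, the pushed data overflows and elements are lost. akka-stream avoids this by implementing the back-pressure protocol of the Reactive Streams specification: sender and receiver signal each other, so that an overly fast producer slows down, or even pauses, to match what the receiver asks for.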

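2. Streams in both scalaz-stream and akka-stream are declarative descriptions of a data-processing pipeline. Such a description is only a plan; some kind of interpreter must eventually run the plan against the stream to produce results and side effects. scalaz-stream ships its own interpreter, written in a functional style, which gives good control over thread usage and parallel execution. The interpreter for akka-stream is the materializer. It runs on the actor system and so inherits the strengths of the actor model, including message-driven execution, cluster computing, and supervision strategies (SupervisorStrategy).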
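The point that a stream is only a plan until a materializer runs it can be illustrated with a small sketch. It assumes Akka 2.5.x, which this post appears to use (ActorMaterializer is deprecated from Akka 2.6 on); the object name BlueprintDemo and the counter are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object BlueprintDemo {
  // Returns the counter value before any run, after one run, and after two runs.
  def countRuns(): (Int, Int, Int) = {
    implicit val sys: ActorSystem = ActorSystem("blueprint-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val seen = new AtomicInteger(0)

      // Only a description: building this value processes no elements.
      val graph: RunnableGraph[Future[Done]] =
        Source(1 to 3).toMat(Sink.foreach[Int](_ => seen.incrementAndGet()))(Keep.right)
      val before = seen.get()

      // Each run() asks the materializer to execute the same plan again.
      Await.result(graph.run(), 5.seconds)
      val afterFirst = seen.get()
      Await.result(graph.run(), 5.seconds)
      val afterSecond = seen.get()

      (before, afterFirst, afterSecond)
    } finally Await.result(sys.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = println(countRuns())
}
```

Nothing is counted while the graph is being built, and every run() processes the three elements again, so the same description can be executed as often as needed.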

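An akka-stream stream is assembled from three kinds of basic components, and different combinations express different processing and representation capabilities. The three kinds are: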

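1. Source: the origin of the data. Because akka-stream is push-based, a Source is also a Publisher (the side that publishes data). Its shape, SourceShape, has exactly one output port. A Source can produce stream elements from a single value, a collection, a Future, some Publisher, or another graph, for example: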

  /**
   * Helper to create [[Source]] from `Iterable`.
   * Example usage: `Source(Seq(1,2,3))`
   *
   * Starts a new `Source` from the given `Iterable`. This is like starting from an
   * Iterator, but every Subscriber directly attached to the Publisher of this
   * stream will see an individual flow of elements (always starting from the
   * beginning) regardless of when they subscribed.
   */
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
    single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)

  /**
   * Create a `Source` with one element.
   * Every connected `Sink` of this stream will see an individual stream consisting of one element.
   */
  def single[T](element: T): Source[T, NotUsed] =
    fromGraph(new GraphStages.SingleSource(element))

  /**
   * Helper to create [[Source]] from `Iterator`.
   * Example usage: `Source.fromIterator(() => Iterator.from(0))`
   *
   * Start a new `Source` from the given function that produces an Iterator.
   * The produced stream of elements will continue until the iterator runs empty
   * or fails during evaluation of the `next()` method.
   * Elements are pulled out of the iterator in accordance with the demand coming
   * from the downstream transformation steps.
   */
  def fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] =
    apply(new immutable.Iterable[T] {
      override def iterator: Iterator[T] = f()
      override def toString: String = "() => Iterator"
    })

  /**
   * Starts a new `Source` from the given `Future`. The stream will consist of
   * one element when the `Future` is completed with a successful value, which
   * may happen before or after materializing the `Flow`.
   * The stream terminates with a failure if the `Future` is completed with a failure.
   */
  def fromFuture[T](future: Future[T]): Source[T, NotUsed] =
    fromGraph(new FutureSource(future))

  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

  /**
   * A graph with the shape of a source logically is a source, this method makes
   * it so also in type.
   */
  def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
    case s: Source[T, M]         ⇒ s
    case s: javadsl.Source[T, M] ⇒ s.asScala
    case other ⇒ new Source(
      LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
      other.shape)
  }

A few special-purpose Sources follow:

  /**
   * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
   */
  def empty[T]: Source[T, NotUsed] = _empty
  private[this] val _empty: Source[Nothing, NotUsed] =
    Source.fromGraph(EmptySource)

  /**
   * Create a `Source` that will continually emit the given element.
   */
  def repeat[T](element: T): Source[T, NotUsed] = {
    val next = Some((element, element))
    unfold(element)(_ ⇒ next).withAttributes(DefaultAttributes.repeat)
  }

  /**
   * Creates [[Source]] that will continually produce given elements in specified order.
   *
   * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements
   * will continue infinitely by repeating the sequence of elements provided by function parameter.
   */
  def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = {
    val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten
    fromIterator(() ⇒ iterator).withAttributes(DefaultAttributes.cycledSource)
  }

2. Sink: the end point of the data. A Sink is the consumer of elements; its main job is to use up the elements of the stream. SinkShape is a shape with a single input port. Examples of Sinks that consume elements:

  /**
   * A `Sink` that will consume the stream and discard the elements.
   */
  def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink)

  /**
   * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
   * into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream..
   */
  def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]] =
    Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")

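Note that akka-stream actually runs on actors, and an actor's internal state can ultimately form a result of the computation. From the examples above we can see that the result of a Sink has type Future[??] (Future[Done] in both examples).

3. Flow: a processing node. It transforms the elements that arrive through its input port and emits them through its output port. FlowShape has one input port and one output port.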
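To make the three component kinds concrete, here is a minimal sketch that feeds several of the Source constructors quoted above through one Flow into a collecting Sink. It assumes Akka 2.5.x; the object name ComponentsDemo, the map keys, and the `double` flow are hypothetical names chosen for illustration. The infinite sources (fromIterator over Iterator.from(0), repeat, cycle) are bounded with take so that the streams complete.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object ComponentsDemo {
  def run(): Map[String, Seq[Int]] = {
    implicit val sys: ActorSystem = ActorSystem("components-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Flow: one input port, one output port; here it doubles every element.
      val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

      // Source: one output port; built from different kinds of input.
      val sources: Map[String, Source[Int, NotUsed]] = Map(
        "single"   -> Source.single(7),
        "iterable" -> Source(List(1, 2, 3)),
        "iterator" -> Source.fromIterator(() => Iterator.from(0)).take(3),
        "repeat"   -> Source.repeat(5).take(3),
        "cycle"    -> Source.cycle(() => Iterator(1, 2)).take(5),
        "empty"    -> Source.empty[Int]
      )

      // Sink: one input port; Sink.seq collects all elements into a Future of a Seq.
      sources.map { case (name, src) =>
        name -> Await.result(src.via(double).runWith(Sink.seq[Int]), 5.seconds)
      }
    } finally Await.result(sys.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit =
    run().foreach { case (name, elems) => println(s"$name -> $elems") }
}
```

Sink.seq is used here instead of Sink.foreach only because it hands the elements back as a value, which makes the effect of each Source easy to inspect.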

In akka-stream, stream components are generally called graphs. Many graphs can be composed into a larger stream graph.

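The simplest complete (closed) linear stream in akka-stream connects a Source directly to a Sink. This amounts to presenting every element of the stream as it is, for example: source.runWith(Sink.foreach(println)). We can use Source.via to attach a Flow and Source.to to attach a Sink: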

  override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)

  override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
    val toAppend =
      if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
        LinearTraversalBuilder.empty()
      else
        flow.traversalBuilder

    new Source[T, Mat3](
      traversalBuilder.append(toAppend, flow.shape, combine),
      SourceShape(flow.shape.out))
  }

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
    RunnableGraph(traversalBuilder.append(sink.traversalBuilder, sink.shape, combine))
  }

Notice that via and to are shorthand for viaMat and toMat with the combiner fixed to Keep.left, which selects the result (the materialized value, Mat) of the graph on the left. As mentioned above, akka-stream processes stream elements inside the actor system, and along the way actor-internal state can be used to produce a result. via and to connect a left and a right graph and keep the result of the left one. To keep the result of the right graph instead, use viaMat and toMat. The choice is made by the function combine: (Mat, Mat2) => Mat3. akka-stream provides a Keep object to express it:

/**
 * Convenience functions for often-encountered purposes like keeping only the
 * left (first) or only the right (second) of two input values.
 */
object Keep {
  private val _left = (l: Any, r: Any) ⇒ l
  private val _right = (l: Any, r: Any) ⇒ r
  private val _both = (l: Any, r: Any) ⇒ (l, r)
  private val _none = (l: Any, r: Any) ⇒ NotUsed

  def left[L, R]: (L, R) ⇒ L = _left.asInstanceOf[(L, R) ⇒ L]
  def right[L, R]: (L, R) ⇒ R = _right.asInstanceOf[(L, R) ⇒ R]
  def both[L, R]: (L, R) ⇒ (L, R) = _both.asInstanceOf[(L, R) ⇒ (L, R)]
  def none[L, R]: (L, R) ⇒ NotUsed = _none.asInstanceOf[(L, R) ⇒ NotUsed]
}
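The effect of the different Keep combiners is easiest to see from the type of the resulting RunnableGraph. The sketch below assumes Akka 2.5.x; KeepDemo and the value names are hypothetical. It connects the same Source to a Sink three times, keeping the left result, the right result, and both.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object KeepDemo {
  def run(): (NotUsed, Seq[Int], Int) = {
    implicit val sys: ActorSystem = ActorSystem("keep-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val source: Source[Int, NotUsed] = Source(1 to 4)
      val seqSink: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int]
      val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // Keep.left: the Source's NotUsed survives (this is what `to` does).
      val left: RunnableGraph[NotUsed] = source.toMat(seqSink)(Keep.left)
      // Keep.right: the Sink's Future survives.
      val right: RunnableGraph[Future[immutable.Seq[Int]]] = source.toMat(seqSink)(Keep.right)
      // Keep.both: a tuple of the two results.
      val both: RunnableGraph[(NotUsed, Future[Int])] = source.toMat(sumSink)(Keep.both)

      val leftValue: NotUsed = left.run()
      val collected = Await.result(right.run(), 5.seconds)
      val (_, sumFuture) = both.run()
      val sum = Await.result(sumFuture, 5.seconds)

      (leftValue, collected, sum)
    } finally Await.result(sys.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With Keep.left the stream still runs, but the caller gets no handle to observe its completion; that is the practical reason to reach for toMat with Keep.right or Keep.both.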

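Since we are discussing how results are handled, let us look at the type parameters of Source, Flow, and Sink:

Source[+Out, +Mat]       // Out is the element type, Mat is the result (materialized value) type
Flow[-In, +Out, +Mat]    // In and Out are element types, Mat is the result type
Sink[-In, +Mat]          // In is the element type, Mat is the result type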

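The Keep object is a way of choosing among Mat values. In the source code above, to and toMat both return a RunnableGraph (RunnableGraph[Mat] and RunnableGraph[Mat3] respectively), which means that only a stream connected to a Sink can be run. RunnableGraph provides a run() function to execute the stream: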

/**
 * Flow with attached input and output, can be executed.
 */
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
  override def shape = ClosedShape

  /**
   * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableGraph[Mat2] =
    copy(traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]))

  /**
   * Run this flow and return the materialized instance from the flow.
   */
  def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
...

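The line shape = ClosedShape above says that the shape of a RunnableGraph is closed: every input and output port of a runnable graph must be connected.

Let us walk through the simplest linear stream in some detail: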

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka._
import scala.concurrent._

object SourceDemo extends App {
  implicit val sys=ActorSystem("demo")
  implicit val mat=ActorMaterializer()
  implicit val ec=sys.dispatcher

  val s1: Source[Int,NotUsed] = Source(1 to 10)
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)
  val rg1: RunnableGraph[NotUsed] = s1.to(sink)
  val rg2: RunnableGraph[Future[Done]]  = s1.toMat(sink)(Keep.right)
  val res1: NotUsed = rg1.run()

  Thread.sleep(1000)

  val res2: Future[Done] = rg2.run()
  res2.andThen {
    case _ => sys.terminate()
  }

}

Focus on the explicitly annotated result types: the Mat type of the Source is NotUsed, while the Mat type of the Sink is Future[Done]. The code shows that only by using toMat to keep the Sink's result, Future[Done], can we detect the point at which the stream finishes. The next example adds some composition:

  val seq = Seq[Int](1,2,3)
  def toIterator() = seq.iterator
  val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2)
  val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3)
  val s2 = Source.fromIterator(() => toIterator())
  val s3 = s1 ++ s2

  val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right)
  val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right)
  val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(Keep.right)
  (s5.toMat(sink)(Keep.right).run()).andThen {case _ => sys.terminate()}

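In general, all processing steps for the stream elements are merged onto a single actor (fusing of steps). This avoids message passing between actors, but it also limits parallel processing of the elements. async marks the graph to its left to run on a separate actor. Note that s6 is equivalent to s5.

From the result types of the combinations in the example above we can also see that attaching a Flow to a Source yields a new Source.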
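A short sketch can show that an async boundary changes where the steps run, not what they compute. It assumes Akka 2.5.x; AsyncBoundaryDemo and its value names are hypothetical. The same two map steps are run once fully fused and once with async between them, and the collected elements are compared.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original post.
object AsyncBoundaryDemo {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val sys: ActorSystem = ActorSystem("async-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Both map steps are fused and executed by the same actor.
      val fused: Source[Int, NotUsed] = Source(1 to 5).map(_ + 2).map(_ * 3)
      // async puts everything to its left (source and first map) behind an
      // asynchronous boundary, so the second map runs on a different actor.
      val split: Source[Int, NotUsed] = Source(1 to 5).map(_ + 2).async.map(_ * 3)

      val fusedResult = Await.result(fused.runWith(Sink.seq[Int]), 5.seconds)
      val splitResult = Await.result(split.runWith(Sink.seq[Int]), 5.seconds)
      (fusedResult, splitResult)
    } finally Await.result(sys.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Element order is preserved across the boundary, so both variants yield the same sequence. Whether the boundary pays off depends on how expensive each step is relative to the cost of passing elements between actors.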

In practice we can run a stream directly with the convenience methods (syntactic sugar) that the akka-stream Source provides:

  s1.runForeach(println)
  val fres = s6.runFold(0)(_ + _)
  // Future.foreach runs only on success; onSuccess is deprecated since Scala 2.12
  fres.foreach(a => println(a))
  fres.andThen{case _ => sys.terminate()}
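As a self-contained variant of the snippet above, the following sketch rebuilds the same pipeline as s6 and folds it into a sum. It assumes Akka 2.5.x on Scala 2.12 or later; RunFoldDemo is a hypothetical name. Future.onSuccess was deprecated in Scala 2.12 and removed in 2.13, so the sketch uses onComplete, which also reports a failed stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical demo object, not part of the original post.
object RunFoldDemo {
  def run(): Int = {
    implicit val sys: ActorSystem = ActorSystem("runfold-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = sys.dispatcher // needed by the Future callback
    try {
      val s1: Source[Int, NotUsed] = Source(1 to 10)
      val s2: Source[Int, NotUsed] = Source.fromIterator(() => Seq(1, 2, 3).iterator)
      // Same shape as s6 in the post: concatenate, add 2, async boundary, times 3.
      val s6: Source[Int, NotUsed] =
        (s1 ++ s2).via(Flow[Int].map(_ + 2)).async.via(Flow[Int].map(_ * 3))

      // runFold attaches Sink.fold and runs the stream in one call.
      val fres: Future[Int] = s6.runFold(0)(_ + _)
      fres.onComplete {
        case Success(sum) => println(s"sum = $sum")
        case Failure(e)   => println(s"stream failed: $e")
      }
      // Blocking is for the demo only, so the result can be returned.
      Await.result(fres, 5.seconds)
    } finally Await.result(sys.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = run()
}
```

The thirteen input elements sum to 61; adding 2 to each gives 87, and multiplying by 3 gives 261, which is the value the fold should return.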

Here are some of the runners defined on Source:

  /**
   * Shortcut for running this `Source` with a fold function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = runWith(Sink.fold(zero)(f))

  /**
   * Shortcut for running this `Source` with a foldAsync function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFoldAsync[U](zero: U)(f: (U, Out) ⇒ Future[U])(implicit materializer: Materializer): Future[U] = runWith(Sink.foldAsync(zero)(f))

  /**
   * Shortcut for running this `Source` with a reduce function.
   * The given function is invoked for every received element, giving it its previous
   * output (from the second element) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * If the stream is empty (i.e. completes before signalling any elements),
   * the reduce stage will fail its downstream with a [[NoSuchElementException]],
   * which is semantically in-line with that Scala's standard library collections
   * do in such situations.
   */
  def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] =
    runWith(Sink.reduce(f))

  /**
   * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
   * for each received element.
   * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream.
   */
  // FIXME: Out => Unit should stay, right??
  def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Done] = runWith(Sink.foreach(f))

All of them are implemented through runWith:

  /**
   * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
   * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
   */
  def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run()
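Since runWith is defined as toMat(sink)(Keep.right).run(), the two spellings should be interchangeable. The sketch below checks this, assuming Akka 2.5.x; RunWithDemo and its value names are hypothetical. It also exercises the empty-stream case that the runReduce documentation above describes, relying on that documented behaviour.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure

// Hypothetical demo object, not part of the original post.
object RunWithDemo {
  def run(): (Int, Int, Boolean) = {
    implicit val sys: ActorSystem = ActorSystem("runwith-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val src = Source(1 to 5)
      val sum = Sink.fold[Int, Int](0)(_ + _)

      // The shortcut ...
      val viaRunWith = Await.result(src.runWith(sum), 5.seconds)
      // ... and what it expands to.
      val viaToMat = Await.result(src.toMat(sum)(Keep.right).run(), 5.seconds)

      // runReduce on an empty stream: wait for completion without rethrowing,
      // then inspect how the Future ended.
      val emptyReduce = Await.ready(Source.empty[Int].runReduce[Int](_ + _), 5.seconds)
      val failedWithNoSuchElement = emptyReduce.value.exists {
        case Failure(_: NoSuchElementException) => true
        case _                                  => false
      }

      (viaRunWith, viaToMat, failedWithNoSuchElement)
    } finally Await.result(sys.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because a Source and a Sink are reusable descriptions, the same src and sum values can be combined and run twice without interfering with each other.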

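In effect, each runner uses the corresponding Sink.??? method of the Sink class (Sink.fold, Sink.foldAsync, Sink.reduce, and Sink.foreach in the runners above).

The complete demo source for this post follows: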

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka._
import scala.concurrent._

object SourceDemo extends App {
  implicit val sys=ActorSystem("demo")
  implicit val mat=ActorMaterializer()
  implicit val ec=sys.dispatcher

  val s1: Source[Int,NotUsed] = Source(1 to 10)
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)
  val rg1: RunnableGraph[NotUsed] = s1.to(sink)
  val rg2: RunnableGraph[Future[Done]]  = s1.toMat(sink)(Keep.right)
  val res1: NotUsed = rg1.run()

  Thread.sleep(1000)

  val res2: Future[Done] = rg2.run()
  res2.andThen {
    case _ =>   //sys.terminate()
  }

  val seq = Seq[Int](1,2,3)
  def toIterator() = seq.iterator
  val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2)
  val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3)
  val s2 = Source.fromIterator(() => toIterator())
  val s3 = s1 ++ s2

  val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right)
  val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right)
  val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(Keep.right)
  (s5.toMat(sink)(Keep.right).run()).andThen {case _ => } //sys.terminate()}

  s1.runForeach(println)
  val fres = s6.runFold(0)(_ + _)
  // Future.foreach runs only on success; onSuccess is deprecated since Scala 2.12
  fres.foreach(a => println(a))
  fres.andThen{case _ => sys.terminate()}

}

---------------------
Author: TIGER_XC
Source: CSDN
Original: https://blog.csdn.net/TIGER_XC/article/details/77159545
Copyright notice: This is an original article by the blogger; please include a link to the original post when reposting.