
SDP(13): Scala.Future - far from completion, and never something to fire and forget


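In the previous posts on database engines, many of the functions returned results of type scala.Future, because I assumed this was an easy way to get non-blocking behaviour. However complex the data processing, I would wrap it in a Future{...}, hand it to the system and consider it done, then turn my attention straight to other parts of the program. At the Shenzhen Scala user meetup on March 17 I gave a talk on functional programming in Scala, in which I mentioned that scala.Future is currently the most widely used functional component. I suspect this is fairly common among Scala users: everyone regards it as the most direct way to achieve non-blocking execution. Thinking about scala.Future again after the meetup, though, I suddenly realised that it is a strict value that is evaluated immediately. Consider the following example: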

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._
  val fs = Future {println("run now..."); System.currentTimeMillis() }
                                         //> run now...
                                         //| fs  : scala.concurrent.Future[Long] = List()
  Await.result(fs, 1.second)             //> res0: Long = 1465907784714
  Thread.sleep(1000)
  Await.result(fs, 1.second)             //> res1: Long = 1465907784714

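As you can see, fs is evaluated as soon as the Future is constructed, and it is evaluated only once. If a scala Future contains code with side effects, those side effects happen the moment it is constructed. So we cannot write pure functions with scala Future, as the following shows: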

val progA: Future[A] = for {
    b <- readFromB
    _ <- writeToLocationA(b)
    r <- getResult
} yield r

/* location A content updated */
...
/* later */

val progB: Future[B] = for {
    a <- readFromA
    _ <- updateLocationA
    c <- getResult
} ...

val program: Future[Unit] = for {
    _ <- progA
    _ <- progB
} yield ()

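The ultimate goal in this example is to run program, which is composed of two subprograms, progA and progB. Both subprograms start running as soon as they are constructed and may update locationA at any moment as a side effect. Imagine progA and progB buried in a large body of other source code: the outcome of program would be unpredictable. In other words, using Future for functional composition is digging a pit for yourself. At the very least you have to remember the order in which the Futures were constructed, which is practically impossible in a large, collaborative software project. Besides being unsafe to compose, scala.Future also lacks control over evaluation and threading, for example: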

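No control over when evaluation starts

No control over which thread it runs on

No way to cancel a computation once it has started

No effective error-handling mechanisms such as fallback and retry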
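The first limitation in the list above can be illustrated with the standard library alone. The sketch below is my own illustration (the object name DeferredFutureSketch and the counter are hypothetical, not part of any library): an eager Future runs as soon as it is built, whereas wrapping the same expression in a thunk of type () => Future[Int] postpones it until the thunk is applied, and runs it again on every application.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DeferredFutureSketch {
  def demo(): (Int, Int, Int) = {
    // Counts how many times the side effect has actually run.
    val runs = new AtomicInteger(0)

    // Eager: the body is submitted for execution at construction time.
    val eager: Future[Int] = Future { runs.incrementAndGet() }
    Await.result(eager, 1.second)
    val afterEager = runs.get()

    // Deferred: only a description; nothing runs until the thunk is applied.
    val deferred: () => Future[Int] = () => Future { runs.incrementAndGet() }
    val afterDefinition = runs.get()

    // Each application starts a fresh computation.
    Await.result(deferred(), 1.second)
    Await.result(deferred(), 1.second)
    val afterTwoRuns = runs.get()

    (afterEager, afterDefinition, afterTwoRuns)
  }
}
```

Calling DeferredFutureSketch.demo() should give (1, 1, 3): one run for the eager Future, none for merely defining the thunk, and two more for applying it twice. A thunk like this is only a workaround; it still offers no cancellation or retry.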

Both the scalaz and monix libraries provide a Task type that helps where Future falls short in functional composition. scalaz.Task is built on scalaz.Future:

sealed abstract class Future[+A] {
...
object Future {
  case class Now[+A](a: A) extends Future[A]
  case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]
  case class Suspend[+A](thunk: () => Future[A]) extends Future[A]
  case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]
  case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,
                            f: A => Future[B]) extends Future[B]
...

scalaz.Future[A] is evidently structured as a Free Monad. Its structure is expressed by the cases Now, Async, Suspend, BindSuspend and BindAsync. We can implement flatMap with these structures, so Future is a Free Monad:

def flatMap[B](f: A => Future[B]): Future[B] = this match {
    case Now(a) => Suspend(() => f(a))
    case Suspend(thunk) => BindSuspend(thunk, f)
    case Async(listen) => BindAsync(listen, f)
    case BindSuspend(thunk, g) =>
      Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))
    case BindAsync(listen, g) =>
      Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))
  }

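Because a free structure separates the description of a computation from its interpretation, we can use scalaz.Future to describe what a program does without actually running it. So if progA and progB in the earlier example were of type Task, constructing program would be safe, because the side effects are produced only when we finally call Task.run. scalaz.Task adds exception handling and other features on top of scalaz.Future.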
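To make the idea of "describe first, run later" concrete, here is a deliberately simplified sketch of my own. It is not the scalaz implementation: the names Prog, Now, Suspend and Bind are hypothetical, there is no Async case, and run is not stack-safe. It only shows that composing with flatMap builds a data structure, and that side effects happen when run is called.

```scala
import scala.collection.mutable.ListBuffer

object DescribeThenRunSketch {
  // A program is a plain data structure; building one executes nothing.
  sealed trait Prog[+A] {
    def run(): A
    def flatMap[B](f: A => Prog[B]): Prog[B] = Bind(this, f)
    def map[B](f: A => B): Prog[B] = flatMap[B](a => Now(f(a)))
  }
  // An already computed value.
  final case class Now[+A](value: A) extends Prog[A] {
    def run(): A = value
  }
  // A delayed computation; the thunk is forced only by run().
  final case class Suspend[+A](thunk: () => A) extends Prog[A] {
    def run(): A = thunk()
  }
  // Sequencing of two steps, recorded rather than executed.
  final case class Bind[A, +B](source: Prog[A], f: A => Prog[B]) extends Prog[B] {
    def run(): B = f(source.run()).run()
  }

  def demo(): (List[String], Int, List[String]) = {
    val log = ListBuffer.empty[String]
    val program: Prog[Int] = for {
      a <- Suspend(() => { log += "read"; 20 })
      b <- Suspend(() => { log += "write"; a + 1 })
    } yield b * 2
    val beforeRun = log.toList   // nothing has been logged yet
    val result = program.run()   // effects happen here, in order
    (beforeRun, result, log.toList)
  }
}
```

DescribeThenRunSketch.demo() should return (List(), 42, List("read", "write")): the log is empty after construction and is filled only by run().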

monix.Task uses lazy evaluation to separate description from interpretation. Below are the basic building blocks of this type:

  /** [[Task]] state describing an immediate synchronous value. */
  private[eval] final case class Now[A](value: A) extends Task[A] {...}
  /** [[Task]] state describing a non-strict synchronous value. */
  private[eval] final case class Eval[A](thunk: () => A)
    extends Task[A]

  /** Internal state, the result of [[Task.defer]] */
  private[eval] final case class Suspend[+A](thunk: () => Task[A])
    extends Task[A]

  /** Internal [[Task]] state that is the result of applying `flatMap`. */
  private[eval] final case class FlatMap[A, B](source: Task[A], f: A => Task[B])
    extends Task[B]
 /** Internal [[Coeval]] state that is the result of applying `map`. */
  private[eval] final case class Map[S, +A](source: Task[S], f: S => A, index: Int)
    extends Task[A] with (S => Task[A]) {

    def apply(value: S): Task[A] =
      new Now(f(value))
    override def toString: String =
      super[Task].toString
  }

  /** Constructs a lazy [[Task]] instance whose result will
    * be computed asynchronously.
    *
    * Unsafe to build directly, only use if you know what you're doing.
    * For building `Async` instances safely, see [[create]].
    */
  private[eval] final case class Async[+A](register: (Context, Callback[A]) => Unit)
    extends Task[A]  

The following shows how these structures are used to construct a monix.Task:

object Task extends TaskInstancesLevel1 {
  /** Returns a new task that, when executed, will emit the result of
    * the given function, executed asynchronously.
    *
    * This operation is the equivalent of:
    * {{{
    *   Task.eval(f).executeAsync
    * }}}
    *
    * @param f is the callback to execute asynchronously
    */
  def apply[A](f: => A): Task[A] =
    eval(f).executeAsync

  /** Returns a `Task` that on execution is always successful, emitting
    * the given strict value.
    */
  def now[A](a: A): Task[A] =
    Task.Now(a)

  /** Lifts a value into the task context. Alias for [[now]]. */
  def pure[A](a: A): Task[A] = now(a)

  /** Returns a task that on execution is always finishing in error
    * emitting the specified exception.
    */
  def raiseError[A](ex: Throwable): Task[A] =
    Error(ex)

  /** Promote a non-strict value representing a Task to a Task of the
    * same type.
    */
  def defer[A](fa: => Task[A]): Task[A] =
    Suspend(fa _)
...}
    source match {
      case Task.Now(v) => F.pure(v)
      case Task.Error(e) => F.raiseError(e)
      case Task.Eval(thunk) => F.delay(thunk())
      case Task.Suspend(thunk) => F.suspend(to(thunk()))
      case other => suspend(other)(F)
    }

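The Suspend structure is the core of lazy evaluation. monix.Task is a newer solution that borrows many concepts and methods from scalaz.Task while adding many optimisations and extra features, and its GitHub repository is actively updated. Using monix.Task should be the right choice.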

First we have to deal with conversion between scala.Future and monix.Task:

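  import scala.concurrent.Future
  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global
  
  // The by-name parameter keeps the Future from being built until the Task runs
  final class FutureToTask[A](x: => Future[A]) {
    def asTask: Task[A] = Task.deferFuture[A](x)
  }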

  final class TaskToFuture[A](x: => Task[A]) {
    def asFuture: Future[A] = x.runAsync
  }
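A small sketch may help show why Task.deferFuture takes its argument by name. The names DeferFutureSketch and legacyQuery are hypothetical stand-ins of mine for any existing function that returns an eager scala.Future; the example assumes a Monix 3.x dependency on the classpath.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object DeferFutureSketch {
  def demo(): (Int, Int, Int) = {
    val calls = new AtomicInteger(0)

    // Hypothetical legacy function: every call starts a new eager Future.
    def legacyQuery(): Future[Int] = Future { calls.incrementAndGet() }

    // Wrapping does not call legacyQuery yet; the expression is captured lazily.
    val task: Task[Int] = Task.deferFuture(legacyQuery())
    val beforeRun = calls.get()

    // Each run evaluates the captured expression again.
    val first = task.runSyncUnsafe(1.second)
    val second = task.runSyncUnsafe(1.second)

    (beforeRun, first, second)
  }
}
```

DeferFutureSketch.demo() should return (0, 1, 2): the Future is not created when the Task is built, and it is created anew on each run. Blocking with runSyncUnsafe is used here only to keep the sketch short.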

Below is a complete Task example:

import scala.concurrent._
import scala.util._
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution._
object MonixTask extends App {
import monix.execution.Scheduler.Implicits.global

  // Executing a sum, which (due to the semantics of apply)
  // will happen on another thread. Nothing happens on building
  // this instance though, this expression is pure, being
  // just a spec! Task by default has lazy behavior ;-)
  val task = Task { 1 + 1 }

  // Tasks get evaluated only on runAsync!
  // Callback style:
  val cancelable = task.runOnComplete {
      case Success(value) =>
        println(value)
      case Failure(ex) =>
        System.out.println(s"ERROR: ${ex.getMessage}")
  }
  //=> 2

  // If we change our mind...
  cancelable.cancel()

  // Or you can convert it into a Future
  val future: CancelableFuture[Int] =
    task.runAsync

  // Printing the result asynchronously
  future.foreach(println)
  //=> 2

  // Task.now takes a strict value, so its argument is evaluated immediately
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))
}

Now let's look at the various ways of constructing a Task:

  /* ------ taskNow ----*/
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))

  /* --------taskDelay, possibly on another thread ------*/
  val taskDelay = Task { println("Effect"); "Hello!" }
  // taskDelay: monix.eval.Task[String] = Delay(Always(<function0>))

  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // The evaluation (and thus all contained side effects)
  // gets triggered on each runAsync:
  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  /* --------taskOnce ------- */
  val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
  // taskOnce: monix.eval.Task[String] = EvalOnce(<function0>)

  taskOnce.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // Result was memoized on the first run!
  taskOnce.runAsync.foreach(println)
  //=> Hello!

  /* --------taskFork ------- */
  // this guarantees that our task will get executed asynchronously:
  val taskFork = Task.eval("Hello!").executeAsync
  //val taskFork = Task.fork(Task.eval("Hello!"))

  // The default scheduler
  import monix.execution.Scheduler.Implicits.global

  // Creating a special scheduler meant for I/O
  import monix.execution.Scheduler
  lazy val io = Scheduler.io(name="my-io")
  //Then we can manage what executes on which:

  // Override the default Scheduler by fork:
  val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
  val forked = source.executeOn(io,true)
  // val forked = Task.fork(source, io)

  source.runAsync
  //=> Running on thread: ForkJoinPool-1-worker-1
  forked.runAsync
  //=> Running on thread: my-io-4

  /* --------taskError ------- */
  import scala.concurrent.TimeoutException

  val taskError = Task.raiseError[Int](new TimeoutException)
  // taskError: monix.eval.Task[Int] =
  //   Delay(Error(java.util.concurrent.TimeoutException))

  taskError.runOnComplete(result => println(result))
  //=> Failure(java.util.concurrent.TimeoutException)

Below are some of the control functions:

  final def doOnFinish(f: Option[Throwable] => Task[Unit]): Task[A] =
  final def doOnCancel(callback: Task[Unit]): Task[A] =
  final def onCancelRaiseError(e: Throwable): Task[A] =
  final def onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Task[B]]): Task[B] =
  final def onErrorHandleWith[B >: A](f: Throwable => Task[B]): Task[B] =
  final def onErrorFallbackTo[B >: A](that: Task[B]): Task[B] =
  final def restartUntil(p: (A) => Boolean): Task[A] =
  final def onErrorRestart(maxRetries: Long): Task[A] =
  final def onErrorRestartIf(p: Throwable => Boolean): Task[A] =
  final def onErrorRestartLoop[S, B >: A](initial: S)(f: (Throwable, S, S => Task[B]) => Task[B]): Task[B] =
  final def onErrorHandle[U >: A](f: Throwable => U): Task[U] =
  final def onErrorRecover[U >: A](pf: PartialFunction[Throwable, U]): Task[U] =
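As a rough illustration of three of the signatures listed above (onErrorRestart, onErrorFallbackTo and onErrorRecover), here is a sketch of my own. The object RetrySketch and the flaky task are hypothetical, and a Monix 3.x dependency is assumed.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object RetrySketch {
  def demo(): (Int, Int, Int) = {
    val attempts = new AtomicInteger(0)

    // Hypothetical flaky operation: fails on the first two attempts.
    val flaky: Task[Int] = Task.eval {
      val n = attempts.incrementAndGet()
      if (n < 3) throw new RuntimeException(s"attempt $n failed") else n
    }

    // Retry: the source task is re-evaluated after each failure, up to 3 retries.
    val retried = flaky.onErrorRestart(3).runSyncUnsafe(1.second)

    val alwaysFails: Task[Int] = Task.raiseError[Int](new IllegalStateException("boom"))

    // Fallback: switch to another task when the first one fails.
    val fallback = alwaysFails.onErrorFallbackTo(Task.now(-1)).runSyncUnsafe(1.second)

    // Recover: map selected exceptions to a plain value.
    val recovered = alwaysFails
      .onErrorRecover { case _: IllegalStateException => 0 }
      .runSyncUnsafe(1.second)

    (retried, fallback, recovered)
  }
}
```

RetrySketch.demo() should return (3, -1, 0). Retrying works here because a Task is a description that can be evaluated again, which an already started scala.Future cannot offer.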

A Task is actually run, asynchronously or synchronously, through runAsync and the runSync* methods:

  def runAsync(implicit s: Scheduler): CancelableFuture[A] =
  def runAsync(cb: Callback[A])(implicit s: Scheduler): Cancelable =
  def runAsyncOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
  def runAsyncOpt(cb: Callback[A])(implicit s: Scheduler, opts: Options): Cancelable =
  final def runSyncMaybe(implicit s: Scheduler): Either[CancelableFuture[A], A] =
  final def runSyncMaybeOpt(implicit s: Scheduler, opts: Options): Either[CancelableFuture[A], A] = 
  final def runSyncUnsafe(timeout: Duration)
    (implicit s: Scheduler, permit: CanBlock): A =
  final def runSyncUnsafeOpt(timeout: Duration)
    (implicit s: Scheduler, opts: Options, permit: CanBlock): A =
  final def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable =

The following demonstrates two common ways of running a Task:

  // Dot notation avoids the need for scala.language.postfixOps
  val task1 = Task {println("sum:"); 1+2}.delayExecution(1.second)
  println(task1.runSyncUnsafe(2.seconds))
  
  task1.runOnComplete {
    case Success(r) => println(s"result: $r")
    case Failure(e) => println(e.getMessage)
  }

Below is the source code for this demonstration:

import scala.util._
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution._
object MonixTask extends App {
import monix.execution.Scheduler.Implicits.global



  // Executing a sum, which (due to the semantics of apply)
  // will happen on another thread. Nothing happens on building
  // this instance though, this expression is pure, being
  // just a spec! Task by default has lazy behavior ;-)
  val task = Task { 1 + 1 }

  // Tasks get evaluated only on runAsync!
  // Callback style:
  val cancelable = task.runOnComplete {
      case Success(value) =>
        println(value)
      case Failure(ex) =>
        System.out.println(s"ERROR: ${ex.getMessage}")
  }
  //=> 2

  // If we change our mind...
  cancelable.cancel()

  // Or you can convert it into a Future
  val future: CancelableFuture[Int] =
    task.runAsync

  // Printing the result asynchronously
  future.foreach(println)
  //=> 2

  /* ------ taskNow ----*/
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))

  /* --------taskDelay, possibly on another thread ------*/
  val taskDelay = Task { println("Effect"); "Hello!" }
  // taskDelay: monix.eval.Task[String] = Delay(Always(<function0>))

  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // The evaluation (and thus all contained side effects)
  // gets triggered on each runAsync:
  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  /* --------taskOnce ------- */
  val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
  // taskOnce: monix.eval.Task[String] = EvalOnce(<function0>)

  taskOnce.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // Result was memoized on the first run!
  taskOnce.runAsync.foreach(println)
  //=> Hello!

  /* --------taskFork ------- */
  // this guarantees that our task will get executed asynchronously:
  val taskFork = Task.eval("Hello!").executeAsync
  //val taskFork = Task.fork(Task.eval("Hello!"))

  // The default scheduler
  import monix.execution.Scheduler.Implicits.global

  // Creating a special scheduler meant for I/O
  import monix.execution.Scheduler
  lazy val io = Scheduler.io(name="my-io")
  //Then we can manage what executes on which:

  // Override the default Scheduler by fork:
  val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
  val forked = source.executeOn(io,true)
  // val forked = Task.fork(source, io)

  source.runAsync
  //=> Running on thread: ForkJoinPool-1-worker-1
  forked.runAsync
  //=> Running on thread: my-io-4

  /* --------taskError ------- */
  import scala.concurrent.TimeoutException

  val taskError = Task.raiseError[Int](new TimeoutException)
  // taskError: monix.eval.Task[Int] =
  //   Delay(Error(java.util.concurrent.TimeoutException))

  taskError.runOnComplete(result => println(result))
  //=> Failure(java.util.concurrent.TimeoutException)

  

  val task1 = Task {println("sum:"); 1+2}.delayExecution(1.second)
  println(task1.runSyncUnsafe(2.seconds))

  task1.runOnComplete {
    case Success(r) => println(s"result: $r")
    case Failure(e) => println(e.getMessage)
  }

}
