1. 程式人生 > >使用Scala實現一個併發庫(NonBlocking版本)


這個例子來源於scala聖經級教程《Functional Programming in Scala》,由於本人跟著書中的程式碼敲了一遍,然後寫了點測試程式碼驗證了一下正確性,所以就放在這做個備忘吧。貼出來只是為了方便自己,如果看不懂,但是又感興趣的就去看原書吧……

> 注:本文是上一篇文章《使用Scala實現一個併發庫(阻塞版本, 下一篇文章提供NonBlocking版本)》的延續

package parallelism

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicReference

parallelism.Nonblocking.Future import language.implicitConversions object Nonblocking { trait Future[+A] { private[parallelism] def apply(k: A => Unit): Unit } type Par[+A] = ExecutorService => Future[A] object Par { def run[A](es: ExecutorService)(p: Par[A]): A = { // A mutable, threadsafe reference, to use for storing the result
val ref = new java.util.concurrent.atomic.AtomicReference[A] // A latch which, when decremented, implies that `ref` has the result val latch = new CountDownLatch(1) // Asynchronously set the result, and decrement the latch p(es) { a => ref.set(a); latch.countDown } // Block until the `latch.countDown` is invoked asynchronously
latch.await // Once we've passed the latch, we know `ref` has been set, and return its value ref.get } def unit[A](a: A): Par[A] = es => new Future[A] { def apply(cb: A => Unit): Unit = cb(a) } /** A non-strict version of `unit` */ def delay[A](a: => A): Par[A] = es => new Future[A] { def apply(cb: A => Unit): Unit = cb(a) } def fork[A](a: => Par[A]): Par[A] = es => new Future[A] { def apply(cb: A => Unit): Unit = eval(es)(a(es)(cb)) } /** * Helper function for constructing `Par` values out of calls to non-blocking continuation-passing-style APIs. * This will come in handy in Chapter 13. */ def async[A](f: (A => Unit) => Unit): Par[A] = es => new Future[A] { def apply(k: A => Unit) = f(k) } /** * Helper function, for evaluating an action * asynchronously, using the given `ExecutorService`. */ def eval(es: ExecutorService)(r: => Unit): Unit = es.submit(new Callable[Unit] { def call = r }) def map2[A, B, C](p: Par[A], p2: Par[B])(f: (A, B) => C): Par[C] = es => new Future[C] { def apply(cb: C => Unit): Unit = { var ar: Option[A] = None var br: Option[B] = None // this implementation is a little too liberal in forking of threads - // it forks a new logical thread for the actor and for stack-safety, // forks evaluation of the callback `cb` val combiner = Actor[Either[A, B]](es) { case Left(a) => if (br.isDefined) eval(es)(cb(f(a, br.get))) else ar = Some(a) case Right(b) => if (ar.isDefined) eval(es)(cb(f(ar.get, b))) else br = Some(b) } p(es)(a => combiner ! Left(a)) p2(es)(b => combiner ! Right(b)) } } // specialized version of `map` def map[A, B](p: Par[A])(f: A => B): Par[B] = es => new Future[B] { def apply(cb: B => Unit): Unit = p(es)(a => eval(es) { cb(f(a)) }) } def lazyUnit[A](a: => A): Par[A] = fork(unit(a)) def asyncF[A, B](f: A => B): A => Par[B] = a => lazyUnit(f(a)) def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = as match { case Nil => unit(Nil) case h :: t => map2(h, fork(sequence(t)))(_ :: _) } def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork { if (as.isEmpty) unit(Vector()) else if (as.length == 1) map(as.head)(a => Vector(a)) else { val (l, r) = as.splitAt(as.length / 2) map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _) } } def sequence[A](as: List[Par[A]]): Par[List[A]] = map(sequenceBalanced(as.toIndexedSeq))(_.toList) def parMap[A, B](as: List[A])(f: A => B): Par[List[B]] = sequence(as.map(asyncF(f))) def parMap[A, B](as: IndexedSeq[A])(f: A => B): Par[IndexedSeq[B]] = sequenceBalanced(as.map(asyncF(f))) // exercise answers /* * We can implement `choice` as a new primitive. * * `p(es)(result => ...)` for some `ExecutorService`, `es`, and * some `Par`, `p`, is the idiom for running `p`, and registering * a callback to be invoked when its result is available. The * result will be bound to `result` in the function passed to * `p(es)`. * * If you find this code difficult to follow, you may want to * write down the type of each subexpression and follow the types * through the implementation. What is the type of `p(es)`? What * about `t(es)`? What about `t(es)(cb)`? */ def choice[A](p: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] = es => new Future[A] { def apply(cb: A => Unit): Unit = p(es) { b => if (b) eval(es) { t(es)(cb) } else eval(es) { f(es)(cb) } } } /* The code here is very similar. */ def choiceN[A](p: Par[Int])(ps: List[Par[A]]): Par[A] = es => new Future[A] { def apply(cb: A => Unit): Unit = p(es) { ind => eval(es) { ps(ind)(es)(cb) } } } def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] = choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse)) def choiceMap[K, V](p: Par[K])(ps: Map[K, Par[V]]): Par[V] = es => new Future[V] { def apply(cb: V => Unit): Unit = p(es)(k => ps(k)(es)(cb)) } /* `chooser` is usually called `flatMap` or `bind`. */ def chooser[A, B](p: Par[A])(f: A => Par[B]): Par[B] = flatMap(p)(f) def flatMap[A, B](p: Par[A])(f: A => Par[B]): Par[B] = es => new Future[B] { def apply(cb: B => Unit): Unit = p(es)(a => f(a)(es)(cb)) } def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] = flatMap(p)(b => if (b) t else f) def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] = flatMap(p)(i => choices(i)) def join[A](p: Par[Par[A]]): Par[A] = es => new Future[A] { def apply(cb: A => Unit): Unit = p(es)(p2 => eval(es) { p2(es)(cb) }) } def joinViaFlatMap[A](a: Par[Par[A]]): Par[A] = flatMap(a)(x => x) def flatMapViaJoin[A, B](p: Par[A])(f: A => Par[B]): Par[B] = join(map(p)(f)) /* Gives us infix syntax for `Par`. */ implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p) // infix versions of `map`, `map2` and `flatMap` class ParOps[A](p: Par[A]) { def map[B](f: A => B): Par[B] = Par.map(p)(f) def map2[B, C](b: Par[B])(f: (A, B) => C): Par[C] = Par.map2(p, b)(f) def flatMap[B](f: A => Par[B]): Par[B] = Par.flatMap(p)(f) def zip[B](b: Par[B]): Par[(A, B)] = p.map2(b)((_, _)) } } def main(args: Array[String]): Unit = { // _print for dubug purpose val _print = (flag: Boolean) => if (flag) println(": " + Thread.currentThread()) else println(Thread.currentThread()) val es = Executors.newFixedThreadPool(3) val intDelayPar = Par.delay[Int]({ _print(false); math.pow(999, 67).toInt }); intDelayPar(es).apply((a: Int) => { _print(true); println(a) }) intDelayPar(es)((a: Int) => { _print(true); println(a) }) Par.fork(intDelayPar)(es)((a: Int) => { _print(true); println(a) }) Thread.sleep(6000L) println("**********************************************") Par.run(es)(intDelayPar) Thread.sleep(6000L) println("**********************************************") val _intDelayPar = Par.delay[Int]({ val future = es.submit(new Callable[Int] { _print(true); override def call = math.pow(999, 67).toInt }) val res = future.get println(res) res }); Par.run(es)(_intDelayPar) println("-------------------------------------------") import scala.language.implicitConversions import Par.toParOps // 通過隱式轉換將 map 方法變為中綴操作方式 (Par.delay(1) map {(a: Int) => 111 * a})(es)(println(_)) (Par.delay(2) map {(a: Int) => 111 * a})(es)(println(_)) (Par.delay(3) map {(a: Int) => 111 * a})(es)(println(_)) es.shutdown() } }


: Thread[main,5,main]
: Thread[main,5,main]
: Thread[pool-1-thread-1,5,main]
: Thread[main,5,main]


package parallelism

import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.{Callable,ExecutorService}
import annotation.tailrec

 * Implementation is taken from `scalaz` library, with only minor changes. See:
 * https://github.com/scalaz/scalaz/blob/scalaz-seven/concurrent/src/main/scala/scalaz/concurrent/Actor.scala
 * This code is copyright Andriy Plokhotnyuk, Runar Bjarnason, and other contributors,
 * and is licensed using 3-clause BSD, see LICENSE file at:
 * https://github.com/scalaz/scalaz/blob/scalaz-seven/etc/LICENCE

 * Processes messages of type `A`, one at a time. Messages are submitted to
 * the actor with the method `!`. Processing is typically performed asynchronously,
 * this is controlled by the provided `strategy`.
 * Memory consistency guarantee: when each message is processed by the `handler`, any memory that it
 * mutates is guaranteed to be visible by the `handler` when it processes the next message, even if
 * the `strategy` runs the invocations of `handler` on separate threads. This is achieved because
 * the `Actor` reads a volatile memory location before entering its event loop, and writes to the same
 * location before suspending.
 * Implementation based on non-intrusive MPSC node-based queue, described by Dmitriy Vyukov:
 * [[http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue]]
 * @see scalaz.concurrent.Promise for a use case.
 * @param handler  The message handler
 * @param onError  Exception handler, called if the message handler throws any `Throwable`.
 * @param strategy Execution strategy, for example, a strategy that is backed by an `ExecutorService`
 * @tparam A       The type of messages accepted by this actor.
final class Actor[A](strategy: Strategy)(handler: A => Unit, onError: Throwable => Unit = throw(_)) {
  self =>

  private val tail = new AtomicReference(new Node[A]())
  private val suspended = new AtomicInteger(1)
  private val head = new AtomicReference(tail.get)

  /** Alias for `apply` */
  def !(a: A) {
    val n = new Node(a)

  /** Pass the message `a` to the mailbox of this actor */
  def apply(a: A) {
    this ! a

  def contramap[B](f: B => A): Actor[B] =
    new Actor[B](strategy)((b: B) => (this ! f(b)), onError)

  private def trySchedule() {
    if (suspended.compareAndSet(1, 0)) schedule()

  private def schedule() {

  private def act() {
    val t = tail.get
    val n = batchHandle(t, 1024)
    if (n ne t) {
      n.a = null.asInstanceOf[A]
    } else {
      if (n.get ne null) trySchedule()

  private def batchHandle(t: Node[A], i: Int): Node[A] = {
    val n = t.get
    if (n ne null) {
      try {
      } catch {
        case ex: Throwable => onError(ex)
      if (i > 0) batchHandle(n, i - 1) else n
    } else t

private class Node[A](var a: A = null.asInstanceOf[A]) extends AtomicReference[Node[A]]

object Actor {

  /** Create an `Actor` backed by the given `ExecutorService`. */
  def apply[A](es: ExecutorService)(handler: A => Unit, onError: Throwable => Unit = throw(_)): Actor[A] =
    new Actor(Strategy.fromExecutorService(es))(handler, onError)

 * Provides a function for evaluating expressions, possibly asynchronously.
 * The `apply` function should typically begin evaluating its argument
 * immediately. The returned thunk can be used to block until the resulting `A`
 * is available.
trait Strategy {
  def apply[A](a: => A): () => A

object Strategy {

   * We can create a `Strategy` from any `ExecutorService`. It's a little more
   * convenient than submitting `Callable` objects directly.
  def fromExecutorService(es: ExecutorService): Strategy = new Strategy {
    def apply[A](a: => A): () => A = {
      val f = es.submit { new Callable[A] { def call = a} }
      () => f.get

   * A `Strategy` which begins executing its argument immediately in the calling thread.
  def sequential: Strategy = new Strategy {
    def apply[A](a: => A): () => A = {
      val r = a
      () => r
