Implementing a Concurrency Library in Scala (Blocking Version; a NonBlocking Version Follows in the Next Article)

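This example comes from "Functional Programming in Scala", widely regarded as the Scala bible. I typed in the book's code myself and wrote some test code to check that it works, so I'm keeping it here as a note. It is posted mainly for my own reference; if you find it hard to follow but are interested in the topic, please read the original book.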

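> Note: if the thread pool used by this concurrency library has only a single worker thread, the program blocks forever; see the example in the `main` method. The cause lies in the implementation details of `fork` (the outer task occupies the only thread while waiting for an inner task that can never be scheduled), so I will provide a non-blocking version in the next article.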
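
The blocking described in this note can be reproduced in isolation. The following is a minimal sketch, assuming only `java.util.concurrent`: its `fork` has the same shape as the `fork` defined in the listing below, while `ForkDeadlockDemo`, `lazyConst` and `completes` are hypothetical names introduced for illustration. To keep the demo from hanging, it waits with a timeout and then calls `shutdownNow()` to interrupt the stuck worker.

```scala
import java.util.concurrent._

// Hypothetical demo object: reproduces the blocking behaviour of a fork
// that calls get() on an inner task from inside a pool thread.
object ForkDeadlockDemo {
  type Par[A] = ExecutorService => Future[A]

  // Same shape as the article's fork: the outer task blocks on the inner one.
  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call(): A = a(es).get()
    })

  // Hypothetical helper: submits `a` as its own task.
  def lazyConst[A](a: => A): Par[A] =
    es => es.submit(new Callable[A] {
      def call(): A = a
    })

  // True if fork(lazyConst(42)) finishes within timeoutMs on a pool of `threads` threads.
  def completes(threads: Int, timeoutMs: Long): Boolean = {
    val es = Executors.newFixedThreadPool(threads)
    try {
      fork(lazyConst(42))(es).get(timeoutMs, TimeUnit.MILLISECONDS)
      true
    } catch {
      case _: TimeoutException => false
    } finally {
      es.shutdownNow() // interrupts a worker stuck in get(), so the JVM can exit
    }
  }

  def main(args: Array[String]): Unit = {
    println(completes(threads = 2, timeoutMs = 1000)) // expected: true
    println(completes(threads = 1, timeoutMs = 1000)) // expected: false, after about 1 s
  }
}
```

With two threads the inner task gets a worker of its own and the forked computation finishes; with one thread the outer task holds the only worker while the inner task stays queued behind it, so the timed `get` gives up.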

package parallelism

import java.util.concurrent._

object Par {

  type Par[A] = ExecutorService => Future[A]

  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  /**
    * `unit` is represented as a function that returns a `UnitFuture`,
    * which is a simple implementation of `Future` that just wraps a constant value.
    * It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled.
    * Its `get` method simply returns the value that we gave it.
    */
  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  private case class UnitFuture[A](get: A) extends Future[A] {
    override def cancel(mayInterruptIfRunning: Boolean) = false
    override def isCancelled = false
    override def isDone = true
    override def get(timeout: Long, unit: TimeUnit) = get
  }

  /**
    * `map2` doesn't evaluate the call to `f` in a separate logical thread,
    * in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism.
    * We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
    */
  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      // This implementation of `map2` does _not_ respect timeouts.
      // It simply passes the `ExecutorService` on to both `Par` values,
      // waits for the results of the Futures `af` and `bf`,
      // applies `f` to them, and wraps them in a `UnitFuture`.
      // In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`,
      // then subtracts that time from the available time allocated for evaluating `bf`.
      UnitFuture(f(af.get, bf.get))
    }

  /**
    * This is the simplest and most natural implementation of `fork`,
    * but there are some problems with it. For one, the outer `Callable` will block waiting for the "inner" task to complete.
    * Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`,
    * this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice.
    * This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
    */
  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      override def call() = a(es).get
    })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  def asyncF[A, B](f: A => B): A => Par[B] = a => lazyUnit(f(a))

  def map[A, B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a, _) => f(a))

  def sortPar(parList: Par[List[Int]]) = map(parList)(_.sorted)

  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldRight[Par[List[A]]](unit(List[A]()))((h, t) => map2(h, t)(_ :: _))

  /**
    * This implementation forks the recursive step off to a new logical thread,
    * making it effectively tail-recursive. However, we are constructing
    * a right-nested parallel program, and we can get better performance by
    * dividing the list in half, and running both halves in parallel.
    * See `sequenceBalanced` below.
    */
  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
    }

  /**
    * We define `sequenceBalanced` using `IndexedSeq`, which provides an
    * efficient function for splitting the sequence in half.
    */
  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l, r) = as.splitAt(as.length / 2)
      map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
    }
  }

  def sequence[A](as: List[Par[A]]): Par[List[A]] =
    map(sequenceBalanced(as.toIndexedSeq))(_.toList)

  def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
    val pars: List[Par[List[A]]] =
      l map (asyncF((a: A) => if (f(a)) List(a) else List()))
    map(sequence(pars))(_.flatten)
  }

  def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get

  def delay[A](fa: => Par[A]): Par[A] =
    es => fa(es)

  // Notice we are blocking on the result of `cond`.
  def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
    es =>
      if (run(es)(cond).get) t(es)
      else f(es)

  def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
    es => {
      val ind = run(es)(n).get
      run(es)(choices(ind))
    }

  def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A])(ifFalse: Par[A]): Par[A] =
    choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse))

  def choiceMap[K, V](key: Par[K])(choices: Map[K, Par[V]]): Par[V] =
    es => {
      val kVal = run(es)(key).get
      run(es)(choices(kVal))
    }

  def chooser[A, B](p: Par[A])(choices: A => Par[B]): Par[B] =
    es => {
      val k = run(es)(p).get
      run(es)(choices(k))
    }

  /**
    * `chooser` is usually called `flatMap` or `bind`.
    * `flatMap` is the more general abstraction: `choice`, `choiceN` and `choiceMap` can all be defined in terms of it.
    */
  def flatMap[A, B](p: Par[A])(choices: A => Par[B]): Par[B] =
    es => {
      val k = run(es)(p).get
      run(es)(choices(k))
    }

  // Parameter order (t, f) matches `choice`: `t` is chosen when `p` yields true.
  def choiceViaFlatMap[A](p: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
    flatMap(p)(b => if (b) t else f)

  def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] =
    flatMap(p)(i => choices(i))

  def joinViaFlatMap[A](pp: Par[Par[A]]): Par[A] =
    flatMap(pp)(p => p)

  def join[A](a: Par[Par[A]]): Par[A] =
    es => run(es)(run(es)(a).get())

  def flatMapViaJoin[A, B](p: Par[A])(f: A => Par[B]): Par[B] =
    join(map(p)(f))

  /* Gives us infix syntax for `Par`. */
  implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p)

  class ParOps[A](p: Par[A]) {}
}

object Examples {
  import Par._

  def sum(ints: IndexedSeq[Int]): Int =
    if (ints.size <= 1)
      ints.headOption getOrElse 0
    else {
      val (l, r) = ints.splitAt(ints.length / 2)
      sum(l) + sum(r)
    }

  def main(args: Array[String]): Unit = {
    println(sum(Range(0, 10)))

    val es = Executors.newFixedThreadPool(2)

    val parInt: Par[Int] = es => es.submit(new Callable[Int] {
      override def call() = ((System.currentTimeMillis + 1.0) / System.currentTimeMillis).toInt
    })
    val parString: Par[String] = es => es.submit(new Callable[String] {
      override def call() = " |<*>| " * 3
    })
    val parIntMap = Par.map(parInt)((a: Int) => a * 8)
    val parMap2 = Par.map2[Int, String, (Int, String)](parInt, parString)((_ -> _))

    println(run(es)(parInt).get)
    println(run(es)(parIntMap).get)
    println(run(es)(parMap2).get)
    // No deadlock here: `es` has 2 threads, so the tasks submitted by `parMap2`
    // can still run while the forked outer task blocks waiting for them.
    // With a single-thread pool this call would deadlock (see below).
    println(run(es)(fork(parMap2)).get)
    es.shutdown()

    val singleThreadEs = Executors.newFixedThreadPool(1) // forking a Par[A] on this pool blocks
    // val singleThreadEs = Executors.newFixedThreadPool(2) // forking a Par[A] on this pool does not block

    // With only one worker thread in singleThreadEs, the next call blocks forever,
    // so its result is never printed to the console.
    println(run(singleThreadEs)(fork(parMap2)).get)
    singleThreadEs.shutdown() // if the previous line blocks, shutdown is never called
  }
}

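Running the code above prints the following; the program then hangs at the single-threaded `fork` call and never exits: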

45
1
8
(1, |<*>|  |<*>|  |<*>| )
(1, |<*>|  |<*>|  |<*>| )
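
The comment inside `map2` notes that it ignores timeouts, and outlines how a timeout-respecting version would work: record the time spent waiting for `af` and subtract it from the budget allowed for `bf`. Below is a minimal sketch of that idea, assuming the same `Par` type alias; `TimedPar`, `Map2Future`, `async` and `slow` are hypothetical names, and this is not code from the article.

```scala
import java.util.concurrent._

// Hypothetical object: a map2 whose result Future honours get(timeout, unit).
object TimedPar {
  type Par[A] = ExecutorService => Future[A]

  // Hypothetical helper: runs `a` as its own task on the pool.
  def async[A](a: => A): Par[A] =
    es => es.submit(new Callable[A] { def call(): A = a })

  // Waits for af and then bf, sharing ONE timeout budget: the time spent
  // waiting for af is subtracted from the time allowed for bf.
  private final class Map2Future[A, B, C](af: Future[A], bf: Future[B], f: (A, B) => C)
      extends Future[C] {
    @volatile private var cache: Option[C] = None

    def isDone(): Boolean = cache.isDefined || (af.isDone && bf.isDone)
    def isCancelled(): Boolean = af.isCancelled || bf.isCancelled
    def cancel(mayInterruptIfRunning: Boolean): Boolean =
      af.cancel(mayInterruptIfRunning) | bf.cancel(mayInterruptIfRunning) // cancel both

    def get(): C = compute(Long.MaxValue) // no timeout: wait as long as needed
    def get(timeout: Long, unit: TimeUnit): C = compute(unit.toNanos(timeout))

    private def compute(timeoutNanos: Long): C = cache match {
      case Some(c) => c
      case None =>
        val start = System.nanoTime
        val a = af.get(timeoutNanos, TimeUnit.NANOSECONDS)
        val elapsed = System.nanoTime - start
        // If the budget is already used up, bf.get throws TimeoutException unless bf is done.
        val b = bf.get(timeoutNanos - elapsed, TimeUnit.NANOSECONDS)
        val c = f(a, b)
        cache = Some(c)
        c
    }
  }

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] =
    es => new Map2Future(pa(es), pb(es), f)

  def main(args: Array[String]): Unit = {
    def slow(ms: Long): Par[Long] = async { Thread.sleep(ms); ms }

    // Two threads: the 200 ms tasks run side by side, well inside a 1 s budget.
    val two = Executors.newFixedThreadPool(2)
    try println(map2(slow(200), slow(200))(_ + _)(two).get(1, TimeUnit.SECONDS))
    finally two.shutdownNow()

    // One thread: the 400 ms tasks run back to back (about 800 ms in total),
    // so a shared 600 ms budget runs out while waiting for the second task.
    val one = Executors.newSingleThreadExecutor()
    try {
      map2(slow(400), slow(400))(_ + _)(one).get(600, TimeUnit.MILLISECONDS)
      println("finished in time")
    } catch {
      case _: TimeoutException => println("timed out")
    } finally one.shutdownNow()
  }
}
```

The design choice here is that one `get(timeout, unit)` call on the combined future bounds the total wait, instead of granting the full timeout to each of the two inner futures in turn.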
879675643@qq.com  lhever
