上兩節我們建了一個並行運算元件庫,實現了一些基本的並行運算功能。到現在這個階段,編寫並行運算函式已經可以和數學代數解題相近了:我們瞭解了問題需求,然後從型別匹配入手逐步產生題解。下面我們再多做幾個練習吧。

在上節我們介紹了asyncF,它的型別款式是這樣的:asyncF(f: A => B): A => Par[B],從型別款式(type signature)分析,asyncF函式的功能是把一個普通的函式 A => B轉成A => Par[B],Par[B]是一個並行運算。也就是說asyncF可以把一個輸入引數A的函式變成一個同樣輸入引數A的並行運算。asyncF函式可以把List[A],一串A值,按照函式A => B變成List[Par[A]],即一串並行運算。

例:函式f: (a: A) => a + 10:List(1,2,3).map(asyncF(f))=List(Par(1+10),Par(2+10),Par(3+10)),這些Par是並行運算的。但它們的運算結果需要另一個函式sequence來讀取。我們從以上分析可以得出sequence的型別款式:

 def sequence[A](lp: List[Par[A]]): Par[List[A]]

用sequence把List[Par[A]]轉成Par[List[A]]後我們就可以用Par.map對List[A]進行操作了。List有map,我們可以再用map對A進行操作。在上一節我們做了個練習:

 def parMap[A,B](l: List[A])(f: A => B): Par[List[B]]

parMap按List[A]產生了一串並行運算的函式f。我們可以從型別匹配著手一步一步推導:

1、lp: List[Par[B]] = l.map(asyncF(f))

2、pl: Par[List[B]] = sequence(lp) >>> parMap

再做個新的習題:用並行運算方式Filter List:

 def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]]

我們還是從型別匹配著手一步步推導:

1、asyncF( a => if(f(a)) List(a) else List() )  >>> Par[List[A]]

2、lpl: List[Par[List[A]]] = as.map( asyncF( a => if(f(a)) List(a) else List()))

3、pll: Par[List[List[A]]] = sequence(lpl)

4、map(pll){ a => a.flatten } >>> Par[List{A]]

   def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] = {
val pars: List[Par[List[A]]] = as.map(asyncF( (a: A) => if (f(a)) List(a) else List() ))
map(sequence(pars)){ a => a.flatten }
} //> parFilter: [A](as: List[A])(f: A => Boolean)ch71.Par.Par[List[A]]

測試結果:

 parFilter(List(10,29,13,3,6,48)){_ > 10}(es).get//> pool-1-thread-1
//| pool-1-thread-2
//| pool-1-thread-3
//| pool-1-thread-4
//| pool-1-thread-5
//| pool-1-thread-6
//| pool-1-thread-7
//| pool-1-thread-8
//| pool-1-thread-9
//| pool-1-thread-10
//| pool-1-thread-11
//| pool-1-thread-12
//| pool-1-thread-14
//| pool-1-thread-16
//| pool-1-thread-13
//| pool-1-thread-15
//| pool-1-thread-17
//| res0: List[Int] = List(29, 13, 48)

再做一個計算字數的練習:用並行運算方式來計算List裡的文字數。我們儘量用共性的方法來通用化解答。如果文字是以List裝載的活,型別就是:List[String],舉個例項:List("the quick fox","is running","so fast")。我們可以分兩步解決:

1、"the quick fox".split(' ').size >>> 把字串分解成文字並計算數量

2、List(A,B,C) >>> A.size + B.size + C.size >>> 把List裡的文字數積合。

這兩步可以分兩個函式來實現:

1. f: A => B >>> 我們需要把這個函式轉成並行運算:List[Par[B]]

2. g: List[B] => B

   def generalWordCount[A,B](as: List[A])(f: A => B)(g: List[B] => B): Par[B] = {
val lp: List[Par[B]] = as.map(asyncF(f))
val pl: Par[List[B]] = sequence(lp)
map(pl)(g)
} //> generalWordCount: [A, B](as: List[A])(f: A => B)(g: List[B] => B)ch71.Par.P
//| ar[B]
def wordCount(as: List[String]): Par[Int] = {
generalWordCount(as)(_.split(' ').size)(_.sum)
} //> wordCount: (as: List[String])ch71.Par.Par[Int]
val lw = List("the quick silver fox", "is running","the one legged fog", "is hopping")
//> lw : List[String] = List(the quick silver fox, is running, the one legged
//| fog, is hopping)
wordCount(lw)(es).get //> pool-1-thread-1
//| pool-1-thread-3
//| pool-1-thread-2
//| pool-1-thread-15
//| pool-1-thread-16
//| pool-1-thread-7
//| pool-1-thread-10
//| pool-1-thread-14
//| pool-1-thread-6
//| pool-1-thread-13
//| pool-1-thread-9
//| res7: Int = 12

相信大家對泛函程式設計的這種數學解題模式已經有了一定的瞭解。

在前面我們曾經提過現在的fork實現方式如果使用固定數量執行緒池的話有可能造成鎖死:

   val es = Executors.newFixedThreadPool(1)
val a = fork(async(40+2))
run(es)(a).get

我們再回顧一下fork的實現:

     def fork[A](pa: => Par[A]): Par[A] = {
es => {
es.submit(new Callable[A] {
def call: A = run(es)(pa).get
})
}
}

可以看出我們提交的callable內部是一個run par,這個run會再提交一個callable然後鎖定get。外面的callable必須等待內部callable的get鎖定完成。所以這種fork實現是需要兩個執行緒的。如果執行緒池無法再為內部callable提供執行緒的話,那麼外面的callable就會處於永遠等待中形成死鎖。上面的parMap函式會按照List的長度分解出同等數量的並行運算,執行時會造成死鎖嗎?如果執行緒池不是固定數量執行緒的話,答案就是否定的:如果並行運算數量大於執行緒數,那麼運算會分批進行:後面的運算可以等待前面的運算完成後釋放出執行緒後繼續執行,這裡重點是前面的運算始終是可以完成的,所以不會造成死鎖。

我們再看看現在所有的元件函式是否足夠應付所有問題,還需不需要增加一些基本元件,這也是開發一個函式庫必須走的過程;這就是一個不斷更新的過程。

現在有個新問題:如果一個並行運算的執行依賴另一個並行運算的結果,應該怎樣解決?先看看問題的型別款式:

   def choice[A](pa: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A]

我們可能馬上想到用map: map(pa){b => if(b) ifTrue else ifFalse}, 不過這樣做的結果型別是:Par[Par[A]], 是代表我們需要新的元件函式來解決這個問題嗎?我們先試著解這個題:

   def choice[A](pa: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] = {
es => if(run(es)(pa).get) run(es)(ifTrue) else run(es)(ifFalse)
}

我們可以看到現在choice是個最基本元件了。為了解決一個問題就創造一個新的元件不是泛函程式設計的風格。應該是用一些更基本的元件組合成一個描述這個問題的函式,那才是我們要採用的風格。我們應該試著用一個函式能把Par[Par[A]]變成Par[A],可能就可以用map了:

ppa: Par[Par[A]], 如果 run(es)(ppa).get 得到 pa: Par[A], 再run(es)(pa) >>> Future[A]。 Par[A] = es => Future[A],不就解決問題了嘛:

   def join[A](ppa: Par[Par[A]]): Par[A] = {
es => {
run(es)(run(es)(ppa).get())
}
}

現在可以用map來實現choice了吧。但是,map是針對元素A來操作的,ifTrue和ifFalse都是Par[A],還無法使用map。那就先放放吧。

既然我們能在兩個並行運算中選擇,那麼能在N個並行運算中選擇不是能更抽象嗎?

   def choiceN[A](pb: Par[Int])(choices: List[Par[A]]): Par[A]

run(es)(pb).get 得出指數(index), choices(index)就是選擇的運算了:

   def choiceN[A](pb: Par[Int])(choices: List[Par[A]]): Par[A] = {
es => {
run(es)(choices(run(es)(pb).get))
}
}

從choiceN中我們可以發現一個共性模式:是一個選擇函式:Int => Par[A]。再抽象一步我們把選擇函式變成:A => Par[B]。這個函式就像之前接觸過的flatMap函式的傳入引數函式f一樣的。我們先看看flatMap的型別款式:

   def flatMap[A,B](pa: Par[A])(f: A => Par[B]): Par[B]

我們只要flatMap pb 傳入 A => Par[B]就可以實現choiceN了:

   def flatMap[A,B](pa: Par[A])(f: A => Par[B]): Par[B] = {
es => {
run(es)(f(run(es)(pa).get))
}
}

有了flatMap,我們可以用它來實現choice,choiceN了:

   def choiceByFlatMap[A](pb: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] ={
flatMap(pb){a => if (a) ifTrue else ifFalse }
}
def choiceNByFlatMap[A](pb: Par[Int])(choices: List[Par[A]]): Par[A] = {
flatMap(pb){choices(_)}
}

在前面我們無法用map來實現choice,因為型別不匹配。加了一個join函式,又因為map元素型別不匹配,又不行。現在看來flatMap恰恰是我們需要解決choice的元件,而且flatMap能更抽象一層,連choiceN都一併解決了。值得注意的是我們在以上解決問題的過程中一再提及型別匹配,這恰恰體現了泛函程式設計就是函式解題的過程。

那麼flatMap,join,map之間有沒有什麼數學關係呢?

   def joinByFlatMap[A](ppa: Par[Par[A]]): Par[A] = {
flatMap(ppa){(x: Par[A]) => x}
}
def flatMapByJoin[A,B](pa: Par[A])(f: A => Par[B]): Par[B] = {
join(map(pa)(f))
}
def mapByFlatMap[A,B](pa: Par[A])(f: A => B): Par[B] = {
flatMap(pa) { a => unit(f(a)) }
}

它們之間的確可以用數學公式來表達。