泛函編程(13)-無窮數據流-Infinite Stream
上節我們提到Stream和List的主要分別是在於Stream的“延後計算“(lazy evaluation)特性。我們還討論過在處理大規模排列數據集時,Stream可以一個一個把數據元素搬進內存並且可以逐個元素地進行處理操作。這讓我不禁聯想到我們常用的數據搜索讀取方式了:大量的數據存放在數據庫裏,就好像無窮的數據源頭。我們把數據讀取方式(那些數據庫讀寫API函數)嵌入Stream的操作函數內,把數據搜索條件傳入Stream構造器(constructor)中形成一個對數據搜索操作的描述。這個產生的Stream只有在我們調用符合搜索條件的數據庫記錄時才會逐個從數據庫中讀取出來。這可是一個非常熟悉的場景,但我們常常會思考它的原理。
很多時我們都需要無窮數據流(infinite stream),以直接或一些算法方式有規則地重復產生數據。無窮數據流被定義為“反遞歸”(corecursive)的:遞歸的特性是從復雜到簡單並最後終止,而無窮數據流的特性卻是從簡單到復雜並永遠不會終結。
我們先從簡單的無窮數據流開始:數字產生器:
1 def ones: Stream[Int] = cons(1,ones) //> ones: => ch5.genstream.Stream[Int] 2 ones.take(5).toList //> res0: List[Int] = List(1, 1, 1, 1, 1)
ones函數可以產生一個無窮的數據流。每個元素都是常數1。從這個簡單的例子我們可以稍微領略反遞歸的味道:cons(1,ones),通過重復不斷運算cons來產生無窮數據。
我們可以把一些算法嵌入無窮數據流產生過程:
1 def constant[A](a: A): Stream[A] = cons(a, constant(a)) 2 //> constant: [A](a: A)ch5.genstream.Stream[A] 3 constant(5).take(5).toList //> res1: List[Int] = List(5, 5, 5, 5, 5) 4 5 def from(n: Int): Stream[Int] = cons(n, from(n+1))//> from: (n: Int)ch5.genstream.Stream[Int] 6 from(5).take(5).toList //> res2: List[Int] = List(5, 6, 7, 8, 9) 7 8 def fibs: Stream[Int] = { 9 def go (prev: Int, cur: Int): Stream[Int] = { 10 cons(prev,go(cur,prev + cur)) 11 } 12 go(0,1) 13 } //> fibs: => ch5.genstream.Stream[Int] 14 fibs.take(5).toList //> res3: List[Int] = List(0, 1, 1, 2, 3)
從以上這些例子可以看出:我們不斷重復的在cons。而cons的參數則是算法的實現結果。
以下的unfold是一個最通用的Stream構建函數(stream builder),我們需要做些重點介紹:
1 def unfold[A,S](z: S)(f: S => Option[(A, S)]): Stream[A] = { 2 f(z) match { 3 case None => empty 4 case Some((a,s)) => cons(a, unfold(s)(f)) 5 } 6 } //> unfold: [A, S](z: S)(f: S => Option[(A, S)])ch5.genstream.Stream[A]
unfold的工作原理模仿了一種狀態流轉過程:z是一個起始狀態,代表的是一個類型的值。然後用戶(caller)再提供一個操作函數f。f的款式是:S => Option[(A,S)],意思是接受一個狀態,然後把它轉換成一對新的A值和新的狀態S,再把它們放入一個Option。如果Option是None的話,這給了用戶一個機會去終止運算,讓unfold停止遞歸。從unfold的源代碼可以看到f(z) match {} 的兩種情況。需要註意的是函數f是針對z這個類型S來操作的,A類型是Stream[A]的元素類型。f的重點作用在於把S轉換成新的S。我們用一些例子來說明:
1 def constantByUnfold[A](a: A): Stream[A] = unfold(a)(_ => Some((a,a))) 2 //> constantByUnfold: [A](a: A)ch5.genstream.Stream[A] 3 constantByUnfold(2).take(5).toList //> res4: List[Int] = List(2, 2, 2, 2, 2)
constantByUnfold產生一個無窮的常數:a同時代表了元素類型和狀態。_ => Some((a,a))意思是無論輸入任何狀態,元素值和狀態都不轉變,所以unfold會產生同一個數字。另外f的結果永遠不可能是None,所以這是一個無窮數據流(infinite stream)。
再來一個例子:
1 def fromByUnfold(s: Int): Stream[Int] = unfold(s)(s => Some(s,s+1)) 2 //> fromByUnfold: (s: Int)ch5.genstream.Stream[Int] 3 fromByUnfold(5).take(5).toList //> res5: List[Int] = List(5, 6, 7, 8, 9)
fromByUnfold產生一個從s開始的無窮整數序列:s 同時代表了元素類型和狀態。_ => Some((s,s+1))表示新A值是s,新狀態是s+1,所以新s = s + 1。狀態轉變原理可以從改變 s+1 到 s+2 運算後產生的結果得以了解:
1 def fromByUnfold_2(s: Int): Stream[Int] = unfold(s)(s => Some(s,s+2)) 2 //> fromByUnfold_2: (s: Int)ch5.genstream.Stream[Int] 3 fromByUnfold_2(5).take(5).toList //> res6: List[Int] = List(5, 7, 9, 11, 13)
再試一個不同類型的狀態例子:
1 def fibsByUnfold: Stream[Int] = unfold((0,1)){ case (a1,a2) => Some((a1, (a2, a1+a2))) } 2 //> fibsByUnfold: => ch5.genstream.Stream[Int] 3 fibsByUnfold.take(10).toList //> res8: List[Int] = List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34)
在以上的例子裏:S類型為tuple,起始值(0,1),元素類型A是Int。函數f: Int => Option[(Int, Int)]。f函數返回新A=a1, 新狀態S (a2, a1+a2)。由於狀態是個tuple類型,(a1,a2)是個模式匹配操作,所以必須加上case。S=(0,1) >>> (A,S)=(0,(1,0+1)) >>>(1,(1,1+1))>>>(1,(2,2+1))>>>(2,(3,2+3))>>>(3,(5,3+5))>>>(5,(8,5+8))>>>(8,(13,8+13))從以上推斷我們可以得出A>>>0,1,1,2,3,5,8,13,而狀態S>>>(0,1),(1,1),(1,2),(2,3),(3,5),(5,8)...不斷變化。?
在來個更體現狀態轉變的例子:用unfold實現的map函數
1 def mapByUnfoldInfinite[B](f: A => B): Stream[B] = { 2 unfold(uncons) { 3 case Some((h,t)) => Some((f(h),Some((t.head, t.tail)))) 4 case _ => None 5 } 6 } 7 (fromByUnfold(1).mapByUnfoldInfinite {_ + 10}).take(5).toList 8 //> res9: List[Int] = List(11, 12, 13, 14, 15)
S類型即uncons類型>>>Option[(A, Stream[A])], uncons的新狀態是 Some((t.head, t.tail))。因為我們采用了數據結構嵌入式的設計,所以必須用uncons來代表Stream,它的下一個狀態就是Some((t.head, t.tail))。如果使用子類方式Cons(h,t),那麽下一個狀態就可以直接用t來表示,簡潔多了。
再看看還有什麽函數可以用unfold來實現吧:
1 def takeByUnfold(n: Int): Stream[A] = { 2 unfold((uncons,n)) { 3 case (Some((h,t)),k) if (k > 0) => Some(h, (Some((t.head, t.tail)), k-1)) 4 case _ => None 5 } 6 } 7 def takeWhileByUnfold(f: A => Boolean): Stream[A] = { 8 unfold(uncons) { 9 case Some((h,t)) if (f(h)) => Some(h, Some((t.head, t.tail))) 10 case _ => None 11 } 12 } 13 def filterByUnfold(f: A => Boolean): Stream[A] = { 14 unfold(uncons) { 15 case Some((h,t)) if (f(h)) => Some(h, Some((t.head, t.tail))) 16 case _ => None 17 } 18 } 19 def zipWithByUnfold[B,C](b: Stream[B])(f: (A,B) => C): Stream[C] = { 20 unfold((uncons,b.uncons)) { 21 case (Some((ha,ta)),Some((hb,tb))) => Some(f(ha,hb),(Some((ta.head,ta.tail)),Some((tb.head,tb.tail)))) 22 case _ => None 23 } 24 } 25 def zip[B](b: Stream[B]): Stream[(A,B)] = zipWithByUnfold(b){( _ , _)} 26 (fromByUnfold(1).mapByUnfoldInfinite {_ + 10}).take(5).toList 27 //> res9: List[Int] = List(11, 12, 13, 14, 15) 28 (fromByUnfold(1).mapByUnfoldInfinite {_ + 10}).takeByUnfold(5).toList 29 //> res10: List[Int] = List(11, 12, 13, 14, 15) 30 (fromByUnfold(1).mapByUnfoldInfinite {_ + 10}).takeWhileByUnfold(_ < 15).toList 31 //> res11: List[Int] = List(11, 12, 13, 14) 32 (fromByUnfold(1).mapByUnfoldInfinite {_ + 10}).filterByUnfold(_ < 15).toList 33 //> res12: List[Int] = List(11, 12, 13, 14) 34 (fromByUnfold(5) zip fromByUnfold(1).mapByUnfoldInfinite {_ + 10}).take(5).toList 35 //> res13: List[(Int, Int)] = List((5,11), (6,12), (7,13), (8,14), (9,15))
風格基本上是一致的。
寫完這些例子才有時間靜下來想了一下:費了這麽大勁搞這個無窮數據流到底能幹什麽呢?作為數據庫搜索的數據源嗎,這個可以用普通的Stream來實現。由於無窮數據流是根據一些算法有規則的不停頓產生數據,那麽用來搭建測試數據源或者什麽數學統計模式環境道是是可以的。想到不斷產生數據,那麽用來畫些動態的東西也行吧,那在遊戲軟件中使用也有可能了。
泛函編程(13)-無窮數據流-Infinite Stream