1. 程式人生 > >java1.8實戰學習(一)——總結:流處理、行為引數化、並行與共享

java1.8實戰學習(一)——總結:流處理、行為引數化、並行與共享

筆者這段時間在學習java8的新特性,發現有好多新的特點,特寫此部落格用於梳理記錄學習,不用每次都抱著pdf《java8實戰》去看,也供大家參考

下一篇:java1.8實戰學習(二)

知識點概括

總結了Java的主要變化(Lambda表示式、方法引用、流和預設方法),併為學習後面的內容做好準備。

  •  流處理

 第一個程式設計概念是流處理。介紹一下,流是一系列資料項,一次只生成一項。程式可以從輸入流中一個一個讀取資料項,然後以同樣的方式將資料項寫入輸出流。一個程式的輸出流很可能是另一個程式的輸入流。

一個實際的例子是在Unix或Linux中,很多程式都從標準輸入(Unix和C中的stdin,Java中的System.in)讀取資料,然後把結果寫入標準輸出(Unix和C中的stdout,Java中的System.out)。首先我們來看一點點背景:Unix的cat命令會把兩個檔案連線起來建立一個流,tr會轉換流中的字元,sort會對流中的行進行排序,而tail -3則給出流的最後三行。Unix命令列允許這些程式通過管道(|)連線在一起,比如

cat file1 file2 | tr "[A-Z]" "[a-z]" | sort | tail -3 

會(假設file1和file2中每行都只有一個詞)先把字母轉換成小寫字母,然後打印出按照詞典排序出現在最後的三個單詞。我們說sort把一個行流①作為輸入,產生了另一個行流(進行排序)作為輸出,如圖1-2所示。請注意在Unix中,命令(cat、tr、sort和tail)是同時執行的,這樣sort就可以在cat或tr完成前先處理頭幾行。就像汽車組裝流水線一樣,汽車排隊進入加工站,每個加工站會接收、修改汽車,然後將之傳遞給下一站做進一步的處理。儘管流水線實際上是一個序列,但不同加工站的執行一般是並行的。

 

基於這一思想,Java 8在java.util.stream中添加了一個Stream API;Stream<T>就是一系列T型別的專案。

推動這種做法的關鍵在於,現在你可以在一個更高的抽象層次上寫Java 8程式了:思路變成了把這樣的流變成那樣的流(就像寫資料庫查詢語句時的那種思路),而不是一次只處理一個專案。另一個好處是,Java 8可以透明地把輸入的不相關部分拿到幾個CPU核心上去分別執行你的Stream操作流水線——這是幾乎免費的並行,用不著去費勁搞Thread了。

  • 用行為引數化把程式碼傳遞給方法

Java 8中增加的另一個程式設計概念是通過API來傳遞程式碼的能力。

在Unix的例子裡,你可能想告訴sort命令使用自定義排序。雖然sort命令支援通過命令列引數來執行各種預定義型別的排序,比如倒序,但這畢竟是有限的。

比方說,你有一堆發票程式碼,格式類似於2013UK0001、2014US0002……前四位數代表年份,接下來兩個字母代表國家,最後四位是客戶的程式碼。你可能想按照年份、客戶程式碼,甚至國家來對發票進行排序。你真正想要的是,能夠給sort命令一個引數讓使用者定義順序:給sort命令傳遞一段獨立程式碼。

那麼,直接套在Java上,你是要讓sort方法利用自定義的順序進行比較。你可以寫一個compareUsingCustomerId來比較兩張發票的程式碼,但是在Java 8之前,你沒法把這個方法傳給另一個方法。Java 8增加了把方法(你的程式碼)作為引數傳遞給另一個方法的能力。圖1-3是基於圖1-2畫出的,它描繪了這種思路。我們把這一概念稱為行為引數化。Stream API就是構建在通過傳遞程式碼使操作行為實現引數化的思想上的,當把compareUsingCustomerId傳進去,你就把sort的行為引數化了。
 

 

  • 並行與共享的可變資料

Java 8的流實現並行比Java現有的執行緒API更容易,因此,儘管可以使用synchronized來打破“不能有共享的可變資料”這一規則,但這相當於是在和整個體系作對,因為它使所有圍繞這一規則做出的優化都失去意義了。在多個處理器核心之間使用synchronized,其代價往往比你預期的要大得多,因為同步迫使程式碼按照順序執行,而這與並行處理的宗旨相悖。這兩個要點(沒有共享的可變資料,將方法和函式即程式碼傳遞給其他方法的能力)是我們平常所說的函數語言程式設計正規化的基石。

  •  一個傳遞程式碼的例子

假設你有一個Apple類,它有一個getColor方法,還有一個變數inventory儲存著一個Apples的列表。你可能想要選出所有的綠蘋果,並返回一個列表。通常我們用篩選(filter)一詞來表達這個概念。在Java 8之前,你可能會寫這樣一個方法filterGreenApples:

public static List<Apple> filterGreenApples(List<Apple> inventory){ 
 List<Apple> result = new ArrayList<>(); 
 for (Apple apple: inventory){ 
 if ("green".equals(apple.getColor())) {
 result.add(apple); 
 } 
 } 
 return result; 
}

但是接下來,有人可能想要選出重的蘋果,比如超過150克,於是你心情沉重地寫了下面這個方法,甚至用了複製貼上:
 

public static List<Apple> filterHeavyApples(List<Apple> inventory){ 
 List<Apple> result = new ArrayList<>(); 
 for (Apple apple: inventory){ 
 if (apple.getWeight() > 150) {  //僅僅此句不同
 result.add(apple); 
 } 
 } 
 return result; 
} 

我們都知道軟體工程中複製貼上的危險——給一個做了更新和修正,卻忘了另一個。這兩個方法只有一行不同:if裡面高亮的那行條件。如果這兩個高亮的方法之間的差異僅僅是接受的重量範圍不同,那麼你只要把接受的重量上下限作為引數傳遞給filter就行了,比如指定(150, 1000)來選出重的蘋果(超過150克),或者指定(0, 80)來選出輕的蘋果(低於80克)。但是,我們前面提過了,Java 8會把條件程式碼作為引數傳遞進去,這樣可以避免filter方法出現重複的程式碼。現在你可以寫:

 

public static boolean isGreenApple(Apple apple) { 
 return "green".equals(apple.getColor());
} 
public static boolean isHeavyApple(Apple apple) { 
 return apple.getWeight() > 150;
} 
public interface Predicate<T>{ 
boolean test(T t); 
} 
static List<Apple> filterApples(List<Apple> inventory, 
 Predicate<Apple> p) {
 List<Apple> result = new ArrayList<>(); 
 for (Apple apple: inventory){ 
 if (p.test(apple)) { 
 result.add(apple); 
 } 
 } 
 return result; 
} 

要用它的話,你可以寫:

filterApples(inventory, Apple::isGreenApple);
// 或者
filterApples(inventory, Apple::isHeavyApple);  
  • 從傳遞方法到 Lambda

把方法作為值來傳遞顯然很有用,但要是為類似於isHeavyApple和isGreenApple這種可能只用一兩次的短方法寫一堆定義有點兒煩人。不過Java 8也解決了這個問題,它引入了一套新記法(匿名函式或Lambda),讓你可以寫

filterApples(inventory, (Apple a) -> "green".equals(a.getColor()) ); 
//或者
filterApples(inventory, (Apple a) -> a.getWeight() > 150 ); 
//甚至
filterApples(inventory, (Apple a) -> a.getWeight() < 80 || 
 "brown".equals(a.getColor()) ); 

所以,你甚至都不需要為只用一次的方法寫定義;程式碼更乾淨、更清晰,因為你用不著去找自己到底傳遞了什麼程式碼。但要是Lambda的長度多於幾行(它的行為也不是一目瞭然)的話,那你還是應該用方法引用來指向一個有描述性名稱的方法,而不是使用匿名的Lambda。你應該以程式碼的清晰度為準繩。

  • 關於流的一個例子

幾乎每個Java應用都會製造和處理集合。但集合用起來並不總是那麼理想。比方說,你需要從一個列表中篩選金額較高的交易,然後按貨幣分組。你需要寫一大堆套路化的程式碼來實現這個資料處理命令,如下所示:

Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>(); 
for (Transaction transaction : transactions) { 
 if(transaction.getPrice() > 1000){ 
     Currency currency = transaction.getCurrency(); 
     List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency); 
     if (transactionsForCurrency == null) { 
         transactionsForCurrency = new ArrayList<>(); 
         transactionsByCurrencies.put(currency, 
         transactionsForCurrency); 
     } 
     transactionsForCurrency.add(transaction); 
 } 
} 

有了Stream API,你現在可以這樣解決這個問題了:

import static java.util.stream.Collectors.toList; 

Map<Currency, List<Transaction>> transactionsByCurrencies = 
 transactions.stream() 
 .filter((Transaction t) -> t.getPrice() > 1000) 
 .collect(groupingBy(Transaction::getCurrency)); 

現在值得注意的是,和Collection API相比,Stream API處理資料的方式非常不同。用集合的話,你得自己去做迭代的過程。你得用for-each迴圈一個個去迭代元素,然後再處理元素。我們把這種資料迭代的方法稱為外部迭代。相反,有了Stream API,你根本用不著操心迴圈的事情。資料處理完全是在庫內部進行的。我們把這種思想叫作內部迭代

使用集合的另一個頭疼的地方是,想想看,要是你的交易量非常龐大,你要怎麼處理這個巨大的列表呢?單個CPU根本搞不定這麼大量的資料,但你很可能已經有了一臺多核電腦。理想的情況下,你可能想讓這些CPU核心共同分擔處理工作,以縮短處理時間。理論上來說,要是你有八個核,那並行起來,處理資料的速度應該是單核的八倍。

多執行緒並非易事

問題在於,通過多執行緒程式碼來利用並行(使用先前Java版本中的Thread API)並非易事。你得換一種思路:執行緒可能會同時訪問並更新共享變數。因此,如果沒有協調好,資料可能會被意外改變。

Java 8也用Stream API(java.util.stream)解決了這兩個問題:集合處理時的套路和晦澀,以及難以利用多核。這樣設計的第一個原因是,有許多反覆出現的資料處理模式,類似於前一節所說的filterApples或SQL等資料庫查詢語言裡熟悉的操作,如果在庫中有這些就會很方便:

根據標準篩選資料(比如較重的蘋果),提取資料(例如抽取列表中每個蘋果的重量欄位),或給資料分組(例如,將一個數字列表分組,奇數和偶數分別列表)等。

第二個原因是,這類操作常常可以並行化。例如,如圖1-6所示,在兩個CPU上篩選列表,可以讓一個CPU處理列表的前一半,第二個CPU處理後一半,這稱為分支步驟(1)。CPU隨後對各自的半個列表做篩選(2)。最後(3),一個CPU會把兩個結果合併起來

到這裡,我們只是說新的Stream API和Java現有的集合API的行為差不多:它們都能夠訪問資料專案的序列。不過,現在最好記得,Collection主要是為了儲存和訪問資料,而Stream則主要用於描述對資料的計算。這裡的關鍵點在於,Stream允許並提倡並行處理一個Stream中的元素。雖然可能乍看上去有點兒怪,但篩選一個Collection(將上一節的filterApples應用在一個List上)的最快方法常常是將其轉換為Stream,進行並行處理,然後再轉換回List,下面舉的序列和並行的例子都是如此。我們這裡還只是說“幾乎免費的並行”,讓你稍微體驗一下,如何利用Stream和Lambda表示式順序或並行地從一個列表裡篩選比較重的蘋果。

//順序處理
import static java.util.stream.Collectors.toList; 
List<Apple> heavyApples = 
 inventory.stream().filter((Apple a) -> a.getWeight() > 150) .collect(toList()); 
//並行處理:
import static java.util.stream.Collectors.toList; 
List<Apple> heavyApples = 
 inventory.parallelStream().filter((Apple a) -> a.getWeight() > 150) .collect(toList()); 

Java中的並行與無共享可變狀態

大家都說Java裡面並行很難,而且和synchronized相關的玩意兒都容易出問題。那Java 8裡面有什麼“靈丹妙藥”呢?事實上有兩個。首先,庫會負責分塊,即把大的流分成幾個小的流,以便並行處理。其次,流提供的這個幾乎免費的並行,只有在傳遞給filter之類的庫方法的方法不會互動(比方說有可變的共享物件)時才能工作。但是其實這個限制對於程式設計師來說挺自然的,舉個例子,我們的Apple::isGreenApple就是這樣。確實,雖然函數語言程式設計中的函式的主要意思是“把函式作為一等值”,不過它也常常隱含著第二層意思,即“執行時在元素之間無互動”。