以例服人:一遍講透CompletableFuture
什麼高深的道理,一篇好的例子都能講透;如果不行,那就兩篇。如果還不行,文末看答案。
CompletableFuture 是 Java 8 引入的,一個很好用的特性,但被 Lambda 、 Stream 等的光環給蓋住了。
總的來說,CompletableFuture把Java 5引入的 Future 的鍋都補上了,具體直接看例子吧。
例子說明
本文所述的例子純屬虛構,如有雷同,純屬巧合(可聯絡作者刪除),版權歸作者所有。
例子主要服務於問題/原理講解,以通熟易懂為主。可能與現實或常識脫鉤,請別太較真。
這是一個虛構的訂單分揀的例子:
-
一個訂單隻包括一個商品,數量不限(>=1)
-
4個倉庫中心(010,020,021,023),包含每個商品在具體哪個貨架多少數量等明細
- 倉庫不總是可用的(譬如錯峰放假、運送區域不支援等)
-
有一個全域性的庫存彙總(無明細,可快速查詢全國某商品的總數)
-
為了提高分庫扣減庫存的有效性
- 先查詢總庫,只有總庫存數量足夠,才開始進行分庫庫存分揀
-
在分揀的同時,訂單可能被撤銷。一旦撤銷,本次分揀結束
-
為使程式碼簡單
-
庫存分揀只是查詢分庫庫存
-
庫存回撤沒有任何實現
-
實現是有狀態的,所以不是執行緒安全的
-

處理流程
彩色文字是後面對應的程式碼呼叫。看程式碼的時候可以結合流程圖加深理解。

狀態變化
大概的介面定義(延遲模擬遠端呼叫):
-
int StockService.query(String product)
: 查總庫庫存 ( 快速 )- 隨機延遲100~200ms
-
int StockService.pick(String repo, String product)
: 查/鎖定分庫庫存(含貨架)- 隨機延遲500~2500ms
-
Availability RepoService.isAvailable(String repo)
: 查分庫是否可用 ( 快速 )- 隨機延遲100ms
-
void EventService.listenOrderCancel(String order)
: 監聽訂單取消事件- 延遲2000ms
-
boolean PackageService.pack(String oid, String pid)
: 分揀訂單- 分成2個子類SingleRepoPackageService和MultiRepoPackage類分別演示單倉庫和多倉庫的實現
-
共用程式碼存放入AbstractRepoPackageService
物件模型
例子實戰 - 單倉庫
文末有完整程式碼
流程總體控制
幾個閥門(CompletableFuture,下簡稱CF)來控制什麼時候終止流程:
/* 監聽訂單取消事件 */ CompletableFuture<Boolean> cancelListener = runAsync(() -> eventService.listenOrderCancel(oid)) .thenApply(reason -> false); /* 分揀結束標記 */ CompletableFuture<Boolean> allocated = new CompletableFuture<>(); /* 總開關(權衡訂單取消和分揀結束) */ CompletableFuture<Boolean> terminator = allocated.applyToEitherAsync(cancelListener, Function.identity());
監聽訂單取消事件
使用CF. runAsync
來 建立 CF例項,並在收到訊號後回撥( thenApply
)宣告分揀失敗
分揀結束標記
直接建立一個新例項,其結束將由後面的分揀流程控制(直接呼叫其 complete(boolean)
)
總開關
使用CF的二選一( applyToEitherAsync
)來建立一個新的總控CF:即任何一個CF執行完畢,這個總控就結束。
分揀
supplyAsync(() -> stockService.query(pid)) .whenComplete((stock, e) -> { if (e != null) { e.printStackTrace(); allocated.complete(false); } else if (stock < q) { logger.log("not enough stock (%d < %d)", stock, q); allocated.complete(false); } else { logger.log("total stock is enough (%d >= %d). " + "allocating from repos...", stock, q); startPick(pid); } });
先檢查總庫存(快速)
使用CF. supplyAsync(Supplier<Integer>)
開啟非同步總庫存查詢。
根據庫存情況
whenComplete(BiConsumer<Integer, Throwable>)
是其執行結果的回撥入口,根據庫存情況:
- 不足,標識分揀以失敗結束(
allocated.complete(false)
)(級聯觸發總開關結束) - 充足,開始分庫分揀(
startPick()
) - 異常處理 ,標記失敗結束(
allocated.complete(false)
)(級聯觸發總開關結束)
分庫分揀
supplyAsync(() -> repoService.isAvailable(repo)) .thenCompose(this::pick) .thenAccept(this::allocateStock) .thenRun(this::isAllocatedFully) // <-- 到這裡返回就是startPickFromOneRepo(),後面多倉庫會呼叫 .whenComplete(this::completeAllocate);
檢查分庫的可用性
supplyAsync(Supplier<Availability>)
建立CF例項,檢查分庫的可用性
分揀
thenCompose(Function<Availability, Stock>)
回撥,根據分庫的可用性:
-
不可用,直接建立一個空的CF<Stock>,標識數量為0
CompletableFuture<Stock> dummy = new CompletableFuture<Stock>(); dummy.complete(new Stock(a.repo, 0)); return dummy;
-
可用,建立新的查詢庫存的CF<Stock> (
supplyAsync(Supplier<Stock>)
)return supplyAsync(() -> stockService.pick(a.repo, pid));
分配庫存
thenAccept(Consumer<Stock>)
回撥,執行實際的庫存分配
檢查是否庫存分配達標
thenRun(Runnable)
回撥,如果達標, allocated.complete(true)
標記分揀以成功結束(級聯觸發總開關結束)
打完收工
whenComplete(BiConsumer<Void, Throwable>)
收尾,處理分配標識還沒結束的case(前面庫存分配達標時,會標識結束)。這種遺漏的情況標記分揀以失敗結束(級聯觸發總開關結束)
返回結果
return terminator.get(5, TimeUnit.SECONDS);
例子實戰 - 多倉庫
final CompletableFuture[] queries = new CompletableFuture[repos.size()]; final Iterator<String> iter = repos.iterator(); for (int i = 0; i < queries.length; i++) { queries[i] = startPickFromOneRepo(iter.next(), pid); } allOf(queries).whenComplete(this::completeAllocate);
基本和單倉庫類似,主要是迴圈讓單倉庫的分揀並行跑。
allOf(CF...)
:必須所有的CF都完成了,才開始回撥 whenComplete(BiConsumer<Void, Throwable>)
。注意之前的流程圖和單倉庫處理邏輯,中間:
- 庫存分配滿額/達標,會提前標識成功分揀結束(其實其他倉庫的分配還在並行進行,實際程式碼需要注意中斷/或防止額外再分配庫存)
總結
在整個例子中,
建立/開啟CF例項
用到的:
CF.supplyAsync(Supplier<T>) CF.runAsync(Runnable) new CF()
遺漏的:
-
handleAsync(Function<T, U>)
關於回撥
用到的:
thenApply(Function<T, U>) whenComplete(BiConsumier<T, Throwable>) thenCompose(Function<T, U>) thenAccept(Consumer<T>) thenRun(Runnable)
遺漏的:
都在了
關於二合一或二選一
用到的:
-
applyToEitherAsync(CF, Function<T, U>)
遺漏的:
acceptEitherAsync(CF, Consumer<T>) runAfterBothAsync(CF, Runnable) runAfterEitherAsync(CF, Runnable) thenAcceptBothAsync(CF, BiConsumer<T, U>) thenCombineAsync(CF, BiFunction<T, U, V)
全部或任一
用到的:
-
allOf(CF...)
遺漏的:
都在了
狀態檢查
用到的:
-
isDone()
遺漏的:
isCancelled() isCompletedExceptionally()
手動標識完成/或異常/取消
用到的:
-
complete(T)
遺漏的:
completedFuture(U) completeExceptionally(Throwable) exceptionally(Function<Throwable, T>) obtrudeException(Throwable) obtrudeValue(T)
等待/阻塞
用到的:
-
get
遺漏的:
getNow join
CF的介面設計是很有規律的,譬如:
- 一個方法xxx,其相應的會有
xxxAsync
以及xxxAsync(.., Executor)
的變體- 建議使用Executor的那個,可以避免
ForkJoinPool.commonPool()
堆積問題(有空會單獨成篇講一下)
- 建議使用Executor的那個,可以避免
- 引數支援了
Function<T, U>
的,必然有其他類似但接受型別為Consumer<T>
和Runnable
的
理解了一種,對接受其他幾個變體應該難度不大。
我們這個例子涵蓋了每個大類的1~N個方法,對於CF的使用大概就應該是這樣了。
呼叫套路
supplyAsync thenXxx thenCombineAsync
注意:不是所有的問題都需要走完整的套路的。
回答開頭的問題:
放出所有原始碼(guava是唯一的外部依賴)。注意,由於這裡有很多隨機數,請多跑幾遍,應該能跑出所有的可能:
- 總的庫存不足,分配失敗
[ 109ms] initialized repos: [023, 020, 010, 021] [11ms] pre-checking stock quickly... [ 135ms] stocks: [total=3, repos={023=0, 020=0, 010=1, 021=2}] [ 141ms] not enough stock (3 < 5) [ 142ms] allocated: false
- 訂單無故被取消
[ 110ms] initialized repos: [023, 020, 010, 021] [14ms] pre-checking stock quickly... [ 149ms] stocks: [total=7, repos={023=1, 020=1, 010=2, 021=3}] [ 158ms] total stock is enough (7 >= 5). allocating from repos... [ 165ms] repo 021 NOT available [ 165ms] repo 021 was allocated 0 [ 194ms] repo 020 is available [ 250ms] repo 010 is available [ 255ms] repo 023 is available [1336ms] repo 020 was allocated 1 [2027ms] cancelled with no reason [2028ms] allocated: false
- 總庫存足,但分庫的分配沒有滿額
[ 126ms] initialized repos: [023, 020, 010, 021] [27ms] pre-checking stock quickly... [ 194ms] stocks: [total=6, repos={023=1, 020=1, 010=2, 021=2}] [ 203ms] total stock is enough (6 >= 5). allocating from repos... [ 259ms] repo 023 is available [ 288ms] repo 020 is available [ 291ms] repo 021 NOT available [ 292ms] repo 021 was allocated 0 [ 294ms] repo 010 is available [1355ms] repo 020 was allocated 1 [1664ms] repo 023 was allocated 1 [1930ms] repo 010 was allocated 2 [1930ms] didn't get enough stock. [1930ms] allocated: false
- 分配成功
[ 104ms] initialized repos: [023, 020, 010, 021] [13ms] pre-checking stock quickly... [ 206ms] stocks: [total=6, repos={023=1, 020=1, 010=2, 021=2}] [ 213ms] total stock is enough (6 >= 5). allocating from repos... [ 237ms] repo 020 is available [ 238ms] repo 023 is available [ 241ms] repo 010 is available [ 284ms] repo 021 is available [ 937ms] repo 021 was allocated 2 [1052ms] repo 020 was allocated 1 [1278ms] repo 023 was allocated 1 [1718ms] repo 010 was allocated 2 [1719ms] 6 >= 5 [1719ms] allocated: true
程式碼版權歸作者,僅非商業使用時可無需作者授權即可使用(非常感謝使用時標註來源)。
希望這篇博文能對你有所幫助,喜歡的話點個贊吧!
PackageService.java
package cf; /** * @author [email protected] * @since 10/9/2018 */ public interface PackageService { boolean pack(String oid, String pid); }
AbstractPackageService.java
package cf; import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static java.util.concurrent.CompletableFuture.*; import static cf.Util.*; /** * Not ThreadSafe and just for demo. * * @author [email protected] * @since 10/9/2018 */ public abstract class AbstractPackageService implements PackageService { protected StockService stockService = new StockService(); protected RepoService repoService = new RepoService(); protected EventService eventService = new EventService(); /* stateful variables. */ protected AtomicInteger queriedQ = new AtomicInteger(0); protected CompletableFuture<Boolean> allocated = new CompletableFuture<>(); @Override public boolean pack(String oid, String pid) { /* set global repos since it's used by single repo + multi repo demos. */ repos.clear(); repos.addAll(getRepos()); logger.log("initialized repos: %s", repos); /* set started time. */ logger.start(); /* a listener to monitor if this order's cancellation event was emitted. */ final CompletableFuture<Boolean> cancelListener = runAsync(() -> eventService.listenOrderCancel(oid)) .thenApply(reason -> false); /* a control to indicate if package was completed. */ final CompletableFuture<Boolean> terminator = allocated.applyToEitherAsync(cancelListener, Function.identity()); logger.log("pre-checking stock quickly..."); supplyAsync(() -> stockService.query(pid)) .whenComplete((stock, e) -> { if (e != null) { e.printStackTrace(); allocated.complete(false); } else if (stock < q) { logger.log("not enough stock (%d < %d)", stock, q); allocated.complete(false); } else { logger.log("total stock is enough (%d >= %d). allocating from repos...", stock, q); startPick(pid); } }); try { // wait until package was completed. return terminator.get(5, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); return false; } } /** repos used to initialize global repos and generate stocks. */ protected abstract Collection<String> getRepos(); /** the entry to kick off pick process. */ protected abstract void startPick(String pid); /** a process to pick up stock from one repo. */ protected CompletableFuture<Void> startPickFromOneRepo(String repo, String pid) { return supplyAsync(() -> repoService.isAvailable(repo)) .thenCompose(this::pick) .thenAccept(this::allocateStock) .thenRun(this::isAllocatedFully) ; } /** pick up stock based on repo's availability. */ protected CompletableFuture<Stock> pick(RepoService.Availability a) { if (!a.available) { CompletableFuture<Stock> dummy = new CompletableFuture<Stock>(); dummy.complete(new Stock(a.repo, 0)); return dummy; } else { return supplyAsync(() -> stockService.pick(a.repo, pid)); } } /** allocate stock. */ protected void allocateStock(Stock stock) { queriedQ.addAndGet(stock.count); logger.log("repo %s was allocated %d", stock.repo, stock.count); } /** check if all stocks are allocated enough. If yes, stop process. */ protected boolean isAllocatedFully() { final int i = queriedQ.get(); if (i >= q) { logger.log("%d >= %d", i, q); allocated.complete(true); } return i >= q; } /** complete allocation process. */ protected void completeAllocate(Void v, Throwable e) { if (e != null) { e.printStackTrace(); }else if (!allocated.isDone()) { allocated.complete(false); logger.log("didn't get enough stock."); } } }
SingleRepoPackageService.java
package cf; import com.google.common.collect.ImmutableSet; import java.util.Collection; import static cf.Util.*; /** * Not ThreadSafe and just for demo. * * @author [email protected] * @since 10/9/2018 */ public class SingleRepoPackageService extends AbstractPackageService { private final String repo = "021"; public static void main(String... args) throws Exception { final boolean result = new SingleRepoPackageService().pack(oid, pid); logger.log("allocated: %s", result); } @Override protected Collection<String> getRepos() { return ImmutableSet.of(repo); } protected void startPick(String pid) { startPickFromOneRepo(repo, pid).whenComplete(this::completeAllocate); } }
MultiRepoPackageService.java
package cf; import com.google.common.collect.ImmutableSet; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import static cf.Util.*; import static java.util.concurrent.CompletableFuture.allOf; /** * Not ThreadSafe and just for demo. * * @author [email protected] * @since 9/9/2018 */ public class MultiRepoPackageService extends AbstractPackageService { public static void main(final String... args) throws Exception { final boolean result = new MultiRepoPackageService().pack(oid, pid); logger.log("allocated: %s", result); } @Override protected Collection<String> getRepos() { return ImmutableSet.of("010", "020", "021", "023"); } @Override protected void startPick(String pid) { final CompletableFuture[] queries = new CompletableFuture[repos.size()]; final Iterator<String> iter = repos.iterator(); for (int i = 0; i < queries.length; i++) { queries[i] = startPickFromOneRepo(iter.next(), pid); } allOf(queries).whenCompleteAsync(this::completeAllocate); } }
Util.java
package cf; import java.nio.ByteBuffer; import java.security.SecureRandom; import java.time.LocalTime; import java.util.HashSet; import java.util.Random; import java.util.Set; /** * @author [email protected] * @since 9/9/2018 */ public interface Util { Logger logger = Logger.getInstance(); int q = 5; Set<String> repos = new HashSet<>(); String oid = "jianshu"; String pid = "Samsung S10"; Random r = new SecureRandom(ByteBuffer.allocate(4).putInt(LocalTime.now().getNano()).array()); static void delay(int base, int random) { try { Thread.sleep(base + r.nextInt(random)); } catch (InterruptedException e) { e.printStackTrace(); } } }
Logger.java
package cf; /** * @author [email protected]. * @since 9/9/2018 */ public class Logger { private static final Logger INSTANCE = new Logger(); private long started; private Logger() { started = System.nanoTime(); } static Logger getInstance() { return INSTANCE; } Logger start() { started = System.nanoTime(); return this; } void log(String s, Object... args) { if (args==null) args = new Object[0]; final String formatS = "[%4dms] " + s + "%n"; //+ "\t<<<%s>>>%n"; final int argLength = args.length + 2; final Object[] args2 = new Object[argLength]; args2[0] = (System.nanoTime()-started)/1_000_000; System.arraycopy(args, 0, args2, 1, args.length); args2[argLength-1] = Thread.currentThread().getName(); System.out.format(formatS, args2); } }
EventService.java
package cf; import static cf.Util.*; /** * @author [email protected] * @since 9/9/2018 */ public class EventService { public void listenOrderCancel(String order) { delay(2000, 300); logger.log("cancelled with no reason"); } }
RepoService.java
package cf; import static cf.Util.*; /** * @author [email protected] * @since 9/9/2018 */ public class RepoService { public Availability isAvailable(String repo) { delay(0, 100); final Availability availability = new Availability(repo, r.nextInt(5) != 0); logger.log("repo %s %s available", repo, availability.available ? "is" : "NOT"); return availability; } public static class Availability { String repo; boolean available; public Availability(String repo, boolean available) { this.repo = repo; this.available = available; } } }
Stock.java
package cf; /** * @author [email protected] * @since 9/9/2018 */ class Stock { String repo; int count; public Stock(String repo, int count) { this.repo = repo; this.count = count; } @Override public String toString() { final StringBuilder sb = new StringBuilder("Stock{"); sb.append("repo='").append(repo).append('\''); sb.append(", count=").append(count); sb.append('}'); return sb.toString(); } }
StockService.java
package cf; import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import static cf.Util.*; /** * @author [email protected] * @since 9/9/2018 */ public class StockService { private Map<String, Integer> stocks = new HashMap<>(); public int query(String prd) { delay(100, 100); int q2 = (q-q/4-1) + r.nextInt(q); generateStock(q2); return q2; } public Stock pick(String repo, String prd) { final Stock stock = new Stock(repo, stocks.get(repo)); delay(500, 2000); return stock; } private void generateStock(int q) { final Iterator<String> iter = repos.iterator(); if (repos.size() == 1) { stocks = ImmutableMap.of(iter.next(), q); } else { stocks = ImmutableMap.of( iter.next(), q / 5, iter.next(), q / 4, iter.next(), q / 3, iter.next(), (q - q / 5 - q / 4 - q / 3) ); } logger.log("stocks: [total=%d, repos=%s]", q, stocks); } }
希望這篇博文能對你有所幫助,喜歡的話點個贊吧!