在項目開發中,后端服務對外提供API接口一般都會關注響應時長。但是某些情況下,由于業務規劃邏輯的原因,我們的接口可能會是一個聚合信息處理類的處理邏輯,比如我們從多個不同的地方獲取數據,然后匯總處理為最終的結果再返回給調用方,這種情況下,往往會導致我們的接口響應特別的慢。
而如果我們想要動手進行優化的時候呢,就會涉及到串行處理還是并行處理的問題。在JAVA中并行處理的能力支持已經相對完善,通過對CompletableFuture的合理利用,可以讓我們面對這種聚合類處理的場景會更加的得心應手。
好啦,話不多說,接下來就讓我們一起來品嘗下JAVA中組合式并行處理這道饕餮大餐吧。
前菜:先看個實際場景
在開始享用這頓大餐前,我們先來個前菜開開胃。
例如現在有這么個需求:
需求描述: 實現一個全網比價服務,比如可以從某寶、某東、某夕夕去獲取某個商品的價格、優惠金額,并計算出實際付款金額,最終返回價格最優的平臺與價格信息。
這里假定每個平臺獲取原價格與優惠券的接口已經實現、且都是需要調用HTTP接口查詢的耗時操作,Mock接口每個耗時1s左右。
根據最初的需求理解,我們可以很自然地寫出對應的實現代碼:
public PriceResult getCheapestPlatAndPrice(String product) { // 獲取某寶的價格以及優惠,并計算最終實付價格 PriceResult mouBaoPrice = computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product)); // 獲取某東的價格以及優惠,并計算最終實付價格 PriceResult mouDongPrice = computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product)); // 獲取某夕夕的價格以及優惠,并計算最終實付價格 PriceResult mouXiXiPrice = computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product)); // 計算并選出實際價格最低的平臺 return Stream.of(mouBaoPrice, mouDongPrice, mouXiXiPrice). min(Comparator.comparingInt(PriceResult::getRealPrice)) .get(); }
一切順理成章,運行測試下:
05:24:53.759[main|1]獲取某寶上 iphone13的價格 05:24:54.779[main|1]獲取某寶上 Iphone13的價格完成: 5199 05:24:54.779[main|1]獲取某寶上 Iphone13的優惠 05:24:55.781[main|1]獲取某寶上 Iphone13的優惠完成: -200 05:24:55.781[main|1]某寶最終價格計算完成:4999 05:24:55.781[main|1]獲取某東上 Iphone13的價格 05:24:56.784[main|1]獲取某東上 Iphone13的價格完成: 5299 05:24:56.784[main|1]獲取某東上 Iphone13的優惠 05:24:57.786[main|1]獲取某東上 Iphone13的優惠完成: -150 05:24:57.786[main|1]某東最終價格計算完成:5149 05:24:57.786[main|1]獲取某夕夕上 Iphone13的價格 05:24:58.788[main|1]獲取某夕夕上 Iphone13的價格完成: 5399 05:24:58.788[main|1]獲取某夕夕上 Iphone13的優惠 05:24:59.791[main|1]獲取某夕夕上 Iphone13的優惠完成: -5300 05:24:59.791[main|1]某夕夕最終價格計算完成:99 獲取最優價格信息:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 6122ms ------
結果符合預期,功能一切正常,就是耗時長了點。試想一下,假如你在某個App操作查詢的時候,等待6s才返回結果,估計會直接把APP給卸載了吧?
梳理下前面代碼的實現思路:
所有的環節都是串行的,每個環節耗時加到一起,接口總耗時肯定很長。
但實際上,每個平臺之間的操作是互不干擾的,那我們自然而然地可以想到,可以通過多線程的方式,同時去分別執行各個平臺的邏輯處理,最后將各個平臺的結果匯總到一起比對得到最低價格。
所以整個執行過程會變成如下的效果:
為了提升性能,我們采用線程池來負責多線程的處理操作,因為我們需要得到各個子線程處理的結果,所以我們需要使用 Future來實現:
public PriceResult getCheapestPlatAndPrice2(String product) { Future mouBaoFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product))); Future mouDongFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product))); Future mouXiXiFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product))); // 等待所有線程結果都處理完成,然后從結果中計算出最低價 return Stream.of(mouBaoFuture, mouDongFuture, mouXiXiFuture) .map(priceResultFuture -> { try { return priceResultFuture.get(5L, TimeUnit.SECONDS); } catch (Exception e) { return null; } }) .filter(Objects::nonNull) .min(Comparator.comparingInt(PriceResult::getRealPrice)) .get(); }
上述代碼中,將三個不同平臺對應的Callable函數邏輯放入到ThreadPool中去執行,返回Future對象,然后再逐個通過Future.get()接口阻塞獲取各自平臺的結果,最后經比較處理后返回最低價信息。
執行代碼,可以看到執行結果與過程如下:
05:42:24.270[pool-1-thread-1|12]獲取某寶上 Iphone13的價格 05:42:24.270[pool-1-thread-2|13]獲取某東上 Iphone13的價格 05:42:24.270[pool-1-thread-3|14]獲取某夕夕上 Iphone13的價格 05:42:25.291[pool-1-thread-2|13]獲取某東上 Iphone13的價格完成: 5299 05:42:25.291[pool-1-thread-3|14]獲取某夕夕上 Iphone13的價格完成: 5399 05:42:25.291[pool-1-thread-1|12]獲取某寶上 Iphone13的價格完成: 5199 05:42:25.291[pool-1-thread-2|13]獲取某東上 Iphone13的優惠 05:42:25.291[pool-1-thread-3|14]獲取某夕夕上 Iphone13的優惠 05:42:25.291[pool-1-thread-1|12]獲取某寶上 Iphone13的優惠 05:42:26.294[pool-1-thread-2|13]獲取某東上 Iphone13的優惠完成: -150 05:42:26.294[pool-1-thread-3|14]獲取某夕夕上 Iphone13的優惠完成: -5300 05:42:26.294[pool-1-thread-1|12]獲取某寶上 Iphone13的優惠完成: -200 05:42:26.294[pool-1-thread-2|13]某東最終價格計算完成:5149 05:42:26.294[pool-1-thread-3|14]某夕夕最終價格計算完成:99 05:42:26.294[pool-1-thread-1|12]某寶最終價格計算完成:4999 獲取最優價格信息:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 2119ms ------
結果與第一種實現方式一致,但是接口總耗時從6s下降到了2s,效果還是很顯著的。但是,是否還能再壓縮一些呢?
基于上面按照平臺拆分并行處理的思路繼續推進,我們可以看出每個平臺內的處理邏輯其實可以分為3個主要步驟:
- 獲取原始價格(耗時操作)
- 獲取折扣優惠(耗時操作)
- 得到原始價格和折扣優惠之后,計算實付價格
這3個步驟中,第1、2兩個耗時操作也是相對獨立的,如果也能并行處理的話,響應時長上應該又會縮短一些,即如下的處理流程:
我們當然可以繼續使用上面提到的線程池+Future的方式,但Future在應對并行結果組合以及后續處理等方面顯得力不從心,弊端明顯:
代碼寫起來會非常拖沓:先封裝Callable函數放到線程池中去執行查詢操作,然后分三組阻塞等待結果并計算出各自結果,最后再阻塞等待價格計算完成后匯總得到最終結果。
說到這里呢,就需要我們新的主人公CompletableFuture登場了,通過它我們可以很輕松的來完成任務的并行處理,以及各個并行任務結果之間的組合再處理等操作。我們使用CompletableFuture編寫實現代碼如下:
public PriceResult getCheapestPlatAndPrice3(String product) { // 獲取并計算某寶的最終價格 CompletableFuture mouBao = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)) .thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice); // 獲取并計算某寶的最終價格 CompletableFuture mouDong = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongPrice(product)) .thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongDiscounts(product)), this::computeRealPrice); // 獲取并計算某寶的最終價格 CompletableFuture mouXiXi = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)) .thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice); // 排序并獲取最低價格 return Stream.of(mouBao, mouDong, mouXiXi) .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }
看下執行結果符合預期,而接口耗時則降到了1s(因為我們依賴的每一個查詢實際操作的接口耗時都是模擬的1s,所以這個結果已經算是此復合接口能達到的極限值了)。
06:01:12.334[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13的價格 06:01:12.334[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13的優惠 06:01:12.334[ForkJoinPool.commonPool-worker-11|14]獲取某東上 Iphone13的價格 06:01:12.334[ForkJoinPool.commonPool-worker-13|16]獲取某夕夕上 Iphone13的價格 06:01:12.334[ForkJoinPool.commonPool-worker-4|15]獲取某東上 Iphone13的優惠 06:01:12.334[ForkJoinPool.commonPool-worker-6|17]獲取某夕夕上 Iphone13的優惠 06:01:13.354[ForkJoinPool.commonPool-worker-6|17]獲取某夕夕上 Iphone13的優惠完成: -5300 06:01:13.354[ForkJoinPool.commonPool-worker-13|16]獲取某夕夕上 Iphone13的價格完成: 5399 06:01:13.354[ForkJoinPool.commonPool-worker-4|15]獲取某東上 Iphone13的優惠完成: -150 06:01:13.354[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13的價格完成: 5199 06:01:13.354[ForkJoinPool.commonPool-worker-11|14]獲取某東上 Iphone13的價格完成: 5299 06:01:13.354[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13的優惠完成: -200 06:01:13.354[ForkJoinPool.commonPool-worker-13|16]某夕夕最終價格計算完成:99 06:01:13.354[ForkJoinPool.commonPool-worker-11|14]某東最終價格計算完成:5149 06:01:13.354[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 獲取最優價格信息:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 1095ms ------
好啦,通過餐前的前菜,大家應該能夠看出來串行與并行處理邏輯的區別、以及并行處理邏輯的實現策略了吧?這里我們應該也可以看出CompletableFuture在應對并行處理場景下的強大優勢。當然咯,上面也只是小小的窺視了下CompletableFuture功能的冰上一角,下面就讓我們一起來深入了解下,享用并消化CompletableFuture這道主菜吧!
主菜:CompletableFuture深入了解
好啦,下面該主菜上場了。
作為JAVA8之后加入的新成員,CompletableFuture的實現與使用上,也處處體現出了函數式異步編程的味道。一個CompletableFuture對象可以被一個環節接一個環節的處理、也可以對兩個或者多個CompletableFuture進行組合處理或者等待結果完成。通過對CompletableFuture各種方法的合理使用與組合搭配,可以讓我們在很多的場景都可以應付自如。
下面就來一起了解下這些方法以及對應的使用方式吧。
Future與CompletableFuture
首先,先來理一下Future與CompletableFuture之間的關系。
Future
如果接觸過多線程相關的概念,那Future應該不會陌生,早在Java5中就已經存在了。
該如何理解Future呢?舉個生活中的例子:
你去咖啡店點了一杯咖啡,然后服務員會給你一個訂單小票。 當服務員在后臺制作咖啡的時候,你并沒有在店里等待,而是出門到隔壁甜品店又買了個面包。 當面包買好之后,你回到咖啡店,拿著訂單小票去取咖啡。 取到咖啡后,你邊喝咖啡邊把面包吃了……嗝~
是不是很熟悉的生活場景? 對比到我們多線程異步編程的場景中,咖啡店的訂單小票其實就是Future,通過Future可以讓稍后適當的時候可以獲取到對應的異步執行線程中的執行結果。
上面的場景,我們翻譯為代碼實現邏輯:
public void buycoffeeAndOthers() throws ExecutionException, InterruptedException { goShopping(); // 子線程中去處理做咖啡這件事,返回future對象 Future coffeeTicket = threadPool.submit(this::makeCoffee); // 主線程同步去做其他的事情 Bread bread = buySomeBread(); // 主線程其他事情并行處理完成,阻塞等待獲取子線程執行結果 Coffee coffee = coffeeTicket.get(); // 子線程結果獲取完成,主線程繼續執行 eatAndDrink(bread, coffee); }
編碼源于生活、代碼中的設計邏輯,很多時候都是與生活哲學匹配的。
CompletableFuture
Future在應對一些簡單且相互獨立的異步執行場景很便捷,但是在一些復雜的場景,比如同時需要多個有依賴關系的異步獨立處理的時候,或者是一些類似流水線的異步處理場景時,就顯得力不從心了。比如:
- 同時執行多個并行任務,等待最快的一個完成之后就可以繼續往后處理
- 多個異步任務,每個異步任務都需要依賴前一個異步任務執行的結果再去執行下一個異步任務,最后只需要一個最終的結果
- 等待多個異步任務全部執行完成后觸發下一個動作執行
所以呢, 在JAVA8開始引入了全新的CompletableFuture類,它是Future接口的一個實現類。也就是在Future接口的基礎上,額外封裝提供了一些執行方法,用來解決Future使用場景中的一些不足,對流水線處理能力提供了支持。
下一節中,我們就來進一步的了解下CompletableFuture的具體使用場景與使用方式。
CompletableFuture使用方式創建CompletableFuture并執行
當我們需要進行異步處理的時候,我們可以通過CompletableFuture.supplyAsync方法,傳入一個具體的要執行的處理邏輯函數,這樣就輕松的完成了CompletableFuture的創建與觸發執行。
方法名稱
作用描述
supplyAsync
靜態方法,用于構建一個CompletableFuture對象,并異步執行傳入的函數,允許執行函數有返回值T。
runAsync
靜態方法,用于構建一個CompletableFuture對象,并異步執行傳入函數,與supplyAsync的區別在于此方法傳入的是Callable類型,僅執行,沒有返回值。
使用示例:
public void testCreateFuture(String product) { // supplyAsync, 執行邏輯有返回值PriceResult CompletableFuture supplyAsyncResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)); // runAsync, 執行邏輯沒有返回值 CompletableFuture runAsyncResult = CompletableFuture.runAsync(() -> System.out.println(product)); }
特別補充:
supplyAsync或者runAsync創建后便會立即執行,無需手動調用觸發。
環環相扣處理
在流水線處理場景中,往往都是一個任務環節處理完成后,下一個任務環節接著上一環節處理結果繼續處理。CompletableFuture用于這種流水線環節驅動類的方法有很多,相互之間主要是在返回值或者給到下一環節的入參上有些許差異,使用時需要注意區分:
具體的方法的描述歸納如下:
方法名稱
作用描述
thenApply
對CompletableFuture的執行后的具體結果進行追加處理,并將當前的CompletableFuture泛型對象更改為處理后新的對象類型,返回當前CompletableFuture對象。
thenCompose
與thenApply類似。區別點在于:此方法的入參函數返回一個CompletableFuture類型對象。
thenAccept
與thenApply方法類似,區別點在于thenAccept返回void類型,沒有具體結果輸出,適合無需返回值的場景。
thenRun
與thenAccept類似,區別點在于thenAccept可以將前面CompletableFuture執行的實際結果作為入參進行傳入并使用,但是thenRun方法沒有任何入參,只能執行一個Runnable函數,并且返回void類型。
因為上述thenApply、thenCompose方法的輸出仍然都是一個CompletableFuture對象,所以各個方法是可以一環接一環的進行調用,形成流水線式的處理邏輯:
期望總是美好的,但是實際情況卻總不盡如人意。在我們編排流水線的時候,如果某一個環節執行拋出異常了,會導致整個流水線后續的環節就沒法再繼續下去了,比如下面的例子:
public void testExceptionHandle() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("supplyAsync excetion occurred..."); }).thenApply(obj -> { System.out.println("thenApply executed..."); return obj; }).join(); }
執行之后會發現,supplyAsync拋出異常后,后面的thenApply并沒有被執行。
那如果我們想要讓流水線的每個環節處理失敗之后都能讓流水線繼續往下面環節處理,讓后續環節可以拿到前面環節的結果或者是拋出的異常并進行對應的應對處理,就需要用到handle和whenCompletable方法了。
先看下兩個方法的作用描述:
方法名稱
作用描述
handle
與thenApply類似,區別點在于handle執行函數的入參有兩個,一個是CompletableFuture執行的實際結果,一個是是Throwable對象,這樣如果前面執行出現異常的時候,可以通過handle獲取到異常并進行處理。
whenComplete
與handle類似,區別點在于whenComplete執行后無返回值。
我們對上面一段代碼示例修改使用handle方法來處理:
public void testExceptionHandle() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("supplyAsync excetion occurred..."); }).handle((obj, e) -> { if (e != null) { System.out.println("thenApply executed, exception occurred..."); } return obj; }).join(); }
再執行可以發現,即使前面環節出現異常,后面環節也可以繼續處理,且可以拿到前一環節拋出的異常信息:
thenApply executed, exception occurred...
多個CompletableFuture組合操作
前面一直在介紹流水線式的處理場景,但是很多時候,流水線處理場景也不會是一個鏈路順序往下走的情況,很多時候為了提升并行效率,一些沒有依賴的環節我們會讓他們同時去執行,然后在某些環節需要依賴的時候,進行結果的依賴合并處理,類似如下圖的效果。
CompletableFuture相比于Future的一大優勢,就是可以方便的實現多個并行環節的合并處理。相關涉及方法介紹歸納如下:
方法名稱
作用描述
thenCombine
將兩個CompletableFuture對象組合起來進行下一步處理,可以拿到兩個執行結果,并傳給自己的執行函數進行下一步處理,最后返回一個新的CompletableFuture對象。
thenAcceptBoth
與thenCombine類似,區別點在于thenAcceptBoth傳入的執行函數沒有返回值,即thenAcceptBoth返回值為CompletableFuture。
runAfterBoth
等待兩個CompletableFuture都執行完成后再執行某個Runnable對象,再執行下一個的邏輯,類似thenRun。
applyToEither
兩個CompletableFuture中任意一個完成的時候,繼續執行后面給定的新的函數處理。再執行后面給定函數的邏輯,類似thenApply。
acceptEither
兩個CompletableFuture中任意一個完成的時候,繼續執行后面給定的新的函數處理。再執行后面給定函數的邏輯,類似thenAccept。
runAfterEither
等待兩個CompletableFuture中任意一個執行完成后再執行某個Runnable對象,可以理解為thenRun的升級版,注意與runAfterBoth對比理解。
allOf
靜態方法,阻塞等待所有給定的CompletableFuture執行結束后,返回一個CompletableFuture結果。
anyOf
靜態方法,阻塞等待任意一個給定的CompletableFuture對象執行結束后,返回一個CompletableFuture結果。
結果等待與獲取
在執行線程中將任務放到工作線程中進行處理的時候,執行線程與工作線程之間是異步執行的模式,如果執行線程需要獲取到共工作線程的執行結果,則可以通過get或者join方法,阻塞等待并從CompletableFuture中獲取對應的值。
對get和join的方法功能含義說明歸納如下:
方法名稱
作用描述
get()
等待CompletableFuture執行完成并獲取其具體執行結果,可能會拋出異常,需要代碼調用的地方手動try...catch進行處理。
get(long, TimeUnit)
與get()相同,只是允許設定阻塞等待超時時間,如果等待超過設定時間,則會拋出異常終止阻塞等待。
join()
等待CompletableFuture執行完成并獲取其具體執行結果,可能會拋出運行時異常,無需代碼調用的地方手動try...catch進行處理。
從介紹上可以看出,兩者的區別就在于是否需要調用方顯式的進行try...catch處理邏輯,使用代碼示例如下:
public void testGetAndJoin(String product) { // join無需顯式try...catch... PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)) .join(); try { // get顯式try...catch... PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)) .get(5L, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } }
CompletableFuture方法及其Async版本
我們在使用CompletableFuture的時候會發現,有很多的方法,都會同時有兩個以Async命名結尾的方法版本。以前面我們用的比較多的thenCombine方法為例:
- thenCombine(CompletionStage, BiFunction)
- thenCombineAsync(CompletionStage, BiFunction)
- thenCombineAsync(CompletionStage, BiFunction, Executor)
從參數上看,區別并不大,僅第三個方法入參中多了線程池Executor對象。看下三個方法的源碼實現,會發現其整體實現邏輯都是一致的,僅僅是使用線程池這個地方的邏輯有一點點的差異:
有興趣的可以去翻一下此部分的源碼實現,這里概括下三者的區別:
- thenCombine方法,沿用上一個執行任務所使用的線程池進行處理
- thenCombineAsync兩個入參的方法,使用默認的ForkJoinPool線程池中的工作線程進行處理
- themCombineAsync三個入參的方法,支持自定義線程池并指定使用自定義線程池中的線程作為工作線程去處理待執行任務。
為了更好的理解下上述的三個差異點,我們通過下面的代碼來演示下:
- **用法1: **其中一個supplyAsync方法以及thenCombineAsync指定使用自定義線程池,另一個supplyAsync方法不指定線程池(使用默認線程池)
public PriceResult getCheapestPlatAndPrice4(String product) { // 構造自定義線程池 ExecutorService executor = Executors.newFixedThreadPool(5); return CompletableFuture.supplyAsync( () -> HttpRequestMock.getMouXiXiPrice(product), executor ).thenCombineAsync( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice, executor ).join(); }
對上述代碼實現策略的解讀,以及與執行結果的關系展示如下圖所示,可以看出,沒有指定自定義線程池的supplyAsync方法,其使用了默認的ForkJoinPool工作線程來運行,而另外兩個指定了自定義線程池的方法,則使用了自定義線程池來執行。
- 用法2: 不指定自定義線程池,使用默認線程池策略,使用thenCombine方法
public PriceResult getCheapestPlatAndPrice5(String product) { return CompletableFuture.supplyAsync( () -> HttpRequestMock.getMouXiXiPrice(product) ).thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice ).join(); }
執行結果如下,可以看到執行線程名稱與用法1示例相比發生了變化。因為沒有指定線程池,所以兩個supplyAsync方法都是用的默認的ForkJoinPool線程池,而thenCombine使用的是上一個任務所使用的線程池,所以也是用的ForkJoinPool。
14:34:27.815[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格 14:34:27.815[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠完成: -5300 14:34:28.831[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格完成: 5399 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]某夕夕最終價格計算完成:99 獲取最優價格信息:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 1083ms ------
現在,我們知道了方法名稱帶有Async和不帶Async的實現策略上的差異點就在于使用哪個線程池來執行而已。那么,對我們實際的指導意義是啥呢?實際使用的時候,我們怎么判斷自己應該使用帶Async結尾的方法、還是不帶Async結尾的方法呢?
上面是Async結尾方法默認使用的ForkJoinPool創建的邏輯,這里可以看出,默認的線程池中的工作線程數是CPU核數 - 1,并且指定了默認的丟棄策略等,這就是一個主要關鍵點。
所以說,符合以下幾個條件的時候,可以考慮使用帶有Async后綴的方法,指定自定義線程池:
- 默認線程池的線程數滿足不了實際訴求
- 默認線程池的類型不符合自己業務訴求
- 默認線程池的隊列滿處理策略不滿足自己訴求
與Stream結合使用的注意點
在我前面的文檔中,有細致全面的介紹過Stream流相關的使用方式(不清楚的同學速點《吃透JAVA的Stream流操作,多年實踐總結》了解下啦)。在涉及批量進行并行處理的時候,通過Stream與CompletableFuture結合使用,可以簡化我們的很多編碼邏輯。但是在使用細節方面需要注意下,避免達不到使用CompletableFuture的預期效果。
需求場景: 在同一個平臺內,傳入多個商品,查詢不同商品對應的價格與優惠信息,并選出實付價格最低的商品信息。
結合前面的介紹分析,我們應該知道最佳的方式,就是同時并行的方式去各自請求數據,最后合并處理即可。所以我們規劃按照如下的策略來實現:
先看第一種編碼實現:
public PriceResult comparePriceInOnePlat(List products) { return products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice)) .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }
對于List的處理場景,這里采用了Stream方式來進行遍歷與結果的收集、排序與返回。看似正常,但是執行的時候會發現,并沒有達到我們預期的效果:
07:37:14.388[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格 07:37:14.388[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠 07:37:15.408[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199 07:37:15.408[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200 07:37:15.408[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 07:37:15.408[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13白色的價格 07:37:15.409[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的優惠 07:37:16.410[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13白色的價格完成: 5199 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的優惠完成: -200 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]某寶最終價格計算完成:4999 07:37:16.410[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13紅色的優惠 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13紅色的價格 07:37:17.412[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13紅色的價格完成: 5199 07:37:17.412[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13紅色的優惠完成: -200 07:37:17.412[ForkJoinPool.commonPool-worker-9|12]某寶最終價格計算完成:4999 獲取最優價格信息:【平臺:某寶, 原價:5199, 折扣:0, 實付價:4999】 -----執行耗時: 3132ms ------
從上述執行結果可以看出,其具體處理的時候,其實是按照下面的邏輯去處理了:
為什么會出現這種實際與預期的差異呢?原因就在于我們使用的Stream上面!雖然Stream中使用兩個map方法,但Stream處理的時候并不會分別遍歷兩遍,其實寫法等同于下面這種寫到1個map中處理,改為下面這種寫法,其實大家也就更容易明白為啥會沒有達到我們預期的整體并行效果了:
public PriceResult comparePriceInOnePlat1(List products) { return products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice) .join()) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }
既然如此,這種場景是不是就不能使用Stream了呢?也不是,其實我們拆開成兩個Stream分步操作下其實就可以了。
再看下面的第二種實現代碼:
public PriceResult comparePriceInOnePlat2(List products) { // 先觸發各自平臺的并行處理 List> completableFutures = products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice)) .collect(Collectors.toList()); // 在獨立的流中,等待所有并行處理結束,做最終結果處理 return completableFutures.stream() .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }
執行結果:
07:39:15.053[ForkJoinPool.commonPool-worker-13|16]獲取某寶上 Iphone13紅色的優惠 07:39:15.054[ForkJoinPool.commonPool-worker-4|15]獲取某寶上 Iphone13白色的優惠 07:39:15.053[ForkJoinPool.commonPool-worker-6|17]獲取某寶上 Iphone13紅色的價格 07:39:15.053[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格 07:39:15.053[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的價格 07:39:15.053[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠 07:39:16.072[ForkJoinPool.commonPool-worker-6|17]獲取某寶上 Iphone13紅色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-4|15]獲取某寶上 Iphone13白色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-13|16]獲取某寶上 Iphone13紅色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 07:39:16.072[ForkJoinPool.commonPool-worker-4|15]某寶最終價格計算完成:4999 07:39:16.072[ForkJoinPool.commonPool-worker-13|16]某寶最終價格計算完成:4999 獲取最優價格信息:【平臺:某寶, 原價:5199, 折扣:0, 實付價:4999】 -----執行耗時: 1142ms ------
從執行結果可以看出,三個商品并行處理,整體處理耗時相比前面編碼方式有很大提升,達到了預期的效果。
歸納下:
因為Stream的操作具有延遲執行的特點,且只有遇到終止操作(比如collect方法)的時候才會真正的執行。所以遇到這種需要并行處理且需要合并多個并行處理流程的情況下,需要將并行流程與合并邏輯放到兩個Stream中,這樣分別觸發完成各自的處理邏輯,就可以了。
甜點:并發和并行的區別
對一個吃貨而言,主餐完畢,總得來點餐后甜點才夠滿足。
在前面的內容中呢,我們始終是在圍繞并行處理這個話題在展開。實際工作的時候,我們對于并發這個詞肯定也不陌生,高并發這個詞,就像高端人士酒杯中那八二年的拉菲一般,成了每一個開發人員簡歷上用來彰顯實力的一個標簽。
那么,并發和并行到底啥區別?這里我們也簡單的概括下。
并發
所謂并發,其關注的點是服務器的吞吐量情況,也就是服務器可以在單位時間內同時處理多少個請求。并發是通過多線程的方式來實現的,充分利用當前CPU多核能力,同時使用多個進程去處理業務,使得同一個機器在相同時間內可以處理更多的請求,提升吞吐量。
所有的操作在一個線程中串行推進,如果有多個線程同步處理,則同時有多個請求可以被處理。但是因為是串行處理,所以如果某個環節需要對外交互時,比如等待網絡IO的操作,會使得當前線程處于阻塞狀態,直到資源可用時被喚醒繼續往后執行。
對于高并發場景,服務器的線程資源是非常寶貴的。如果頻繁的處于阻塞則會導致浪費,且線程頻繁的阻塞、喚醒切換動作,也會加劇整體系統的性能損耗。所以并發這種多線程場景,更適合CPU密集型的操作。
并行
所謂并行,就是將同一個處理流程沒有相互依賴的部分放到多個線程中進行同時并行處理,以此來達到相對于串行模式更短的單流程處理耗時的效果,進而提升系統的整體響應時長與吞吐量。
基于異步編程實現的并行操作也是借助線程池的方式,通過多線程同時執行來實現效率提升的。與并發的區別在于:并行通過將任務切分為一個個可獨立處理的小任務塊,然后基于系統調度策略,將需要執行的任務塊分配給空閑可用工作線程去處理,如果出現需要等待的場景(比如IO請求)則工作線程會將此任務先放下,繼續處理后續的任務,等之前的任務IO請求好了之后,系統重新分配可用的工作線程來處理。
根據上面的示意圖介紹可以看出,異步并行編程,對于工作線程的利用率上升,不會出現工作線程阻塞的情況,但是因為任務拆分、工作線程間的切換調度等系統層面的開銷也會隨之加大。
如何選擇
前面介紹了下并發與并行兩種模式的特點、以及各自的優缺點。所以選擇采用并發還是并行方式來提升系統的處理性能,還需要結合實際項目場景來確定。
綜合而言:
- 如果業務處理邏輯是CPU密集型的操作,優先使用基于線程池實現并發處理方案(可以避免線程間切換導致的系統性能浪費)。
- 如果業務處理邏輯中存在較多需要阻塞等待的耗時場景、且相互之間沒有依賴,比如本地IO操作、網絡IO請求等等,這種情況優先選擇使用并行處理策略(可以避免寶貴的線程資源被阻塞等待)。
總結回顧
好啦,關于JAVA中CompletableFuture的使用,以及并行編程相關的內容呢就介紹到這里啦。看到這里,相信您應該有所收獲吧?那么你的項目里有這種適合并行處理的場景嗎?你在處理并行場景的時候是怎么做的呢?評論區一起討論下吧~~
補充:
本文中有提及CompletableFuture執行時所使用的默認線程池是ForkJoinPool,早在JAVA7版本就已經被引入,但是很多人對ForkJoinPool不是很了解,實際項目中使用的也比較少。其實對ForkJoinPool的合理利用,可以讓我們在面對某些多線程場景時會更加的從容高效。在后面的文章中,我會針對ForkJoinPool有關的內容進行專門的介紹與探討,如果有興趣,可以點個關注,及時獲取后續的內容。
此外:
- 關于本文中涉及的演示代碼的完整示例,我已經整理并提交到github中,如果您有需要,可以自取:https://github.com/veezean/JavaBasicSkills
我是悟道,聊技術、又不僅僅聊技術~
如果覺得有用,請點贊 + 關注讓我感受到您的支持。也可以關注下我的公眾號【架構悟道】,獲取更及時的更新。
期待與你一起探討,一起成長為更好的自己。