平時開發過程中 Runable
、Future
、 Thread
、ExecutorService
、Callable
這些和多線程相關的class了解和使用的也比較多,相對來說更加的熟悉和了解。使用起來也更加的得心應手。但是這些組合在一起解決多線程的問題的同時也有一些不是很滿足實際開發過程中的需求。然后在JDK8引入了一個新的類 CompletableFuture
來解決之前的痛點問題。接下來了解一下 CompletableFuture
的一些基本情況以及使用和注意事項。
1 CompletableFuture概述
在JDK8之前,我們使用的JAVA多線程變成,主要是 Thread+Runnable 來完成,但是這種方式有個弊端就是沒有返回值。如果想要返回值怎么辦呢,大多數人就會想到 Callable + Thread
的方式來獲取到返回值。如下:
public class TestCompletable {
public static void main(String[] args) throws Exception{
FutureTask<String> task = new FutureTask((Callable<String>) () -> {
TimeUnit.SECONDS.sleep(2);
return UUID.randomUUID().toString();
});
new Thread(task).start();
String s = task.get();
System.out.println(s);
}
}
復制代碼
從運行上面代碼可以知道當調用代碼 String s = task.get();
的時候,當前主線程是阻塞狀態,另一種方式獲取到返回值就是通過輪詢 task.isDone()
來判斷任務是否做完獲取返回值。因此JDK8之前提供的異步能力有一定的局限性:
- Runnable+Thread雖然提供了多線程的能力但是沒有返回值。
- Callable+Thread的方法提供多線程和返回值的能力但是在獲取返回值的時候會阻塞主線程。
所以上述的情況只適合不關心返回值,只要提交的Task執行了就可以。另外的就是能夠容忍等待。因此我們需要更大的異步能力為了去解決這些痛點問題。比如一下場景:
- 兩個Task計算合并為一個,這兩個異步計算之間相互獨立,但是兩者之前又有依賴關系。
- 對于多個Task,只要一個任務返回了結果就返回結果
等等其他的一些負載的場景, JDK8 就引入了 CompletableFuture
1.1 CompletableFuture與Future的關系
通過上面的類繼承關系圖可以知道 CompletableFuture
實現了 Future
接口和 CompletionStage
。因此 CompletableFuture是對 Futrue的功能增強包含了Future的功能。從繼承的另一個 CompletionStage
的名稱來看完成階段性的接口。這個怎么理解,這個就是下面要說的CompletableFuture本質。
1.2 CompletableFuture本質
CompletableFuture本質是什么?筆者的理解CompletableFuture相當于一個Task編排工具。為什么這么說依據如下:
- CompletableFuture#completedFuture、CompletableFuture#whenComplete 這些方法都是對某一個階段Task計算完成然后進行下一步的動作。將下一個一個Task和前一個Task進行編排。
- CompletableFuture#handle 將Task串連起來
這些動作其實就是Task編排。
2 CompletableFuture使用案例
下面通過自己寫的一些例子和開源項目 DLedger 中的一些例子來看一下 CompletableFuture
使用。
CompletableFuture具有Future的功能:
public class TestCompletable {
public static void main(String[] args) throws Exception{
FutureTask<String> futureTask = new FutureTask(() -> {
Thread.sleep(2000);
return UUID.randomUUID().toString();
});
new Thread(futureTask).start();
CompletableFuture<String> future = CompletableFuture.completedFuture(futureTask.get());
String uuid = future.get();
System.out.println(uuid);
}
}
復制代碼
運行代碼會發現整個過程會等待會然后打印錯結果:
Task完成后回調:
public class TestCompletable {
public static void main(String[] args) throws Exception{
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
System.out.println("");
return UUID.randomUUID().toString();
} catch (InterruptedException e) {
e.printStackTrace();
}
return UUID.randomUUID().toString();
});
future.whenComplete((uuid,exception)->{
System.out.println(uuid);
System.out.println(Thread.currentThread().getName());
});
System.out.println(11111);
System.in.read();
}
}
復制代碼
運行一下看一下結果:
通過結果可以看出來當完成UUID生成后,又執行了whenComplete里面的回調方法。同時還可以通過 future.get()
獲取到返回值。或者就用上面的代碼不用get的方式。在回調函數中就能獲取到。
完成任意一個Task就開始執行回調函數:
public class TestCompletable {
public static void main(String[] args) throws Exception{
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
return "開始生成UUID-"+UUID.randomUUID().toString();
} catch (InterruptedException e) {
e.printStackTrace();
}
return UUID.randomUUID().toString();
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(4);
return "開始生成UUID1-"+UUID.randomUUID().toString();
} catch (InterruptedException e) {
e.printStackTrace();
}
return UUID.randomUUID().toString();
});
CompletableFuture.anyOf(future,future1).whenComplete((uuid,ex)->{
System.out.println(uuid);
});
System.out.println(11111);
System.in.read();
}
}
復制代碼
看一下執行結果:
上面使用了只是一部分CompletableFuture
的特性。通過對Task進行編排可以做到很多的事情。
在DLedger中:
通過使用 CompletableFuture
異步化,處理請求都是通過CompletableFuture#whenCompleteAsync方法。感興趣的可以去閱讀一下源碼進一步了解**CompletableFuture
** 在實際項目中的使用。
3 CompletableFuture使用需要注意點
對于和多線程編程扯上關系,首先想到的就是當前的Task到底由那個Thread執行,使用的不好可能會有性能問題。首先根據CompletableFuture
的方法命名可以了解到:
- xxxx():表示該方法將繼續在當前執行CompletableFuture的方法線程中執行
- xxxxAsync():表示異步,在線程池中執行。
用例子來說明:
public class TestCompletable {
public static void main(String[] args) throws Exception{
CompletableFuture future = new CompletableFuture();
future.whenComplete((item,ex)->{
System.out.println(item);
System.out.println(Thread.currentThread().getName());
});
future.complete(1111);
TimeUnit.SECONDS.sleep(2);
}
}
復制代碼
運行結果:
public class TestCompletable {
public static void main(String[] args) throws Exception{
CompletableFuture future = new CompletableFuture();
future.whenCompleteAsync((item,ex)->{
System.out.println(item);
System.out.println(Thread.currentThread().getName());
});
future.whenCompleteAsync((item,ex)->{
System.out.println(item);
System.out.println(Thread.currentThread().getName());
}, Executors.newFixedThreadPool(10, new ThreadFactory() {
private AtomicInteger integer = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-"+integer.getAndIncrement());
}
}));
future.complete(1111);
TimeUnit.SECONDS.sleep(2);
}
}
復制代碼
運行結果:
Tips: 在沒有指定線程池的情況下,使用的是CompletableFuture內部的線程池。
對于性能有考慮的需要注意同步和異步的使用。
4 總結
CompletableFuture
可以指定異步處理流程:
thenAccept()
處理正常結果;exceptional()
處理異常結果;thenApplyAsync()
用于串行化另一個CompletableFuture
anyOf()
和allOf()
用于并行化多個CompletableFuture
在CompletableFuture
執行Task的時候,是需要使用線程池還是用當前的線程去執行。這個需要根據具體的情況來定。使用的時候要盡可能的小心。
作者:螞蟻背大象
鏈接:https://juejin.cn/post/7126145088299728933
來源:稀土掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。