前言
在 JAVA 中你不了解異步編程,crud 完全沒(méi)有問(wèn)題,但是有的需求你無(wú)法優(yōu)雅的實(shí)現(xiàn)。
js 也存在異步編程,當(dāng)你理解了用同步的思維編寫(xiě)異步的代碼時(shí),相信你在編程上的造詣?dòng)指M(jìn)一步。
大多人都在追捧微服務(wù),可能他們只會(huì)用 Ribbon
和 Feign
。微服務(wù)是一個(gè)架構(gòu)上的選擇,當(dāng)你沒(méi)有達(dá)到架構(gòu)層次時(shí),我認(rèn)為你應(yīng)該更加注重業(yè)務(wù)上的代碼編寫(xiě),即微服務(wù)中單體服務(wù)代碼的編寫(xiě)。
單體服務(wù)性能極差,你的微服務(wù)整體性能也好不到哪里去,只能通過(guò)限流、熔斷外加多部署機(jī)器來(lái)解決并發(fā)低的問(wèn)題。在你想玩微服務(wù)之前,并發(fā)玩好了再考慮高并發(fā)。先把 java 中 juc 包下的并發(fā)相關(guān)的知識(shí)整的明明白白再進(jìn)行下一步,這花不了幾個(gè)時(shí)間。微服務(wù)是你進(jìn)階之后再學(xué)的。
本來(lái)打算繼續(xù)寫(xiě) MySQL,但實(shí)在提不起來(lái)我的興致(還需要看書(shū)研究,畢竟是個(gè)黑盒研究),只好拿這篇完成任務(wù)了。
本文內(nèi)容
- js 中 Promise 和 async await 的一個(gè)列子
- SpringBoot 中異步編程
- Future
- CompletableFuture
js 異步編程
要習(xí)慣使用 Promise ,避免把 fn 當(dāng)成參數(shù)傳遞,避免回調(diào)地獄。這不僅僅是 api 調(diào)用的問(wèn)題,這是你編程思想轉(zhuǎn)變。
const awaitFunc = function _awaitFunc() {
return Promise.resolve('awaitFunc').then(data => {
console.log(data);
return 'awaitFunc-then-return-data';
});
};
const async = async function _async() {
setTimeout(() => {
console.log('驗(yàn)證加入了宏任務(wù)隊(duì)列---1');
}, 0);
// 加不加 await 有什么區(qū)別?
await awaitFunc().then(data => {
console.log(data);
setTimeout(() => {
console.log('驗(yàn)證加入了宏任務(wù)隊(duì)列---2');
}, 0);
});
console.log('awaitFunc 執(zhí)行完在打印');
};
async();
SpringBoot 中異步編程
在 SpringBoot @EnableAsync
和 @Async
就可以助你異步編程。底層原理就是 ThreadPoolExecutor
和 Future
的封裝。
我們拿這個(gè)燒水舉例子,當(dāng)你同步串行執(zhí)行,需要消耗 20 分鐘。同步編程思維模型較簡(jiǎn)單,容易實(shí)現(xiàn)。
當(dāng)你多線(xiàn)程異步執(zhí)行,只需要消耗 16 分鐘。異步編程思維模型稍微復(fù)雜一點(diǎn),多線(xiàn)程之間通信異步轉(zhuǎn)同步是一個(gè)挑戰(zhàn)。
@GetMApping("/tea/async")
public RetUtil makeTeaAsync() throws InterruptedException, ExecutionException {
// Stopwatch 用于計(jì)算代碼執(zhí)行時(shí)間
final Stopwatch started = Stopwatch.createStarted();
final Future asyncResult = makeTeaService.boilWater();
final Future asyncResult1 = makeTeaService.washTeaCup();
asyncResult.get();
asyncResult1.get();
final long elapsed = started.elapsed(TimeUnit.SECONDS);
String str = StrUtil.format("任務(wù)執(zhí)行了 {} 秒", elapsed);
final MakeTeaVO makeTeaVO = new MakeTeaVO();
makeTeaVO.setMessage(str);
return RetUtil.success(makeTeaVO);
}
@Service
public class IMakeTeaServiceImpl implements IMakeTeaService {
@Override
@Async
public AsyncResult<String> boilWater() throws InterruptedException {
System.out.println("洗水壺");
TimeUnit.SECONDS.sleep(1);
System.out.println("燒開(kāi)水");
TimeUnit.SECONDS.sleep(15);
return new AsyncResult("洗水壺->燒開(kāi)水");
}
@Override
@Async
public AsyncResult<String> washTeaCup() throws InterruptedException {
System.out.println("洗茶杯");
System.out.println("洗茶壺");
System.out.println("拿茶葉");
TimeUnit.SECONDS.sleep(4);
return new AsyncResult("洗茶杯,洗茶壺,拿茶葉");
}
}
AsyncResult
是 Future
的實(shí)現(xiàn)類(lèi),當(dāng)調(diào)用 Future.get
會(huì)阻塞等待結(jié)果的返回。@Async
也可以指定在那個(gè)線(xiàn)程池中執(zhí)行任務(wù)。
final Future asyncResult = makeTeaService.boilWater();
final Future asyncResult1 = makeTeaService.washTeaCup();
asyncResult.get();
asyncResult1.get();
這個(gè) Demo 的實(shí)現(xiàn),需要調(diào)用兩次 Furute.get() 算是個(gè)不優(yōu)雅的實(shí)現(xiàn)。
@Override
public String makeTea() throws InterruptedException {
final CountDownLatch count = new CountDownLatch(2);
THREAD_POOL_EXECUTOR.execute(() -> {
System.out.println("洗水壺");
System.out.println("燒開(kāi)水");
try {
TimeUnit.SECONDS.sleep(16);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
count.countDown();
}
});
THREAD_POOL_EXECUTOR.execute(() -> {
System.out.println("洗茶杯");
System.out.println("洗茶壺");
System.out.println("拿茶葉");
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
count.countDown();
}
});
count.await();
System.out.println("泡茶");
return "";
}
@GetMapping("/tea/async2")
public RetUtil makeTeaAsync2() throws InterruptedException, ExecutionException {
final Stopwatch started = Stopwatch.createStarted();
makeTeaService.makeTea();
final long elapsed = started.elapsed(TimeUnit.SECONDS);
String str = StrUtil.format("任務(wù)執(zhí)行了 {} 秒", elapsed);
final MakeTeaVO makeTeaVO = new MakeTeaVO();
makeTeaVO.setMessage(str);
return RetUtil.success(makeTeaVO);
}
使用 CountDownLatch
將異步代碼轉(zhuǎn)換為同步返回,這只是另一個(gè)實(shí)現(xiàn)
Future
public interface Future<V> {
/**
* 嘗試取消這個(gè)任務(wù)的執(zhí)行.
* 如果任務(wù)執(zhí)行完成之后,調(diào)用 cancel 返回 false.
* 如果任務(wù)已經(jīng)被取消了,調(diào)用 cancel 也會(huì)返回 false
*
* 如果任務(wù)已經(jīng)執(zhí)行了, mayInterruptIfRunning 標(biāo)志是否中斷執(zhí)行任務(wù)的線(xiàn)程.
* mayInterruptIfRunning 為 true 會(huì)觸發(fā)線(xiàn)程的中斷(當(dāng)線(xiàn)程睡眠,會(huì)拋出異常 InterruptedException),
* 為 false 時(shí)不中斷任務(wù)執(zhí)行,只改變 Future 的狀態(tài)
*
* 調(diào)用了 cancel 方法,調(diào)用 get 方法會(huì)拋出異常
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 任務(wù)完成之前調(diào)用 cancel ,此方法返回 true
*/
boolean isCancelled();
/**
* 任務(wù)完成返回 true
*/
boolean isDone();
/**
* 等待任務(wù)完成,然后返回其結(jié)果
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待任務(wù)完成,然后返回其結(jié)果.超時(shí)沒(méi)有返回,拋出異常 TimeoutException
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future.cancel(true)
會(huì)觸發(fā)線(xiàn)程休眠的中斷,即 TimeUnit.SECONDS.sleep(10);
會(huì)拋出異常。
Future.cancel(true)
或者 Future.cancel(false)
都會(huì)觸發(fā) Future.get()
異常。
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Future<String> submit = THREAD_POOL_EXECUTOR.submit(() -> {
System.out.println("任務(wù)開(kāi)始執(zhí)行");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務(wù)執(zhí)行完畢");
return "ok";
});
THREAD_POOL_EXECUTOR.execute(() -> {
System.out.println("執(zhí)行 submit.cancel");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
submit.cancel(false);
});
// submit.get();
System.out.println("整個(gè)流程執(zhí)行結(jié)束");
}
JDK 提供 Future
的實(shí)現(xiàn) FutureTask
源碼相對(duì)較簡(jiǎn)單,不再展開(kāi)。
CompletableFuture
由于 Future
使用的局限性:不能鏈?zhǔn)秸{(diào)用、多個(gè)異步計(jì)算的結(jié)果不能傳遞下一個(gè)異步任務(wù)(可以做到,但是編程稍微復(fù)雜),異步執(zhí)行異常的捕獲處理
從 JDK 1.8
開(kāi)始,大佬 Doug Lea
帶來(lái)了更加容易的異步編程模型,CompletableFuture。
CompletableFuture 可以做到
1、獲取異步執(zhí)行的結(jié)果鏈?zhǔn)絺鬟f下一個(gè)異步去執(zhí)行
2、異步執(zhí)行時(shí),你有機(jī)會(huì)處理異步執(zhí)行時(shí)發(fā)生的異常
總之,CompletableFuture
很想。
CompletableFuture
實(shí)現(xiàn)比較復(fù)雜,有的地方不是那么容易理解,當(dāng)你理解其實(shí)現(xiàn)思想,你也算是一只腳邁入了響應(yīng)式編程中去了。
開(kāi)胃小菜
public class CompletableFutureBlog1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
// 洗水壺,燒水
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("洗水壺");
System.out.println("燒水");
try {
TimeUnit.SECONDS.sleep(16);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "洗水壺 -> 燒水";
});
// 洗茶壺,洗茶杯 -> 拿茶葉
CompletableFuture<String> completableFuture2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("洗茶壺");
System.out.println("洗茶杯");
System.out.println("拿茶葉");
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "洗茶壺,洗茶杯 -> 拿茶葉";
});
// 組合二者異步運(yùn)算的結(jié)果,傳遞給方法計(jì)算
final CompletableFuture<String> completableFuture = completableFuture2.thenCombine(completableFuture1, (result2, result1) -> {
System.out.println(StrUtil.format("result2 是 洗茶壺,洗茶杯 -> 拿茶葉: {}", result2));
System.out.println(StrUtil.format("result1 是 洗水壺 -> 燒水: {}", result1));
System.out.println("泡茶");
return "泡茶";
});
completableFuture.get();
System.out.println("執(zhí)行時(shí)間: " + started.elapsed(TimeUnit.SECONDS));
}
}
runAsync 和 supplyAsync 的區(qū)別
runAsync
和 supplyAsync
區(qū)別就是你是否需要獲取異步計(jì)算的結(jié)果。當(dāng)你需要異步處理的結(jié)果,你需要 supplyAsync
public class CompletableFutureBlog2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {
System.out.println("開(kāi)始進(jìn)行耗時(shí)的異步計(jì)算,消耗 3 秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final Integer integer = ret.get();
System.out.println(StrUtil.format("異步執(zhí)行的結(jié)果: {}", integer));
System.out.println("執(zhí)行時(shí)間: " + started.elapsed(TimeUnit.SECONDS));
}
}
thenApplyAsync
、thenAcceptAsync
和 thenRunAsync
thenXX
都是為了在上一個(gè)異步計(jì)算的結(jié)束之后執(zhí)行。
我們對(duì)異步計(jì)算的結(jié)果分為以下幾個(gè)情況:
-
需要依賴(lài)異步計(jì)算的結(jié)果,并且依賴(lài)異步計(jì)算的結(jié)果計(jì)算返回另個(gè)一個(gè)結(jié)果
thenApplyAsync
-
依賴(lài)異步計(jì)算的結(jié)果,但是不會(huì)產(chǎn)生新的結(jié)果,
thenAcceptAsync
-
不依賴(lài)計(jì)算計(jì)算的結(jié)果,并且沒(méi)有返回值
thenRunAsync
public class CompletableFutureBlog3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {
System.out.println("開(kāi)始進(jìn)行耗時(shí)的異步計(jì)算,消耗 3 秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final Integer result = ret.thenApplyAsync(data -> {
System.out.println("依賴(lài)上一個(gè)異步計(jì)算,消耗 5 秒");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return data + 12;
}).get();
System.out.println(StrUtil.format("異步執(zhí)行的結(jié)果: {}", result));
System.out.println("執(zhí)行時(shí)間: " + started.elapsed(TimeUnit.SECONDS));
}
}
thenCombineAsync
結(jié)合另一個(gè) CompletableFuture
異步計(jì)算,當(dāng)兩個(gè)異步計(jì)算執(zhí)行完了,執(zhí)行回調(diào)。
計(jì)算一個(gè)耗時(shí)的計(jì)算。將這個(gè)耗時(shí)計(jì)算拆成兩個(gè)耗時(shí)的異步計(jì)算,當(dāng)兩個(gè)異步計(jì)算結(jié)束,在合并最終的結(jié)果
public class CompletableFutureBlog4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Integer> ret1 = CompletableFuture.supplyAsync(() -> {
System.out.println("開(kāi)始進(jìn)行耗時(shí)的異步計(jì)算,消耗 3 秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final CompletableFuture<Integer> ret2 = CompletableFuture.supplyAsync(() -> {
System.out.println("開(kāi)始進(jìn)行耗時(shí)的異步計(jì)算,消耗 5 秒");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final CompletableFuture<Integer> integerCompletableFuture = ret2.thenCombineAsync(ret1, (result1, result2) -> result1 + result2);
final Integer result = integerCompletableFuture.get();
System.out.println(StrUtil.format("異步執(zhí)行的結(jié)果: {}", result));
System.out.println("執(zhí)行時(shí)間: " + started.elapsed(TimeUnit.SECONDS));
}
}
allOf
和 anyOf
可以組合多個(gè) CompletableFuture
,當(dāng)每個(gè) CompletableFuture
都執(zhí)行完,執(zhí)行后續(xù)邏輯。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
可以組合多個(gè) CompletableFuture
,當(dāng)任何一個(gè) CompletableFuture
都執(zhí)行完,執(zhí)行后續(xù)邏輯。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
future,future2,future3 執(zhí)行完之后,再執(zhí)行后續(xù)邏輯。
public class CompletableFutureBlog5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
});
final CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
});
final CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
});
final CompletableFuture<Void> future1 = CompletableFuture.allOf(future3, future2, future);
future1.get();
System.out.println("執(zhí)行時(shí)間: " + started.elapsed(TimeUnit.SECONDS));
}
}
將上述 demo 中 allOf 替換為 anyOf,當(dāng)任一 CompletableFuture 執(zhí)行完畢,future1.get();
就會(huì)返回結(jié)果。
別的方法看參數(shù)和注釋就學(xué)會(huì)了。就不再一一列舉了。
當(dāng)使用的時(shí)候,先考慮要不要依賴(lài)異步計(jì)算的結(jié)果,要不要處理異常,要不要返回新的異步計(jì)算結(jié)果,從這幾個(gè)方面就可以知道選擇哪個(gè) api 了。
本文由 張攀欽的博客 http://www.mflyyou.cn/ 創(chuàng)作。 可自由轉(zhuǎn)載、引用,但需署名作者且注明文章出處。
如轉(zhuǎn)載至微信公眾號(hào),請(qǐng)?jiān)谖哪┨砑幼髡吖娞?hào)二維碼。微信公眾號(hào)名稱(chēng):Mflyyou