本教程大概目錄:
- 模擬單線程情節
- 用Callable實現 并發編程
- 用DeferedResult實現異步處理
###模擬單線程情節。
/**
* Created by Fant.J.
*/
@RestController
@Slf4j
public class AsyncController {
/**
* 單線程測試
* @return
* @throws InterruptedException
*/
@RequestMApping("/order")
public String order() throws InterruptedException {
log.info("主線程開始");
Thread.sleep(1000);
log.info("主線程返回");
return "success";
}
}
我們把線程休息一秒當作模擬處理業務所花費的時間。很明顯能看出來,這是個單線程。
nio-8080-exec-1表示主線程的線程1。
用Callable實現 并發編程
/**
* 用Callable實現異步
* @return
* @throws InterruptedException
*/
@RequestMapping("/orderAsync")
public Callable orderAsync() throws InterruptedException {
log.info("主線程開始");
Callable result = new Callable() {
@Override
public Object call() throws Exception {
log.info("副線程開始");
Thread.sleep(1000);
log.info("副線程返回");
return "success";
}
};
log.info("主線程返回");
return result;
}
我們可以看到,主線程的開始和返回(結束處理)是首先執行的,然后副線程才執行真正的業務處理。說明主線程在這里的作用是調用(喚醒)子線程,子線程處理完會返回一個Object對象,然后返回給用戶。
這樣雖然實現了并發處理,但是有一個問題,就是主線程和副線程沒有做到完全分離,畢竟是一個嵌套進去的副線程。
所以為了優化我們的實現,我在這里模擬 消息中間件 來實現主線程副線程的完全分離。
用DeferedResult實現異步處理
因為本章主要講的是并發編程原理,所以這里我們不用現成的消息隊列來搞,我們模擬一個消息隊列來處理。
MockQueue .JAVA
/**
* 模擬消息隊列 類
* Created by Fant.J.
*/
@Component
@Slf4j
public class MockQueue {
//下單消息
private String placeOrder;
//訂單完成消息
private String completeOrder;
public String getPlaceOrder() {
return placeOrder;
}
public void setPlaceOrder(String placeOrder) throws InterruptedException {
new Thread(()->{
log.info("接到下單請求"+placeOrder);
//模擬處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//給completeOrder賦值
this.completeOrder = placeOrder;
log.info("下單請求處理完畢"+placeOrder);
}).start();
}
public String getCompleteOrder() {
return completeOrder;
}
public void setCompleteOrder(String completeOrder) {
this.completeOrder = completeOrder;
}
}
注意再setPlaceOrder(String placeOrder)方法里,我創建了一個新的線程來處理接單的操作(為什么要建立新線程,怕主線程在這掛起,此段邏輯也沒有線程安全問題,況且異步處理更快)。傳進來的參數是個 訂單號 ,經過1s的處理成功后,把訂單號傳給completeOrder 字段,說明用戶下單成功,我在下面付controller調用該方法的代碼。
//注入模擬消息隊列類
@Autowired
private MockQueue mockQueue;
@Autowired
private DeferredResultHolder deferredResultHolder;
....
@RequestMapping("/orderMockQueue")
public DeferredResult orderQueue() throws InterruptedException {
log.info("主線程開始");
//隨機生成8位數
String orderNumber = RandomStringUtils.randomNumeric(8);
mockQueue.setPlaceOrder(orderNumber);
DeferredResult result = new DeferredResult();
deferredResultHolder.getMap().put(orderNumber,result);
Thread.sleep(1000);
log.info("主線程返回");
return result;
}
好了,然后我們還需要一個中介類來存放訂單號和處理結果。為什么需要這么一個類呢,因為我們之前說過要實現主線程和副線程分離,所以需要一個中介來存放處理信息(比如:這個訂單號信息,和處理結果信息),我們判斷處理結果是否為空就知道該副線程執行了沒有。所以我們寫一個中介類DeferredResultHolder 。
######DeferredResultHolder .java:
/**
* 訂單處理情況 中介/持有者
* Created by Fant.J.
*/
@Component
public class DeferredResultHolder {
/**
* String: 訂單號
* DeferredResult:處理結果
*/
private Map<String,DeferredResult> map = new HashMap<>();
public Map<String, DeferredResult> getMap() {
return map;
}
public void setMap(Map<String, DeferredResult> map) {
this.map = map;
}
}
在重復一次-.-,為什么需要這么一個類呢,因為我們之前說過要實現主線程和副線程分離,所以需要一個中介來存放處理信息(比如:這個訂單號信息,和處理結果信息),一個訂單肯定要對應一個結果。不然豈不是亂了套。
DeferredResult是用來放處理結果的對象。
好了,那新問題又來了,我們怎么去判斷訂單處理成功了沒有,我們此時就需要寫一個監聽器,過100毫秒監聽一次MockQueue類中的completeOrder中是否有值,如果有值,那么這個訂單就需要被處理。我們寫一個監聽器。
######QueueListener .java:
/**
* Queue監聽器
* Created by Fant.J.
*/
@Component
@Slf4j
public class QueueListener implements ApplicationListener<ContextRefreshedEvent>{
@Autowired
private MockQueue mockQueue;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
new Thread(()->{
while(true){
//判斷CompleteOrder字段是否是空
if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())){
String orderNumber = mockQueue.getCompleteOrder();
deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
log.info("返回訂單處理結果");
//將CompleteOrder設為空,表示處理成功
mockQueue.setCompleteOrder(null);
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
我們可以看到一共有三個不同的線程來處理。
分割線后,我再給大家帶來一批干貨,自定義線程池https://www.jianshu.com/p/832f2b162450。
學完這個后,再看下面的。
我們前面的代碼中,有兩部分有用new Thread()來創建線程,我們有自己的線程池后,就可以用線程池來分配線程任務了,我在自定義線程里有講,我用的是第二種配置方法(用@Async注解來給線程 )。
修改如下:
@Async
public void setPlaceOrder(String placeOrder) throws InterruptedException {
log.info("接到下單請求"+placeOrder);
//模擬處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//給completeOrder賦值
this.completeOrder = placeOrder;
log.info("下單請求處理完畢"+placeOrder);
}
我們看看效果:
圈紅圈的就是我們自己定義的線程池里分配的線程。