姿勢1:ThreadLocal+TaskDecorator
用戶工具類
/**
*使用ThreadLocal存儲共享的數據變量,如登錄的用戶信息
*/
public class UserUtils {
private static final ThreadLocal<String> userLocal=new ThreadLocal<>();
public static String getUserId(){
return userLocal.get();
}
public static void setUserId(String userId){
userLocal.set(userId);
}
public static void clear(){
userLocal.remove();
}
}
復制代碼
自定義CustomTaskDecorator
/**
* 線程池修飾類
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請求信息(我們的用戶信息也放在里面)
String robotId = UserUtils.getUserId();
System.out.println(robotId);
return () -> {
try {
// 將主線程的請求信息,設置到子線程中
UserUtils.setUserId(robotId);
// 執行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結束,清空這些信息,否則可能造成內存泄漏
UserUtils.clear();
}
};
}
}
復制代碼
ExecutorConfig
在原來的基礎上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可視化運行狀態的線程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數
executor.setCorePoolSize(corePoolSize);
//配置最大線程數
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當pool已經達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加線程池修飾類
executor.setTaskDecorator(new CustomTaskDecorator());
//增加MDC的線程池修飾類
//executor.setTaskDecorator(new MDCTaskDecorator());
//執行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
復制代碼
AsyncServiceImpl
/**
* 使用ThreadLocal方式傳遞
* 帶有返回值
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync2() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執行返回結果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserUtils.getUserId());
}
復制代碼
Test2Controller
/**
* 使用ThreadLocal+TaskDecorator的方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMApping("/test2")
public String test2() throws InterruptedException, ExecutionException {
UserUtils.setUserId("123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();
String s = completableFuture.get();
return s;
}
復制代碼
姿勢2:RequestContextHolder+TaskDecorator
自定義CustomTaskDecorator
/**
* 線程池修飾類
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請求信息(我們的用戶信息也放在里面)
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
return () -> {
try {
// 將主線程的請求信息,設置到子線程中
RequestContextHolder.setRequestAttributes(attributes);
// 執行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結束,清空這些信息,否則可能造成內存泄漏
RequestContextHolder.resetRequestAttributes();
}
};
}
}
復制代碼
ExecutorConfig
在原來的基礎上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可視化運行狀態的線程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數
executor.setCorePoolSize(corePoolSize);
//配置最大線程數
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當pool已經達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加線程池修飾類
executor.setTaskDecorator(new CustomTaskDecorator());
//增加MDC的線程池修飾類
//executor.setTaskDecorator(new MDCTaskDecorator());
//執行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
復制代碼
AsyncServiceImpl
/**
* 使用RequestAttributes獲取主線程傳遞的數據
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync3() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執行返回結果......+");
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
Object userId = attributes.getAttribute("userId", 0);
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(userId.toString());
}
復制代碼
Test2Controller
/**
* RequestContextHolder+TaskDecorator的方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test3")
public String test3() throws InterruptedException, ExecutionException {
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
attributes.setAttribute("userId","123456",0);
CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();
String s = completableFuture.get();
return s;
}
復制代碼
姿勢3:MDC+TaskDecorator
自定義MDCTaskDecorator
/**
* 線程池修飾類
*/
public class MDCTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 獲取主線程中的請求信息(我們的用戶信息也放在里面)
String userId = MDC.get("userId");
Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
System.out.println(copyOfContextMap);
return () -> {
try {
// 將主線程的請求信息,設置到子線程中
MDC.put("userId",userId);
// 執行子線程,這一步不要忘了
runnable.run();
} finally {
// 線程結束,清空這些信息,否則可能造成內存泄漏
MDC.clear();
}
};
}
}
復制代碼
ExecutorConfig
在原來的基礎上增加 executor.setTaskDecorator(new MDCTaskDecorator());
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可視化運行狀態的線程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數
executor.setCorePoolSize(corePoolSize);
//配置最大線程數
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當pool已經達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加MDC的線程池修飾類
executor.setTaskDecorator(new MDCTaskDecorator());
//執行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
復制代碼
AsyncServiceImpl
/**
* 使用MDC獲取主線程傳遞的數據
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync5() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執行返回結果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(MDC.get("userId"));
}
復制代碼
Test2Controller
/**
* 使用MDC+TaskDecorator方式
* 本質也是ThreadLocal+TaskDecorator方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test5")
public String test5() throws InterruptedException, ExecutionException {
MDC.put("userId","123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();
String s = completableFuture.get();
return s;
}
復制代碼
姿勢4:InheritableThreadLocal
用戶工具類 UserInheritableUtils
//**
*使用InheritableThreadLocal存儲線程之間共享的數據變量,如登錄的用戶信息
*/
public class UserInheritableUtils {
private static final InheritableThreadLocal<String> userLocal=new InheritableThreadLocal<>();
public static String getUserId(){
return userLocal.get();
}
public static void setUserId(String userId){
userLocal.set(userId);
}
public static void clear(){
userLocal.remove();
}
}
復制代碼
AsyncServiceImpl
/**
* 使用InheritableThreadLocal獲取主線程傳遞的數據
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync4() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執行返回結果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserInheritableUtils.getUserId());
}
復制代碼
Test2Controller
/**
* 使用InheritableThreadLocal方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test4")
public String test4(@RequestParam("userId") String userId) throws InterruptedException, ExecutionException {
UserInheritableUtils.setUserId(userId);
CompletableFuture<String> completableFuture = asyncService.executeValueAsync4();
String s = completableFuture.get();
return s;
}
復制代碼
姿勢5:TransmittableThreadLocal
用戶工具類 UserTransmittableUtils
/**
*使用TransmittableThreadLocal存儲線程之間共享的數據變量,如登錄的用戶信息
*/
public class UserTransmittableUtils {
private static final TransmittableThreadLocal<String> userLocal=new TransmittableThreadLocal<>();
public static String getUserId(){
return userLocal.get();
}
public static void setUserId(String userId){
userLocal.set(userId);
}
public static void clear(){
userLocal.remove();
}
}
}
復制代碼
AsyncServiceImpl
/**
* 使用TransmittableThreadLocal獲取主線程傳遞的數據
* @return
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync6() throws InterruptedException {
log.info("start executeValueAsync");
System.out.println("異步線程執行返回結果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture(UserTransmittableUtils.getUserId());
}
復制代碼
Test2Controller
/**
* 使用TransmittableThreadLocal方式
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@GetMapping("/test6")
public String test6() throws InterruptedException, ExecutionException {
UserTransmittableUtils.setUserId("123456");
CompletableFuture<String> completableFuture = asyncService.executeValueAsync6();
String s = completableFuture.get();
return s;
}
復制代碼
maven依賴
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.12.1</version>
</dependency>
復制代碼
方案對比
方案1,方案2,方案3主要是借助TaskDecorator進行父子線程之間傳遞數據。其中MDC方案主要借鑒于MDC的日志跟蹤的思想來實現,關于MDC相關的日志跟蹤后續會學習分享
方案4和方案5使用InheritableThreadLocal和TransmittableThreadLocal來實現,其中TransmittableThreadLocal是阿里InheritableThreadLocal進行優化封裝。為什么要封裝,有興趣的可以去學習《 加強版ThreadLocal之阿里開源TransmittableThreadLocal學習 》
本人推薦使用方案5,哈哈。
簡答說一下InheritableThreadLocal
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,1,
TimeUnit.MINUTES,new ArrayBlockingQueue<>(1));
ThreadLocal local = new InheritableThreadLocal();
local.set(1);
executor.execute(()->{
System.out.println("打印1:"+local.get());
});
local.set(2);
System.out.println("打印2:"+local.get());
executor.execute(()->{
System.out.println("打印3:"+local.get());
});
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("打印4:"+local.get());
}
}).start();
}
復制代碼
運行結果如下
打印2:2
打印1:1
打印3:1
打印4:2
復制代碼
分析: 分析打印3為什么是1,InheritableThreadLocal的繼承性是在new Thread創建子線程時候在構造函數內把父線程內線程變量拷貝到子線程內部的。為了不在創建新線程耗費資源,我們一般會用線程池,線程池的線程會復用,那么線程中的ThreadLocal便不對了,可能是舊的,因為線程是舊的。