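1. Background:
Use the JDK thread pool ThreadPoolExecutor to run batch insert, update, and similar operations asynchronously on multiple threads, improving the efficiency of inserting millions of rows.
2. Details:
2.1 Create a thread pool sized to the machine's own processor count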
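// Create a thread pool sized to the machine's available processor count
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int) (processNum / (1 - 0.2)); // 1.25 x processors, truncated
int maxPoolSize = (int) (processNum / (1 - 0.5)); // 2 x processors
ExecutorService executorService = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        2L, // keepAliveTime for idle threads above the core size
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(3), // bounded queue that buffers waiting tasks
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy() // when saturated, the submitting thread runs the task itself
);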
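To make the sizing arithmetic concrete, here is a minimal sketch that applies the same formula to a given processor count. The class name PoolSizing and the helper names sizeFor and newPool are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizing {
    // Same arithmetic as the listing: processors / (1 - factor), truncated to int.
    static int sizeFor(int processors, double factor) {
        return (int) (processors / (1 - factor));
    }

    // Builds a pool with the listing's parameters for a given processor count.
    static ThreadPoolExecutor newPool(int processors) {
        return new ThreadPoolExecutor(
                sizeFor(processors, 0.2),   // core size
                sizeFor(processors, 0.5),   // maximum size
                2L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = newPool(processors);
        System.out.println("processors=" + processors
                + " core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

On an 8-processor machine this gives a core size of 10 and a maximum of 16. With a queue capacity of 3, at most 19 tasks can be running or waiting at once; any further submission is executed by the submitting thread because of CallerRunsPolicy.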
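@Override
public boolean batchInsert(List<Student> list) throws Exception {
    try {
        /*
         * Both submit and execute hand a task to the thread pool.
         * execute takes a Runnable, whose run() returns nothing; submit also accepts a Callable,
         * whose overridden call() returns a value that is delivered through the Future.
         * Callable tasks are normally run through ExecutorService.submit(Callable<T> task)
         * (invokeAll and invokeAny accept them as well).
         * Thread control when several callers submit at once: multiple threads, multiple tasks.
         */
        Future<Boolean> future = executorService.submit(new BatchWay(list, studentService));
        // get() blocks until the task has finished, so the caller waits for the result here
        return future.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
        return false;
    } catch (ExecutionException e) {
        // The task itself failed; calling get() a second time would only fail again
        e.printStackTrace();
        return false;
    }
}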
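The difference between submit and execute described in the comment can be sketched in isolation. The class SubmitVsExecute and its two helper methods are hypothetical names used only for illustration; the tasks simply compute the number 2.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitVsExecute {
    // submit(Callable): the value returned by call() comes back through the Future.
    static int viaSubmit(ExecutorService pool) {
        Callable<Integer> task = () -> 1 + 1;
        Future<Integer> future = pool.submit(task);
        try {
            return future.get(); // blocks until call() has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // execute(Runnable): run() returns nothing, so the result must be
    // published some other way (here an AtomicInteger plus a latch).
    static int viaExecute(ExecutorService pool) {
        AtomicInteger result = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        Runnable task = () -> {
            result.set(1 + 1);
            done.countDown();
        };
        pool.execute(task);
        try {
            done.await(); // wait until run() has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println("submit result:  " + viaSubmit(pool));
        System.out.println("execute result: " + viaExecute(pool));
        pool.shutdown();
    }
}
```

Both paths end up with the same value, but only submit gives the caller a Future that carries the result and any exception thrown by the task.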
2.2 Core business processing class:
@Slf4j
public class BatchWay implements Callable<Boolean> {
    private int batch100 = 100; // batch size: import in groups of 100 rows
    private List<Student> list; // the large data set to insert
    private StudentService studentService;
    // Constructor with arguments, so the class can be initialized conveniently
    public BatchWay(List<Student> list, StudentService studentService) {
        this.list = list;
        this.studentService = studentService;
    }
    /** Thread pool */
    // private ThreadPoolExecutor threadPoolExecutor =
    //         new ThreadPoolExecutor(
    //                 10, // corePoolSize: number of core threads in the pool
    //                 Runtime.getRuntime().availableProcessors(), // maximum number of threads the pool may hold
    //                 5L, // keepAliveTime: how long an idle thread is kept alive
    //                 TimeUnit.SECONDS, // unit of keepAliveTime: seconds
    //                 new LinkedBlockingQueue<>(100), // blocking queue that buffers tasks
    //                 // Executors.defaultThreadFactory(),
    //                 new ThreadPoolExecutor.CallerRunsPolicy()
    //         );
    /**
     * Description: implements the call method of Callable
     * @MethodName: call
     * @MethodParam: []
     * @Return: java.lang.Boolean
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:46
     */
    @Override
    public Boolean call() {
        try {
            batchOp(list);
            return true;
        } catch (Exception e) {
            log.error("Batch insert failed", e);
        }
        return false;
    }
    /**
     * Description: saves the data in batches
     * @MethodName: batchOp
     * @MethodParam: [list]
     * @Return: void
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:40
     */
    private void batchOp(List<Student> list) {
        if (!list.isEmpty()) {
            int size = list.size();
            if (size <= batch100) {
                // At or below the batch size: insert directly
                studentService.saveBatch(list);
            } else {
                // Larger than the batch size: split first, then save
                batchOpSpilit(list, batch100);
            }
        }
    }
    /**
     * Description: splits the list into batches
     * @MethodName: batchOpSpilit
     * @MethodParam: [list, batch100]
     * @Return: void
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:43
     */
    private void batchOpSpilit(List<Student> list, int batch100) {
        log.info("Start splitting...");
        List<List<Student>> list1 = SplitListUtils.pagingList(list, batch100);
        // Exceptions are not swallowed here: they propagate to call(), which then returns false
        for (List<Student> list2 : list1) {
            batchOp(list2);
            // threadPoolExecutor.allowCoreThreadTimeOut(true);
            // // Call batchOp again; the multithreading here is several small lists being inserted into the database
            // threadPoolExecutor.execute(() -> {
            //     // log.info("Thread starts saving data...: " + Thread.currentThread().getName());
            //     batchOp(list2);
            // });
        }
        // log.info("Current thread pool size: " + threadPoolExecutor.getPoolSize());
        // Finally shut the pool down: no new tasks are accepted, but already submitted tasks still complete
        // threadPoolExecutor.shutdown();
    }
}
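The listing calls SplitListUtils.pagingList, but that utility is not shown. The following is only a guess at what such a helper might look like, assuming it cuts a list into consecutive pages of at most pageSize elements; the class name ListChunks is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ListChunks {
    // Assumed behavior: consecutive pages of at most pageSize elements,
    // in the original order; the last page may be shorter.
    static <T> List<List<T>> pagingList(List<T> source, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<List<T>> pages = new ArrayList<>();
        for (int from = 0; from < source.size(); from += pageSize) {
            int to = Math.min(from + pageSize, source.size());
            // Copy the view so each page is independent of the source list.
            pages.add(new ArrayList<>(source.subList(from, to)));
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            rows.add(i);
        }
        List<List<Integer>> pages = pagingList(rows, 100);
        for (List<Integer> page : pages) {
            System.out.println("page size: " + page.size());
        }
    }
}
```

With 250 rows and a page size of 100, this produces three pages of 100, 100, and 50 elements, and each page would go straight to saveBatch in batchOp.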
2.3 Generate test data and insert it asynchronously with multiple threads:
public String batchWay() throws Exception {
    log.info("Starting batch operation...");
    Random rand = new Random();
    List<Student> list = new ArrayList<>();
    for (int i = 0; i < 1000003; i++) {
        Student student = new Student();
        student.setStudentName("XiaoLi" + i);
        student.setAddr("Shanghai" + rand.nextInt(9) * 1000);
        student.setAge(rand.nextInt(1000));
        student.setPhone("134" + rand.nextInt(9) * 1000);
        list.add(student);
    }
    long startTime = System.currentTimeMillis(); // start time
    boolean success = studentService.batchInsert(list);
    long endTime = System.currentTimeMillis(); // end time
    return "Finished (success=" + success + "), total time: " + (endTime - startTime) / 1000 + " s";
}
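In the listings above the whole list is handed to the pool as a single BatchWay task, and the calls that would spread the small batches over several threads are commented out. As a rough sketch of that idea, assuming each batch can be saved independently, the batches could be submitted together with invokeAll. The class ParallelBatchInsert and the saver parameter (a stand-in for studentService.saveBatch) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ParallelBatchInsert {
    // Splits rows into batches, saves each batch as its own task,
    // and returns the number of rows saved.
    static <T> int insertInParallel(List<T> rows, int batchSize,
                                    ExecutorService pool, Consumer<List<T>> saver) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            List<T> chunk = rows.subList(from, Math.min(from + batchSize, rows.size()));
            tasks.add(() -> {
                saver.accept(chunk); // hypothetical stand-in for saveBatch
                return chunk.size();
            });
        }
        int saved = 0;
        try {
            // invokeAll blocks until every task has completed.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                saved += future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while inserting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A batch failed", e.getCause());
        }
        return saved;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1003; i++) {
            rows.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger batches = new AtomicInteger();
        int saved = insertInParallel(rows, 100, pool, chunk -> batches.incrementAndGet());
        pool.shutdown();
        System.out.println("rows saved: " + saved + ", batches: " + batches.get());
    }
}
```

Whether this actually speeds up a real insert depends on the database, the connection pool size, and transaction handling, none of which this sketch models.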
2.4 Test results
Summary of results:
No. | Core threads (core_pool_size) | Rows inserted | Time (s)
1 | 10 | 1,000,000 | 38
2 | 15 | 1,000,000 | 32
3 | 50 | 1,000,000 | 31
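Personal recommendation: in Spring Boot, use the ThreadPoolTaskExecutor thread pool to process millions of rows asynchronously.
Summary: ThreadPoolTaskExecutor and ThreadPoolExecutor are more flexible than thread pools created through Executors because their parameters can be set explicitly, so both are recommended. ThreadPoolTaskExecutor is Spring's wrapper around ThreadPoolExecutor: it runs on the same underlying pool, so its advantage lies in configuration and lifecycle management inside Spring rather than in raw speed. For Spring Boot projects, ThreadPoolTaskExecutor is the recommended choice.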
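The flexibility point can be illustrated with the JDK alone. The sketch below compares the task queue behind Executors.newFixedThreadPool with an explicitly constructed ThreadPoolExecutor. The class and method names are hypothetical, and the cast to ThreadPoolExecutor relies on how OpenJDK currently implements newFixedThreadPool, which the API contract does not guarantee.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExplicitPoolConfig {
    // Remaining queue capacity of a pool created through the Executors factory.
    // Assumption: the factory returns a plain ThreadPoolExecutor (true in OpenJDK).
    static int queueCapacityOfFixedPool(int threads) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Remaining queue capacity of a pool whose queue bound is chosen explicitly.
    static int queueCapacityOfExplicitPool(int threads, int capacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(capacity));
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Executors.newFixedThreadPool queue capacity: "
                + queueCapacityOfFixedPool(4));
        System.out.println("Explicit ThreadPoolExecutor queue capacity:  "
                + queueCapacityOfExplicitPool(4, 100));
    }
}
```

The factory-built pool queues tasks without a practical limit (Integer.MAX_VALUE), which can exhaust memory under a heavy backlog, while the explicit constructor lets the queue bound and the rejection policy be chosen deliberately.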