這篇文章會一步一步帶你從一個新手的角度慢慢揭開批處理的神秘面紗,對于初次寫MyBatis批處理的同學可能會有很大的幫助,建議收藏點贊~
處理批處理的方式有很多種,這里不分析各種方式的優劣,只是概述 ExecutorType.BATCH 這種的用法,另學藝不精,如果有錯的地方,還請大佬們指出更正。
問題原因
在公司寫項目的時候,有一個自動對賬的需求,需要從文件中讀取幾萬條數據插入到數據庫中,后續可能跟著業務的增長,會上升到幾十萬,所以對于插入需要進行批處理操作,下面我們就來看看我是怎么一步一步踩坑的。
簡單了解一下批處理背后的秘密,BatchExecutor
批處理是 JDBC 編程中的另一種優化手段。JDBC 在執行 SQL 語句時,會將 SQL 語句以及實參通過網絡請求的方式發送到數據庫,一次執行一條 SQL 語句,一方面會減小請求包的有效負載,另一個方面會增加耗費在網絡通信上的時間。通過批處理的方式,我們就可以在 JDBC 客戶端緩存多條 SQL 語句,然后在 flush 或緩存滿的時候,將多條 SQL 語句打包發送到數據庫執行,這樣就可以有效地降低上述兩方面的損耗,從而提高系統性能。
不過,有一點需要特別注意:每次向數據庫發送的 SQL 語句的條數是有上限的,如果批量執行的時候超過這個上限值,數據庫就會拋出異常,拒絕執行這一批 SQL 語句,所以我們需要控制批量發送 SQL 語句的條數和頻率。
引用自《深入剖析 MyBatis 核心原理》- 楊四正 第18節
版本1-呱呱墜地
廢話不多說,早先時候項目的代碼里就已經存在了批處理的代碼,偽代碼的樣子大概是這樣子的:
@Resource
private 某MApper類 mapper實例對象;
private int BATCH = 1000;
private void doUpdateBatch(Date accountDate, List<某實體類> data) {
SqlSession batchSqlSession = null;
try {
if (data == null || data.size() == 0) {
return;
}
batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
for (int index = 0; index < data.size(); index++) {
mapper實例對象.更新/插入Method(accountDate, data.get(index).getOrderNo());
if (index != 0 && index % BATCH == 0) {
batchSqlSession.commit();
batchSqlSession.clearCache();
}
}
batchSqlSession.commit();
} catch (Exception e) {
batchSqlSession.rollback();
log.error(e.getMessage(), e);
} finally {
if (batchSqlSession != null) {
batchSqlSession.close();
}
}
}
我們先來看看上述這種寫法的幾種問題
你真的懂commit、clearCache、flushStatements嘛?
我們先看看官網給出的解釋 [圖片上傳失敗...
然后我們結合上述寫法,它會在判斷批處理條數達到1000條的時候會去手動commit,然后又手動clearCache,我們先來看看commit到底都做了一些什么,以下為調用鏈
@Override
public void commit() {
commit(false);
}
@Override
public void commit(boolean force) {
try {
executor.commit(isCommitOrRollbackRequired(force));
dirty = false;
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error committing transaction. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
private boolean isCommitOrRollbackRequired(boolean force) {
// autoCommit默認為false,調用過插入、更新、刪除之后的dirty值為true
return (!autoCommit && dirty) || force;
}
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
clearLocalCache();
flushStatements();
if (required) {
transaction.commit();
}
}
我們會發現,其實你直接調用commit的情況下,它就已經做了clearLocalCache這件事情,所以大可不必在commit后加上一句clearCache,而且clearCache是做了什么你又知道嘛?就擱這調用!!
另外flushStatements的作用官網里有詳細解釋
看到這里,我們在來看點反例,你就會覺得0.0 這都是啥跟啥啊!!!誤人子弟啊,直接在百度搜一段關鍵字:mybatis ExecutorType.BATCH 批處理,反例如下:
不具備通用性
由于項目中用到批處理的地方肯定不止一個,那每用一次就需要CV一下,0.0 那會不會顯得太菜了?能不能一勞永逸?這個時候就得用上JAVA8中的接口函數了~
版本2-初具雛形
在解決完上述兩個問題后,我們的代碼版本來到了第2版,你以為這就對了?這就完事了?別急,我們繼續往下看!
import lombok.extern.slf4j.Slf4j;
import org.Apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.function.ToIntFunction;
@Slf4j
@Component
public class MybatisBatchUtils {
/**
* 每次處理1000條
*/
private static final int BATCH = 1000;
@Resource
private SqlSessionFactory sqlSessionFactory;
/**
* 批量處理修改或者插入
*
* @param data 需要被處理的數據
* @param function 自定義處理邏輯
* @return int 影響的總行數
*/
public <T> int batchUpdateOrInsert(List<T> data, ToIntFunction<T> function) {
int count = 0;
SqlSession batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
try {
for (int index = 0; index < data.size(); index++) {
count += function.applyAsInt(data.get(index));
if (index != 0 && index % BATCH == 0) {
batchSqlSession.flushStatements();
}
}
batchSqlSession.commit();
} catch (Exception e) {
batchSqlSession.rollback();
log.error(e.getMessage(), e);
} finally {
batchSqlSession.close();
}
return count;
}
}
偽代碼使用案例
@Resource
private 某Mapper類 mapper實例對象;
batchUtils.batchUpdateOrInsert(數據集合, item -> mapper實例對象.insert方法(item));
這個時候我興高采烈的收工了,直到過了一兩天,導師問我,考慮過這個業務的性能嘛,后續量大了可能每天有十多萬筆數據,問我現在每天要多久,我才發現 0.0 兩三萬條數據插入居然要7分鐘(不完全是這個問題導致這么慢,還有Oracle插入語句的原因,下面會描述),,哈哈,笑不活了,簡直就是Bug制造機,我就開始思考為什么會這么慢,肯定是批處理沒生效,我就思考為什么會沒生效?
版本3-標準寫法
我們知道上面我們提到了BatchExecutor執行器,我們知道每個SqlSession都會擁有一個Executor對象,這個對象才是執行 SQL 語句的幕后黑手,我們也知道Spring跟Mybatis整合的時候使用的SqlSession是SqlSessionTemplate,默認用的是ExecutorType.SIMPLE,這個時候你通過自動注入獲得的Mapper對象其實是沒有開啟批處理的
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
那么我們實際上是需要通過
sqlSessionFactory.openSession(ExecutorType.BATCH)得到的sqlSession對象(此時里面的Executor是BatchExecutor)去獲得一個新的Mapper對象才能生效!!!
所以我們更改一下這個通用的方法,把MapperClass也一塊傳遞進來
public class MybatisBatchUtils {
/**
* 每次處理1000條
*/
private static final int BATCH_SIZE = 1000;
@Resource
private SqlSessionFactory sqlSessionFactory;
/**
* 批量處理修改或者插入
*
* @param data 需要被處理的數據
* @param mapperClass Mybatis的Mapper類
* @param function 自定義處理邏輯
* @return int 影響的總行數
*/
public <T,U,R> int batchUpdateOrInsert(List<T> data, Class<U> mapperClass, BiFunction<T,U,R> function) {
int i = 1;
SqlSession batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
try {
U mapper = batchSqlSession.getMapper(mapperClass);
int size = data.size();
for (T element : data) {
function.apply(element,mapper);
if ((i % BATCH_SIZE == 0) || i == size) {
batchSqlSession.flushStatements();
}
i++;
}
// 非事務環境下強制commit,事務情況下該commit相當于無效
batchSqlSession.commit(!TransactionSynchronizationManager.isSynchronizationActive());
} catch (Exception e) {
batchSqlSession.rollback();
log.error(e.getMessage(), e);
} finally {
batchSqlSession.close();
}
return i - 1;
}
}
這里會判斷是否是事務環境,不是的話會強制提交,如果是事務環境的話,這個commit設置force值是無效的,這個在前面的官網截圖中有提到。
使用案例:
batchUtils.batchUpdateOrInsert(數據集合, xxxxx.class, (item, mapper實例對象) -> mapper實例對象.insert方法(item));
附:Oracle批量插入優化
我們都知道Oracle主鍵序列生成策略跟MySQL不一樣,我們需要弄一個序列生成器,這里就不詳細展開描述了,然后Mybatis Generator生成的模板代碼中,insert的id是這樣獲取的
<selectKey keyProperty="id" order="BEFORE" resultType="java.lang.Long">
select XXX.nextval from dual
</selectKey>
如此,就相當于你插入1萬條數據,其實就是insert和查詢序列合計預計2萬次交互,耗時竟然達到10s多。我們改為用原生的Batch插入,這樣子的話,只要500多毫秒,也就是0.5秒的樣子
<insert id="insert" parameterType="user">
insert into table_name(id, username, password)
values(SEQ_USER.NEXTVAL,#{username},#{password})
</insert>
最后這樣一頓操作,批處理 + 語句優化一下,這個業務直接從7分多鐘變成10多秒,完美解決,撒花慶祝~
作者:Linn
鏈接:
https://juejin.cn/post/7078237987011559460