刷盤策略
CommitLog的asyncPutMessage方法中可以看到在寫入消息之后,調(diào)用了submitFlushRequest方法執(zhí)行刷盤策略:
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ...
try {
// 獲取上一次寫入的文件
MAppedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// ...
// 寫入消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// ...
} finally {
beginTimeInLock = 0;
putMessageLock.unlock();
}
// ...
// 執(zhí)行刷盤
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// ...
}
}
刷盤有兩種策略:
- 同步刷盤,表示消息寫入到內(nèi)存之后需要立刻刷到磁盤文件中。
- 同步刷盤會構(gòu)建GroupCommitRequest組提交請求并設(shè)置本次刷盤后的位置偏移量的值(寫入位置偏移量+寫入數(shù)據(jù)字節(jié)數(shù)),然后將請求添加到flushDiskWatcher和GroupCommitService中進行刷盤。
- 異步刷盤,表示消息寫入內(nèi)存成功之后就返回,由MQ定時將數(shù)據(jù)刷入到磁盤中,會有一定的數(shù)據(jù)丟失風(fēng)險。
public class CommitLog {
// 監(jiān)控刷盤
private final FlushDiskWatcher flushDiskWatcher;
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// 是否是同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 獲取GroupCommitService
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// 是否等待
if (messageExt.isWaitStoreMsgOK()) {
// 構(gòu)建組提交請求,傳入本次刷盤后位置的偏移量:寫入位置偏移量+寫入數(shù)據(jù)字節(jié)數(shù)
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// 添加到wather中
flushDiskWatcher.add(request);
// 添加到service
service.putRequest(request);
// 返回
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// 如果是異步刷盤
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
}
同步刷盤
如果使用的是同步刷盤,首先獲取了GroupCommitService,然后構(gòu)建GroupCommitRequest組提交請求,將請求添加到flushDiskWatcher和GroupCommitService中,其中flushDiskWatcher用于監(jiān)控刷盤是否超時,GroupCommitService用于提交刷盤數(shù)據(jù)。
構(gòu)建GroupCommitRequest提交請求
GroupCommitRequest是CommitLog的內(nèi)部類:
- nextOffset:寫入位置偏移量+寫入數(shù)據(jù)字節(jié)數(shù),也就是本次刷盤成功后應(yīng)該對應(yīng)的flush偏移量
- flushOKFuture:刷盤結(jié)果
- deadLine:刷盤的限定時間,值為當前時間 + 傳入的超時時間,超過限定時間還未刷盤完畢會被認為超時
public class CommitLog {
public static class GroupCommitRequest {
private final long nextOffset;
// 刷盤狀態(tài)
private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
private final long deadLine;// 刷盤的限定時間,超過限定時間還未刷盤完畢會被認為超時
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
// 設(shè)置限定時間:當前時間 + 超時時間
this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
}
public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
// 結(jié)束刷盤,設(shè)置刷盤狀態(tài)
this.flushOKFuture.complete(putMessageStatus);
}
public CompletableFuture<PutMessageStatus> future() {
// 返回刷盤狀態(tài)
return flushOKFuture;
}
}
}
GroupCommitService處理刷盤
GroupCommitService是CommitLog的內(nèi)部類,從繼承關(guān)系中可知它實現(xiàn)了Runnable接口,在run方法調(diào)用waitForRunning等待刷盤請求的提交,然后處理刷盤,不過這個線程是在什么時候啟動的呢?
public class CommitLog {
/**
* GroupCommit Service
*/
class GroupCommitService extends FlushCommitLogService {
// ...
// run方法
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 等待刷盤請求的到來
this.waitForRunning(10);
// 處理刷盤
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// ...
}
}
}
刷盤線程的啟動
在BrokerController的啟動方法中,可以看到調(diào)用了messageStore的start方法,前面可知使用的是DefaultMessageStore,進入到DefaultMessageStore的start方法,它又調(diào)用了commitLog的start方法,在CommitLog的start方法中,啟動了刷盤的線程和監(jiān)控刷盤的線程:
public class BrokerController {
public void start() throws Exception {
if (this.messageStore != null) {
// 啟動
this.messageStore.start();
}
// ...
}
}
public class DefaultMessageStore implements MessageStore {
/**
* @throws Exception
*/
public void start() throws Exception {
// ...
this.flushConsumeQueueService.start();
// 調(diào)用CommitLog的啟動方法
this.commitLog.start();
this.storeStatsService.start();
// ...
}
}
public class CommitLog {
private final FlushCommitLogService flushCommitLogService; // 刷盤
private final FlushDiskWatcher flushDiskWatcher; // 監(jiān)控刷盤
private final FlushCommitLogService commitLogService; // commitLogService
public void start() {
// 啟動刷盤的線程
this.flushCommitLogService.start();
flushDiskWatcher.setDaemon(true);
// 啟動監(jiān)控刷盤的線程
flushDiskWatcher.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
}
刷盤請求的處理
既然知道了線程在何時啟動的,接下來詳細看一下GroupCommitService是如何處理刷盤提交請求的。
前面知道在GroupCommitService的run方法中,調(diào)用了waitForRunning方法等待刷盤請求,waitForRunning在GroupCommitService父類ServiceThread中實現(xiàn)。ServiceThread是一個抽象類,實現(xiàn)了Runnable接口,里面使用了CountDownLatch進行線程間的通信,大小設(shè)為1。
waitForRunning方法在進入的時候先判斷hasNotified是否為true(已通知),并嘗試將其更新為false(未通知),由于hasNotified的初始化值為false,所以首次進入的時候條件不成立,不會進入到這個處理邏輯,會繼續(xù)執(zhí)行后面的代碼。
接著調(diào)用 waitPoint的reset方法將其重置為1,并調(diào)用waitPoint的await方法進行等待:
// ServiceThread
public abstract class ServiceThread implements Runnable {
// 是否通知,初始化為false
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
// CountDownLatch用于線程間的通信
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
// 等待運行
protected void waitForRunning(long interval) {
// 判斷hasNotified是否為true,并嘗試將其更新為false
if (hasNotified.compareAndSet(true, false)) {
// 調(diào)用onWaitEnd
this.onWaitEnd();
return;
}
// 重置waitPoint的值,也就是值為1
waitPoint.reset();
try {
// 會一直等待waitPoint值降為0
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
// 是否被通知設(shè)置為false
hasNotified.set(false);
this.onWaitEnd();
}
}
}
一、添加刷盤請求,喚醒刷盤線程
上面可知需要刷盤的時候調(diào)用了GroupCommitService的putRequest方法添加刷盤請求,在putRequest方法中,將刷盤請求GroupCommitRequest添加到了requestsWrite組提交寫請求鏈表中,然后調(diào)用wakeup方法喚醒刷盤線程,wakeup方法在它的父類ServiceThread中實現(xiàn)。
在wakeup方法中可以看到,首先將hasNotified更改為了true表示處于已通知狀態(tài),然后調(diào)用了countDown方法,此時waitPoint值變成0,就會喚醒之前waitForRunning方法中一直在等待的線程。
public class CommitLog {
/**
* 組提交Service
*/
class GroupCommitService extends FlushCommitLogService {
// 組提交寫請求鏈表
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// ...
// 添加提交請求
public synchronized void putRequest(final GroupCommitRequest request) {
// 加鎖
lock.lock();
try {
// 加入到寫請求鏈表
this.requestsWrite.add(request);
} finally {
lock.unlock();
}
// 喚醒線程執(zhí)行提交任務(wù)
this.wakeup();
}
// ...
}
}
// ServiceThread
public abstract class ServiceThread implements Runnable {
// CountDownLatch用于線程間的通信
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
// 喚醒刷盤線程
public void wakeup() {
// 更改狀態(tài)為已通知狀態(tài)
if (hasNotified.compareAndSet(false, true)) {
// waitPoint的值減1,由于大小設(shè)置為1,減1之后變?yōu)?,會喚醒等待的線程
waitPoint.countDown();
}
}
// ...
}
二、線程被喚醒,執(zhí)行刷盤前的操作
waitForRunning方法中的await方法一直在等待countdown的值變?yōu)?,當上一步調(diào)用了wakeup后,就會喚醒該線程,然后開始往下執(zhí)行,在finally中可以看到將是否被通知hasNotified又設(shè)置為了false,然后調(diào)用了onWaitEnd方法,GroupCommitService方法中重寫了該方法,里面又調(diào)用了swapRequests方法將讀寫請求列表的數(shù)據(jù)進行了交換,putRequest方法中將提交的刷盤請求放在了寫鏈表中,經(jīng)過交換,數(shù)據(jù)會被放在讀鏈表中,后續(xù)進行刷盤時會從讀鏈表中獲取請求進行處理:
// ServiceThread
public abstract class ServiceThread implements Runnable {
// CountDownLatch
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
// 等待運行
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
// 交換
this.onWaitEnd();
return;
}
// 重置
waitPoint.reset();
try {
// 會一直等待countdown為0
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
// 是否被通知設(shè)置為false
hasNotified.set(false);
this.onWaitEnd();
}
}
}
public class CommitLog {
/**
* 組提交Service
*/
class GroupCommitService extends FlushCommitLogService {
// 組提交寫請求鏈表
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// 組提交讀請求鏈表
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
@Override
protected void onWaitEnd() {
// 交換讀寫請求列表的數(shù)據(jù)請求
this.swapRequests();
}
private void swapRequests() {
// 加鎖
lock.lock();
try {
// 將讀寫請求鏈表的數(shù)據(jù)進行交換
LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}
// ...
}
}
折疊
這里使用讀寫鏈表進行交換應(yīng)該是為了提升性能,如果只使用一個鏈表,在提交請求的時候需要往鏈表中添加請求,此時需要加鎖,而刷盤線程在處理完請求之后是需要從鏈表中移除請求的,假設(shè)添加請求時加的鎖還未釋放,刷盤線程就要一直等待,而添加和處理完全可以同時進行,所以使用了兩個鏈表,在添加請求的時候使用寫鏈表,處理請求的時候?qū)ψx寫鏈表的數(shù)據(jù)進行交換使用讀鏈表,這樣只需在交換數(shù)據(jù)的時候加鎖,以此來提升性能。
三、執(zhí)行刷盤
waitForRunning執(zhí)行完畢后,會回到GroupCommitService中的run方法開始繼續(xù)往后執(zhí)行代碼,從代碼中可以看到接下來會調(diào)用doCommit方法執(zhí)行刷盤。
doCommit方法中對讀鏈表中的數(shù)據(jù)進行了判空,如果不為空,進行遍歷處理每一個提交請求,處理邏輯如下:
- 獲取CommitLog映射文件記錄的刷盤位置偏移量flushedWhere,判斷是否大于請求設(shè)定的刷盤位置偏移量nextOffset,正常情況下flush的位置應(yīng)該小于本次刷入數(shù)據(jù)后的偏移量,所以如果flush位置大于等于本次請求設(shè)置的flush偏移量,本次將不能進行刷盤
- 開啟一個循環(huán),調(diào)用mappedFileQueue的flush方法執(zhí)行刷盤(具體的實現(xiàn)在異步刷盤的時候再看),由于CommitLog大小為1G,所以本次刷完之后,如果當前已經(jīng)刷入的偏移量小于請求設(shè)定的位置,表示數(shù)據(jù)未刷完,需要繼續(xù)刷,反之表示數(shù)據(jù)已經(jīng)刷完,flushOK為true,for循環(huán)條件不滿足結(jié)束執(zhí)行。
- 請求處理之后會清空讀鏈表。
public class CommitLog {
/**
* 組提交Service
*/
class GroupCommitService extends FlushCommitLogService {
// 運行
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
// 如果沒有停止
while (!this.isStopped()) {
try {
// 等待喚醒刷盤線程
this.waitForRunning(10);
// 進行提交
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// 睡眠10毫秒
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn(this.getServiceName() + " Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
// 停止之前提交一次
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
// 提交刷盤
private void doCommit() {
// 如果不為空
if (!this.requestsRead.isEmpty()) {
// 遍歷刷盤請求
for (GroupCommitRequest req : this.requestsRead) {
// 獲取映射文件的flush位置,判斷是否大于請求設(shè)定的刷盤位置
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
// 進行刷盤
CommitLog.this.mappedFileQueue.flush(0);
// 由于CommitLog大小為1G,所以本次刷完之后,如果當前已經(jīng)刷入的偏移量小于請求設(shè)定的位置,表示數(shù)據(jù)未刷完,需要繼續(xù)刷,反之表示數(shù)據(jù)已經(jīng)刷完,flushOK為true,for循環(huán)條件不滿足結(jié)束執(zhí)行
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
// 設(shè)置刷盤結(jié)果
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
// 請求處理完之后清空鏈表
this.requestsRead = new LinkedList<>();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
}
折疊
刷盤超時監(jiān)控
FlushDiskWatcher用于監(jiān)控刷盤請求的耗時,它也繼承了ServiceThread,在Broker啟動時開啟了該線程,在run方法中,使用while循環(huán),只要服務(wù)未停止,會一直從阻塞隊列中獲取提交的刷盤請求,開啟while循環(huán)隔一段時間判斷一下刷盤是否完成,如果未完成,會做如下判斷:
- 使用當前時間減去請求設(shè)置的刷盤截止時間,如果已經(jīng)超過截止時間,說明刷盤時間已經(jīng)超時,調(diào)用wakeupCustomer方法設(shè)置刷盤結(jié)果為已超時
- 如果未超時,為了避免當前線程頻繁的進行判斷,將當前線程睡眠一會兒,睡眠的計算方式是使用刷盤請求設(shè)置的截止時間 - 當前時間,表示剩余的時間,然后除以1000000化為毫秒,得到距離刷盤截止時間的毫秒數(shù)sleepTime:sleepTime如果為0,只能是當前時間等于截止時間,也就是到了截止時間,此時同樣調(diào)用wakeupCustomer方法設(shè)置刷盤結(jié)果為已超時sleepTime不為0,在10毫秒和sleepTime的值之間取較小的那個作為睡眠的毫秒數(shù)將當前線程睡眠,等待刷盤任務(wù)執(zhí)行
public class FlushDiskWatcher extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 阻塞隊列,存放提交請求
private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();
@Override
public String getServiceName() {
return FlushDiskWatcher.class.getSimpleName();
}
@Override
public void run() {
// 如果未停止
while (!isStopped()) {
GroupCommitRequest request = null;
try {
// 從阻塞隊列中獲取提交請求
request = commitRequests.take();
} catch (InterruptedException e) {
log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
continue;
}
// 如果還未完成
while (!request.future().isDone()) {
long now = System.nanoTime();
// 如果已經(jīng)超時
if (now - request.getDeadLine() >= 0) {
// 設(shè)置刷盤結(jié)果為超時
request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
break;
}
// 避免頻繁的判斷,使用(截止時間 - 當前時間)/1000000 計算一個毫秒數(shù)
long sleepTime = (request.getDeadLine() - now) / 1_000_000;
// 在計算的毫秒數(shù)與10之間取最小的
sleepTime = Math.min(10, sleepTime);
// 如果sleepTime為0表示已經(jīng)到了截止時間
if (sleepTime == 0) {
// 設(shè)置刷盤結(jié)果為超時
request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
break;
}
try {
// 睡眠等待刷盤任務(wù)的執(zhí)行
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.warn(
"An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
break;
}
}
}
}
}
折疊
異步刷盤
上面講解了同步刷盤,接下來去看下異步刷盤,首先會判斷是否使用了暫存池,如果未開啟調(diào)用flushCommitLogService的wakeup喚醒刷盤線程,否則使用commitLogService先將數(shù)據(jù)寫入到FileChannel,然后統(tǒng)一進行刷盤:
public class CommitLog {
private final FlushDiskWatcher flushDiskWatcher;
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// 是否是同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// ...
}
// 如果是異步刷盤
else {
// 如果未使用暫存池
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// 喚醒刷盤線程進行刷盤
flushCommitLogService.wakeup();
} else {
// 如果使用暫存池,使用commitLogService,先將數(shù)據(jù)寫入到FILECHANNEL,然后統(tǒng)一進行刷盤
commitLogService.wakeup();
}
// 返回結(jié)果
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
}
在CommitLog的構(gòu)造函數(shù)中可以看到,commitLogService使用的是CommitRealTimeService進行實例化的,flushCommitLogService需要根據(jù)設(shè)置決定使用哪種類型進行實例化:
- 如果是同步刷盤,使用GroupCommitService,由前面的同步刷盤可知,使用的就是GroupCommitService進行刷盤的。
- 如果是異步刷盤,使用FlushRealTimeService。
所以接下來需要關(guān)注CommitRealTimeService和FlushRealTimeService:
public class CommitLog {
private final FlushCommitLogService flushCommitLogService;
// 刷盤Service
private final FlushCommitLogService commitLogService;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
// 如果設(shè)置的同步刷盤
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 使用GroupCommitService
this.flushCommitLogService = new GroupCommitService();
} else {
// 使用FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
// commitLogService
this.commitLogService = new CommitRealTimeService();
}
}
CommitRealTimeService
在開啟暫存池時,會使用CommitRealTimeService,它繼承了FlushCommitLogService,所以會實現(xiàn)run方法,處理邏輯如下:
- 從配置信息中獲取提交間隔、每次提交的最少頁數(shù)和兩次提交的最大間隔時間
- 如果當前時間大于上次提交時間+兩次提交的最大間隔時間,意味著已經(jīng)有比較長的一段時間沒有進行提交了,需要盡快刷盤,此時將每次提交的最少頁數(shù)設(shè)置為0不限制提交頁數(shù)
- 調(diào)用mappedFileQueue的commit方法進行提交,并返回提交的結(jié)果:如果結(jié)果為true表示未提交任何數(shù)據(jù)如果結(jié)果為false表示進行了數(shù)據(jù)提交,需要等待刷盤
- 判斷提交返回結(jié)果是否返回false,如果是調(diào)用flushCommitLogService的wakeup方法喚醒刷盤線程,進行刷盤
- 調(diào)用waitForRunning等待下一次提交處理
class CommitRealTimeService extends FlushCommitLogService {
// 上次提交時間戳
private long lastCommitTimestamp = 0;
@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
// 如果未停止
while (!this.isStopped()) {
// 獲取提交間隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
// 一次提交的最少頁數(shù)
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
// 兩次提交的最大間隔時間
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
// 開始時間
long begin = System.currentTimeMillis();
// 如果當前時間大于上次提交時間+提交的最大間隔時間
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin; // 提交時間
commitDataLeastPages = 0;// 最少提交頁數(shù)設(shè)為0,表示不限制提交頁數(shù)
}
try {
// 提交
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
// 提交結(jié)束時間
long end = System.currentTimeMillis();
// 如果返回false表示提交了一部分數(shù)據(jù)但是還未進行刷盤
if (!result) {
// 再次更新提交時間戳
this.lastCommitTimestamp = end;
// 喚醒flush線程進行刷盤
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
// 等待下一次提交
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}
折疊
提交
提交的方法在MappedFileQueue的commit方法中實現(xiàn),處理邏輯如下:
- 根據(jù)記錄的CommitLog文件提交位置的偏移量獲取映射文件,如果獲取不為空,調(diào)用MappedFile的commit方法進行提交,然后返回本次提交數(shù)據(jù)的偏移量
- 記錄本次提交的偏移量:文件的偏移量 + 提交數(shù)據(jù)的偏移量
- 判斷本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何數(shù)據(jù),返回結(jié)果置為true,否則表示提交了數(shù)據(jù),等待刷盤,返回結(jié)果為false
- 更新上一次提交偏移量committedWhere的值為本次的提交偏移量的值
public class MappedFileQueue {
protected long flushedWhere = 0; // flush的位置偏移量
private long committedWhere = 0; // 提交的位置偏移量
public boolean commit(final int commitLeastPages) {
boolean result = true;
// 根據(jù)提交位置的偏移量獲取映射文件
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
// 調(diào)用mappedFile的commit方法進行提交,返回提交數(shù)據(jù)的偏移量
int offset = mappedFile.commit(commitLeastPages);
// 記錄本次提交的偏移量:文件的偏移量 + 提交數(shù)據(jù)的偏移量
long where = mappedFile.getFileFromOffset() + offset;
// 設(shè)置返回結(jié)果,如果本次提交偏移量等于上一次的提交偏移量為true,表示什么也沒干,否則表示提交了數(shù)據(jù),等待刷盤
result = where == this.committedWhere;
// 更新上一次提交偏移量的值為本次的
this.committedWhere = where;
}
return result;
}
}
MappedFile
MappedFile中記錄CommitLog的寫入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,調(diào)用了isAbleToCommit判斷是否可以提交數(shù)據(jù),判斷的流程如下:
- 獲取提交數(shù)據(jù)的位置偏移量和寫入數(shù)據(jù)的位置偏移量
- 如果最少提交頁數(shù)大于0,計算本次寫入的頁數(shù)是否大于或等于最少提交頁數(shù)
- 本次寫入數(shù)據(jù)的頁數(shù)計算方法:寫入位置/頁大小 - flush位置/頁大小
- 如果以上條件都滿足,判斷寫入位置是否大于flush位置,如果大于表示有一部數(shù)據(jù)未flush可以進行提交
滿足提交條件后,就會調(diào)用commit0方法提交數(shù)據(jù),將數(shù)據(jù)寫入到fileChannel中:
public class MappedFile extends ReferenceResource {
// 數(shù)據(jù)寫入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 數(shù)據(jù)提交位置
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 數(shù)據(jù)flush位置
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 提交數(shù)據(jù)
public int commit(final int commitLeastPages) {
// 如果writeBuffer為空
if (writeBuffer == null) {
// 不需要提交任何數(shù)據(jù)到,返回之前記錄的寫入位置
return this.wrotePosition.get();
}
// 如果可以提交數(shù)據(jù)
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
// 提交數(shù)據(jù)
commit0();
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
// 返回提交位置
return this.committedPosition.get();
}
// 是否可以提交數(shù)據(jù)
protected boolean isAbleToCommit(final int commitLeastPages) {
// 獲取提交數(shù)據(jù)的位置偏移量
int flush = this.committedPosition.get();
// 獲取寫入數(shù)據(jù)的位置偏移量
int write = this.wrotePosition.get();
if (this.isFull()) {
return true;
}
// 如果最少提交頁數(shù)大于0
if (commitLeastPages > 0) {
// 寫入位置/頁大小 - flush位置/頁大小 是否大于至少提交的頁數(shù)
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
// 判斷是否需要flush數(shù)據(jù)
return write > flush;
}
protected void commit0() {
// 獲取寫入位置
int writePos = this.wrotePosition.get();
// 獲取上次提交的位置
int lastCommittedPosition = this.committedPosition.get();
if (writePos - lastCommittedPosition > 0) {
try {
// 創(chuàng)建共享緩沖區(qū)
ByteBuffer byteBuffer = writeBuffer.slice();
// 設(shè)置上一次提交位置
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
// 數(shù)據(jù)寫入fileChannel
this.fileChannel.write(byteBuffer);
// 更新寫入的位置
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
}
折疊
FlushRealTimeService
如果未開啟暫存池,會直接使用FlushRealTimeService進行刷盤,當然如果開啟暫存池,寫入一批數(shù)據(jù)后,同樣會使用FlushRealTimeService進行刷盤,F(xiàn)lushRealTimeService同樣繼承了FlushCommitLogService,是用于執(zhí)行刷盤的線程,處理邏輯與提交刷盤數(shù)據(jù)邏輯相似,只不過不是提交數(shù)據(jù),而是調(diào)用flush方法將提交的數(shù)據(jù)刷入磁盤:
- 從配置信息中獲取flush間隔、每次flush的最少頁數(shù)和兩次flush的最大間隔時間
- 如果當前時間大于上次flush時間+兩次flush的最大間隔時間,意味著已經(jīng)有比較長的一段時間沒有進行flush,此時將每次flush的最少頁數(shù)設(shè)置為0不限制flush頁數(shù)
- 調(diào)用waitForRunning等待被喚醒
- 如果被喚醒,調(diào)用mappedFileQueue的flush方法進行刷盤
class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0; // 上一次flush的時間
private long printTimes = 0;
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
// 如果未停止
while (!this.isStopped()) {
//
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// 獲取flush間隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// flush至少包含的頁數(shù)
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// 兩次flush的時間間隔
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
long currentTimeMillis = System.currentTimeMillis();
// 如果當前毫秒數(shù) 大于上次flush時間 + 兩次flush之間的間隔
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis; // 更新flush時間
flushPhysicQueueLeastPages = 0; // flush至少包含的頁數(shù)置為0
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
//
if (flushCommitLogTimed) {
// 睡眠
Thread.sleep(interval);
} else {
// 等待flush被喚醒
this.waitForRunning(interval);
}
if (printFlushProgress) {
// 打印刷盤進程
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
// 進行刷盤
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// 如果服務(wù)停止,確保數(shù)據(jù)被刷盤
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
// 進行刷盤
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
折疊
刷盤
刷盤的方法在MappedFileQueue的flush方法中實現(xiàn),處理邏輯如下:
- 根據(jù) flush的位置偏移量獲取映射文件
- 調(diào)用mappedFile的flush方法進行刷盤,并返回刷盤后的位置偏移量
- 計算最新的flush偏移量
- 更新flushedWhere的值為最新的flush偏移量
public class MappedFileQueue {
protected long flushedWhere = 0; // flush的位置偏移量
private long committedWhere = 0; // 提交的位置偏移量
// flush刷盤
public boolean flush(final int flushLeastPages) {
boolean result = true;
// 獲取flush的位置偏移量映射文件
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
// 獲取時間戳
long tmpTimeStamp = mappedFile.getStoreTimestamp();
// 調(diào)用MappedFile的flush方法進行刷盤,返回刷盤后的偏移量
int offset = mappedFile.flush(flushLeastPages);
// 計算最新的flush偏移量
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
// 更新flush偏移量
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
// 返回flush的偏移量
return result;
}
}
flush的邏輯也與commit方法的邏輯類似:
- 調(diào)用isAbleToFlush判斷是否滿足刷盤條件,獲取上次flush位置偏移量和當前寫入位置偏移量進行如下校驗:
- 文件是否已寫滿,即文件大小是否與寫入數(shù)據(jù)位置相等,如果相等說明文件已經(jīng)寫滿需要執(zhí)行刷盤,滿足刷盤條件
- 如果最少flush頁數(shù)大于0,計算本次flush的頁數(shù)是否大于或等于最少flush頁數(shù),如果滿足可以進行刷盤
- 本次flush數(shù)據(jù)的頁數(shù)計算方法:寫入位置/頁大小 - flush位置/頁大小
- 如果寫入位置偏移量是否大于flush位置偏移量,如果大于表示有數(shù)據(jù)未進行刷盤,滿足刷盤條件
- 調(diào)用fileChannel的force或者mappedByteBuffer的force方法進行刷盤
- 記錄本次flush的位置,并作為結(jié)果返回
public class MappedFile extends ReferenceResource {
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
private final AtomicInteger flushedPosition = new AtomicInteger(0);
/**
* 進行刷盤并返回flush后的偏移量
*/
public int flush(final int flushLeastPages) {
// 是否可以刷盤
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
// 如果writeBuffer不為空
if (writeBuffer != null || this.fileChannel.position() != 0) {
// 將數(shù)據(jù)刷到硬盤
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
// 記錄flush位置
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
// 返回flush位置
return this.getFlushedPosition();
}
// 是否可以刷盤
private boolean isAbleToFlush(final int flushLeastPages) {
// 獲取上次flush位置
int flush = this.flushedPosition.get();
// 寫入位置偏移量
int write = getReadPosition();
if (this.isFull()) {
return true;
}
// 如果flush的頁數(shù)大于0,校驗本次flush的頁數(shù)是否滿足條件
if (flushLeastPages > 0) {
// 本次flush的頁數(shù):寫入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}
// 寫入位置偏移量是否大于flush位置偏移量
return write > flush;
}
// 文件是否已寫滿
public boolean isFull() {
// 文件大小是否與寫入數(shù)據(jù)位置相等
return this.fileSize == this.wrotePosition.get();
}
/**
* 返回當前有效數(shù)據(jù)的位置
*/
public int getReadPosition() {
// 如果writeBuffer為空使用寫入位置,否則使用提交位置
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
}
折疊