日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

刷盤策略

CommitLog的asyncPutMessage方法中可以看到在寫入消息之后,調(diào)用了submitFlushRequest方法執(zhí)行刷盤策略:

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
        try {
            // 獲取上一次寫入的文件
            MAppedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            // ...
            // 寫入消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            // ...
        } finally {
            beginTimeInLock = 0;
            putMessageLock.unlock();
        }
        // ...
        // 執(zhí)行刷盤
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // ...
    }
}

刷盤有兩種策略:

  • 同步刷盤,表示消息寫入到內(nèi)存之后需要立刻刷到磁盤文件中。
  • 同步刷盤會構(gòu)建GroupCommitRequest組提交請求并設(shè)置本次刷盤后的位置偏移量的值(寫入位置偏移量+寫入數(shù)據(jù)字節(jié)數(shù)),然后將請求添加到flushDiskWatcher和GroupCommitService中進行刷盤。
  • 異步刷盤,表示消息寫入內(nèi)存成功之后就返回,由MQ定時將數(shù)據(jù)刷入到磁盤中,會有一定的數(shù)據(jù)丟失風(fēng)險。
public class CommitLog {
    // 監(jiān)控刷盤
    private final FlushDiskWatcher flushDiskWatcher;
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // 是否是同步刷盤
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // 獲取GroupCommitService
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            // 是否等待
            if (messageExt.isWaitStoreMsgOK()) {
                // 構(gòu)建組提交請求,傳入本次刷盤后位置的偏移量:寫入位置偏移量+寫入數(shù)據(jù)字節(jié)數(shù)
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                // 添加到wather中
                flushDiskWatcher.add(request);
                // 添加到service
                service.putRequest(request);
                // 返回
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // 如果是異步刷盤
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else  {
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
}

同步刷盤

如果使用的是同步刷盤,首先獲取了GroupCommitService,然后構(gòu)建GroupCommitRequest組提交請求,將請求添加到flushDiskWatcher和GroupCommitService中,其中flushDiskWatcher用于監(jiān)控刷盤是否超時,GroupCommitService用于提交刷盤數(shù)據(jù)。

構(gòu)建GroupCommitRequest提交請求

GroupCommitRequest是CommitLog的內(nèi)部類:

  • nextOffset:寫入位置偏移量+寫入數(shù)據(jù)字節(jié)數(shù),也就是本次刷盤成功后應(yīng)該對應(yīng)的flush偏移量
  • flushOKFuture:刷盤結(jié)果
  • deadLine:刷盤的限定時間,值為當前時間 + 傳入的超時時間,超過限定時間還未刷盤完畢會被認為超時
public class CommitLog {
    public static class GroupCommitRequest {
        private final long nextOffset;
        // 刷盤狀態(tài)
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
        private final long deadLine;// 刷盤的限定時間,超過限定時間還未刷盤完畢會被認為超時

        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
            this.nextOffset = nextOffset;
            // 設(shè)置限定時間:當前時間 + 超時時間
            this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
        }

        public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
            // 結(jié)束刷盤,設(shè)置刷盤狀態(tài)
            this.flushOKFuture.complete(putMessageStatus);
        }

        public CompletableFuture<PutMessageStatus> future() {
            // 返回刷盤狀態(tài)
            return flushOKFuture;
        }

    }
}

GroupCommitService處理刷盤

GroupCommitService是CommitLog的內(nèi)部類,從繼承關(guān)系中可知它實現(xiàn)了Runnable接口,在run方法調(diào)用waitForRunning等待刷盤請求的提交,然后處理刷盤,不過這個線程是在什么時候啟動的呢?

public class CommitLog {
    /**
     * GroupCommit Service
     */
    class GroupCommitService extends FlushCommitLogService {
        // ...
        // run方法
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    // 等待刷盤請求的到來
                    this.waitForRunning(10);
                    // 處理刷盤
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
            // ...
        }
    }
}

刷盤線程的啟動

在BrokerController的啟動方法中,可以看到調(diào)用了messageStore的start方法,前面可知使用的是DefaultMessageStore,進入到DefaultMessageStore的start方法,它又調(diào)用了commitLog的start方法,在CommitLog的start方法中,啟動了刷盤的線程和監(jiān)控刷盤的線程:

public class BrokerController {
    public void start() throws Exception {
        if (this.messageStore != null) {
            // 啟動
            this.messageStore.start();
        }
        // ...
    }
}

public class DefaultMessageStore implements MessageStore {
   /**
     * @throws Exception
     */
    public void start() throws Exception {
        // ...
        this.flushConsumeQueueService.start();
        // 調(diào)用CommitLog的啟動方法
        this.commitLog.start();
        this.storeStatsService.start();
        // ...
    }
}

public class CommitLog {
    private final FlushCommitLogService flushCommitLogService; // 刷盤
    private final FlushDiskWatcher flushDiskWatcher; // 監(jiān)控刷盤
    private final FlushCommitLogService commitLogService; // commitLogService
    public void start() {
        // 啟動刷盤的線程
        this.flushCommitLogService.start();
        flushDiskWatcher.setDaemon(true);
        // 啟動監(jiān)控刷盤的線程
        flushDiskWatcher.start();
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }
}

刷盤請求的處理

既然知道了線程在何時啟動的,接下來詳細看一下GroupCommitService是如何處理刷盤提交請求的。

前面知道在GroupCommitService的run方法中,調(diào)用了waitForRunning方法等待刷盤請求,waitForRunning在GroupCommitService父類ServiceThread中實現(xiàn)。ServiceThread是一個抽象類,實現(xiàn)了Runnable接口,里面使用了CountDownLatch進行線程間的通信,大小設(shè)為1。

waitForRunning方法在進入的時候先判斷hasNotified是否為true(已通知),并嘗試將其更新為false(未通知),由于hasNotified的初始化值為false,所以首次進入的時候條件不成立,不會進入到這個處理邏輯,會繼續(xù)執(zhí)行后面的代碼。

接著調(diào)用 waitPoint的reset方法將其重置為1,并調(diào)用waitPoint的await方法進行等待:

// ServiceThread
public abstract class ServiceThread implements Runnable {
    // 是否通知,初始化為false
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
  
    // CountDownLatch用于線程間的通信
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
  
    // 等待運行
    protected void waitForRunning(long interval) {
        // 判斷hasNotified是否為true,并嘗試將其更新為false
        if (hasNotified.compareAndSet(true, false)) {
            // 調(diào)用onWaitEnd
            this.onWaitEnd();
            return;
        }

        // 重置waitPoint的值,也就是值為1
        waitPoint.reset();
        try {
            // 會一直等待waitPoint值降為0
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // 是否被通知設(shè)置為false
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
}

一、添加刷盤請求,喚醒刷盤線程

上面可知需要刷盤的時候調(diào)用了GroupCommitService的putRequest方法添加刷盤請求,在putRequest方法中,將刷盤請求GroupCommitRequest添加到了requestsWrite組提交寫請求鏈表中,然后調(diào)用wakeup方法喚醒刷盤線程,wakeup方法在它的父類ServiceThread中實現(xiàn)。

在wakeup方法中可以看到,首先將hasNotified更改為了true表示處于已通知狀態(tài),然后調(diào)用了countDown方法,此時waitPoint值變成0,就會喚醒之前waitForRunning方法中一直在等待的線程。

public class CommitLog {
    /**
     * 組提交Service
     */
    class GroupCommitService extends FlushCommitLogService {
        // 組提交寫請求鏈表
        private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
        
        // ...
      
        // 添加提交請求
        public synchronized void putRequest(final GroupCommitRequest request) {
            // 加鎖
            lock.lock();
            try {
                // 加入到寫請求鏈表
                this.requestsWrite.add(request);
            } finally {
                lock.unlock();
            }
            // 喚醒線程執(zhí)行提交任務(wù)
            this.wakeup();
        }   
        // ...
    }
  
}

// ServiceThread
public abstract class ServiceThread implements Runnable {

    // CountDownLatch用于線程間的通信
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
  
    // 喚醒刷盤線程
    public void wakeup() {
        // 更改狀態(tài)為已通知狀態(tài)
        if (hasNotified.compareAndSet(false, true)) {
            // waitPoint的值減1,由于大小設(shè)置為1,減1之后變?yōu)?,會喚醒等待的線程
            waitPoint.countDown(); 
        }
    }
  
    // ...
}

二、線程被喚醒,執(zhí)行刷盤前的操作

waitForRunning方法中的await方法一直在等待countdown的值變?yōu)?,當上一步調(diào)用了wakeup后,就會喚醒該線程,然后開始往下執(zhí)行,在finally中可以看到將是否被通知hasNotified又設(shè)置為了false,然后調(diào)用了onWaitEnd方法,GroupCommitService方法中重寫了該方法,里面又調(diào)用了swapRequests方法將讀寫請求列表的數(shù)據(jù)進行了交換,putRequest方法中將提交的刷盤請求放在了寫鏈表中,經(jīng)過交換,數(shù)據(jù)會被放在讀鏈表中,后續(xù)進行刷盤時會從讀鏈表中獲取請求進行處理

// ServiceThread
public abstract class ServiceThread implements Runnable {
    // CountDownLatch
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    
    // 等待運行
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            // 交換
            this.onWaitEnd();
            return;
        }

        // 重置
        waitPoint.reset();
        try {
            // 會一直等待countdown為0
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // 是否被通知設(shè)置為false
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
}

public class CommitLog {
    /**
     * 組提交Service
     */
    class GroupCommitService extends FlushCommitLogService {
        // 組提交寫請求鏈表
        private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
        // 組提交讀請求鏈表
        private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
        
        @Override
        protected void onWaitEnd() {
            // 交換讀寫請求列表的數(shù)據(jù)請求
            this.swapRequests();
        }

        private void swapRequests() {
            // 加鎖
            lock.lock();
            try {
                // 將讀寫請求鏈表的數(shù)據(jù)進行交換
                LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = tmp;
            } finally {
                lock.unlock();
            }
        }
        // ...
    }
}
折疊 

這里使用讀寫鏈表進行交換應(yīng)該是為了提升性能,如果只使用一個鏈表,在提交請求的時候需要往鏈表中添加請求,此時需要加鎖,而刷盤線程在處理完請求之后是需要從鏈表中移除請求的,假設(shè)添加請求時加的鎖還未釋放,刷盤線程就要一直等待,而添加和處理完全可以同時進行,所以使用了兩個鏈表,在添加請求的時候使用寫鏈表,處理請求的時候?qū)ψx寫鏈表的數(shù)據(jù)進行交換使用讀鏈表,這樣只需在交換數(shù)據(jù)的時候加鎖,以此來提升性能。

三、執(zhí)行刷盤

waitForRunning執(zhí)行完畢后,會回到GroupCommitService中的run方法開始繼續(xù)往后執(zhí)行代碼,從代碼中可以看到接下來會調(diào)用doCommit方法執(zhí)行刷盤。

doCommit方法中對讀鏈表中的數(shù)據(jù)進行了判空,如果不為空,進行遍歷處理每一個提交請求,處理邏輯如下:

  1. 獲取CommitLog映射文件記錄的刷盤位置偏移量flushedWhere,判斷是否大于請求設(shè)定的刷盤位置偏移量nextOffset,正常情況下flush的位置應(yīng)該小于本次刷入數(shù)據(jù)后的偏移量,所以如果flush位置大于等于本次請求設(shè)置的flush偏移量,本次將不能進行刷盤
「RocketMQ」消息的刷盤機制

 

  1. 開啟一個循環(huán),調(diào)用mappedFileQueue的flush方法執(zhí)行刷盤(具體的實現(xiàn)在異步刷盤的時候再看),由于CommitLog大小為1G,所以本次刷完之后,如果當前已經(jīng)刷入的偏移量小于請求設(shè)定的位置,表示數(shù)據(jù)未刷完,需要繼續(xù)刷,反之表示數(shù)據(jù)已經(jīng)刷完,flushOK為true,for循環(huán)條件不滿足結(jié)束執(zhí)行。
  2. 請求處理之后會清空讀鏈表。
public class CommitLog {
    /**
     * 組提交Service
     */
    class GroupCommitService extends FlushCommitLogService {  
        // 運行
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            // 如果沒有停止
            while (!this.isStopped()) {
                try {
                    // 等待喚醒刷盤線程
                    this.waitForRunning(10);
                    // 進行提交
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // 睡眠10毫秒
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn(this.getServiceName() + " Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }
            // 停止之前提交一次
            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }
      
        // 提交刷盤
        private void doCommit() {
            // 如果不為空
            if (!this.requestsRead.isEmpty()) {
                // 遍歷刷盤請求
                for (GroupCommitRequest req : this.requestsRead) {
                    // 獲取映射文件的flush位置,判斷是否大于請求設(shè)定的刷盤位置
                    boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        // 進行刷盤
                        CommitLog.this.mappedFileQueue.flush(0);
                        // 由于CommitLog大小為1G,所以本次刷完之后,如果當前已經(jīng)刷入的偏移量小于請求設(shè)定的位置,表示數(shù)據(jù)未刷完,需要繼續(xù)刷,反之表示數(shù)據(jù)已經(jīng)刷完,flushOK為true,for循環(huán)條件不滿足結(jié)束執(zhí)行
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    }
                    // 設(shè)置刷盤結(jié)果
                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }

                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                // 請求處理完之后清空鏈表
                this.requestsRead = new LinkedList<>();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
  
}
折疊 

刷盤超時監(jiān)控

FlushDiskWatcher用于監(jiān)控刷盤請求的耗時,它也繼承了ServiceThread,在Broker啟動時開啟了該線程,在run方法中,使用while循環(huán),只要服務(wù)未停止,會一直從阻塞隊列中獲取提交的刷盤請求,開啟while循環(huán)隔一段時間判斷一下刷盤是否完成,如果未完成,會做如下判斷:

  1. 使用當前時間減去請求設(shè)置的刷盤截止時間,如果已經(jīng)超過截止時間,說明刷盤時間已經(jīng)超時,調(diào)用wakeupCustomer方法設(shè)置刷盤結(jié)果為已超時
  2. 如果未超時,為了避免當前線程頻繁的進行判斷,將當前線程睡眠一會兒,睡眠的計算方式是使用刷盤請求設(shè)置的截止時間 - 當前時間,表示剩余的時間,然后除以1000000化為毫秒,得到距離刷盤截止時間的毫秒數(shù)sleepTime:sleepTime如果為0,只能是當前時間等于截止時間,也就是到了截止時間,此時同樣調(diào)用wakeupCustomer方法設(shè)置刷盤結(jié)果為已超時sleepTime不為0,在10毫秒和sleepTime的值之間取較小的那個作為睡眠的毫秒數(shù)將當前線程睡眠,等待刷盤任務(wù)執(zhí)行
public class FlushDiskWatcher extends ServiceThread {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 阻塞隊列,存放提交請求
    private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();

    @Override
    public String getServiceName() {
        return FlushDiskWatcher.class.getSimpleName();
    }

    @Override
    public void run() {
        // 如果未停止
        while (!isStopped()) {
            GroupCommitRequest request = null;
            try {
                // 從阻塞隊列中獲取提交請求
                request = commitRequests.take();
            } catch (InterruptedException e) {
                log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
                continue;
            }
            // 如果還未完成
            while (!request.future().isDone()) {
                long now = System.nanoTime();
                // 如果已經(jīng)超時
                if (now - request.getDeadLine() >= 0) {
                    // 設(shè)置刷盤結(jié)果為超時
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                // 避免頻繁的判斷,使用(截止時間 - 當前時間)/1000000 計算一個毫秒數(shù)
                long sleepTime = (request.getDeadLine() - now) / 1_000_000;
                // 在計算的毫秒數(shù)與10之間取最小的
                sleepTime = Math.min(10, sleepTime);
                // 如果sleepTime為0表示已經(jīng)到了截止時間
                if (sleepTime == 0) {
                    // 設(shè)置刷盤結(jié)果為超時
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                try {
                    // 睡眠等待刷盤任務(wù)的執(zhí)行
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    log.warn(
                            "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
                    break;
                }
            }
        }
    }
}
折疊 

異步刷盤

上面講解了同步刷盤,接下來去看下異步刷盤,首先會判斷是否使用了暫存池,如果未開啟調(diào)用flushCommitLogService的wakeup喚醒刷盤線程,否則使用commitLogService先將數(shù)據(jù)寫入到FileChannel,然后統(tǒng)一進行刷盤:

 public class CommitLog {
    private final FlushDiskWatcher flushDiskWatcher;
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // 是否是同步刷盤
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // ...
        }
        // 如果是異步刷盤
        else {
            // 如果未使用暫存池
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                // 喚醒刷盤線程進行刷盤
                flushCommitLogService.wakeup();
            } else  {
                // 如果使用暫存池,使用commitLogService,先將數(shù)據(jù)寫入到FILECHANNEL,然后統(tǒng)一進行刷盤
                commitLogService.wakeup();
            }
            // 返回結(jié)果
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
}

在CommitLog的構(gòu)造函數(shù)中可以看到,commitLogService使用的是CommitRealTimeService進行實例化的,flushCommitLogService需要根據(jù)設(shè)置決定使用哪種類型進行實例化:

  • 如果是同步刷盤,使用GroupCommitService,由前面的同步刷盤可知,使用的就是GroupCommitService進行刷盤的。
  • 如果是異步刷盤,使用FlushRealTimeService。

所以接下來需要關(guān)注CommitRealTimeService和FlushRealTimeService:

「RocketMQ」消息的刷盤機制

 

public class CommitLog {    
    private final FlushCommitLogService flushCommitLogService;

    // 刷盤Service
    private final FlushCommitLogService commitLogService;

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        // 如果設(shè)置的同步刷盤
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // 使用GroupCommitService
            this.flushCommitLogService = new GroupCommitService();
        } else {
            // 使用FlushRealTimeService
            this.flushCommitLogService = new FlushRealTimeService();
        }
        // commitLogService
        this.commitLogService = new CommitRealTimeService();
    }
}

CommitRealTimeService

「RocketMQ」消息的刷盤機制

 

在開啟暫存池時,會使用CommitRealTimeService,它繼承了FlushCommitLogService,所以會實現(xiàn)run方法,處理邏輯如下:

  1. 從配置信息中獲取提交間隔每次提交的最少頁數(shù)兩次提交的最大間隔時間
  2. 如果當前時間大于上次提交時間+兩次提交的最大間隔時間,意味著已經(jīng)有比較長的一段時間沒有進行提交了,需要盡快刷盤,此時將每次提交的最少頁數(shù)設(shè)置為0不限制提交頁數(shù)
  3. 調(diào)用mappedFileQueue的commit方法進行提交,并返回提交的結(jié)果:如果結(jié)果為true表示未提交任何數(shù)據(jù)如果結(jié)果為false表示進行了數(shù)據(jù)提交,需要等待刷盤
  4. 判斷提交返回結(jié)果是否返回false,如果是調(diào)用flushCommitLogService的wakeup方法喚醒刷盤線程,進行刷盤
  5. 調(diào)用waitForRunning等待下一次提交處理
class CommitRealTimeService extends FlushCommitLogService {
        // 上次提交時間戳
        private long lastCommitTimestamp = 0;

        @Override
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            // 如果未停止
            while (!this.isStopped()) {
                // 獲取提交間隔
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
                // 一次提交的最少頁數(shù)
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
                // 兩次提交的最大間隔時間
                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
                // 開始時間
                long begin = System.currentTimeMillis();
                // 如果當前時間大于上次提交時間+提交的最大間隔時間
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin; // 提交時間
                    commitDataLeastPages = 0;// 最少提交頁數(shù)設(shè)為0,表示不限制提交頁數(shù)
                }

                try {
                    // 提交
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    // 提交結(jié)束時間
                    long end = System.currentTimeMillis();
                    // 如果返回false表示提交了一部分數(shù)據(jù)但是還未進行刷盤
                    if (!result) {
                        // 再次更新提交時間戳
                        this.lastCommitTimestamp = end;
                        // 喚醒flush線程進行刷盤
                        flushCommitLogService.wakeup();
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    // 等待下一次提交
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }
折疊 

提交

提交的方法在MappedFileQueue的commit方法中實現(xiàn),處理邏輯如下:

  1. 根據(jù)記錄的CommitLog文件提交位置的偏移量獲取映射文件,如果獲取不為空,調(diào)用MappedFile的commit方法進行提交,然后返回本次提交數(shù)據(jù)的偏移量
  2. 記錄本次提交的偏移量:文件的偏移量 + 提交數(shù)據(jù)的偏移量
  3. 判斷本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何數(shù)據(jù),返回結(jié)果置為true,否則表示提交了數(shù)據(jù),等待刷盤,返回結(jié)果為false
  4. 更新上一次提交偏移量committedWhere的值為本次的提交偏移量的值
public class MappedFileQueue {
    protected long flushedWhere = 0; // flush的位置偏移量
    private long committedWhere = 0; // 提交的位置偏移量
 
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        // 根據(jù)提交位置的偏移量獲取映射文件
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            // 調(diào)用mappedFile的commit方法進行提交,返回提交數(shù)據(jù)的偏移量
            int offset = mappedFile.commit(commitLeastPages);
            // 記錄本次提交的偏移量:文件的偏移量 + 提交數(shù)據(jù)的偏移量
            long where = mappedFile.getFileFromOffset() + offset;
            // 設(shè)置返回結(jié)果,如果本次提交偏移量等于上一次的提交偏移量為true,表示什么也沒干,否則表示提交了數(shù)據(jù),等待刷盤
            result = where == this.committedWhere;
            // 更新上一次提交偏移量的值為本次的
            this.committedWhere = where;
        }
        return result;
    }
}

MappedFile

MappedFile中記錄CommitLog的寫入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,調(diào)用了isAbleToCommit判斷是否可以提交數(shù)據(jù),判斷的流程如下:

  1. 獲取提交數(shù)據(jù)的位置偏移量和寫入數(shù)據(jù)的位置偏移量
  2. 如果最少提交頁數(shù)大于0,計算本次寫入的頁數(shù)是否大于或等于最少提交頁數(shù)
  3. 本次寫入數(shù)據(jù)的頁數(shù)計算方法:寫入位置/頁大小 - flush位置/頁大小
  4. 如果以上條件都滿足,判斷寫入位置是否大于flush位置,如果大于表示有一部數(shù)據(jù)未flush可以進行提交

滿足提交條件后,就會調(diào)用commit0方法提交數(shù)據(jù),將數(shù)據(jù)寫入到fileChannel中:

public class MappedFile extends ReferenceResource {
    // 數(shù)據(jù)寫入位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 數(shù)據(jù)提交位置
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 數(shù)據(jù)flush位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
  
    // 提交數(shù)據(jù)
    public int commit(final int commitLeastPages) {
        // 如果writeBuffer為空
        if (writeBuffer == null) {
            // 不需要提交任何數(shù)據(jù)到,返回之前記錄的寫入位置
            return this.wrotePosition.get();
        }
        // 如果可以提交數(shù)據(jù)
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                // 提交數(shù)據(jù)
                commit0();
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
        // 返回提交位置
        return this.committedPosition.get();
    }

    // 是否可以提交數(shù)據(jù)
    protected boolean isAbleToCommit(final int commitLeastPages) {
        // 獲取提交數(shù)據(jù)的位置偏移量
        int flush = this.committedPosition.get();
        // 獲取寫入數(shù)據(jù)的位置偏移量
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;
        }
        // 如果最少提交頁數(shù)大于0
        if (commitLeastPages > 0) {
            // 寫入位置/頁大小 - flush位置/頁大小 是否大于至少提交的頁數(shù)
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }
        // 判斷是否需要flush數(shù)據(jù)
        return write > flush;
    }
  
    protected void commit0() {
        // 獲取寫入位置
        int writePos = this.wrotePosition.get();
        // 獲取上次提交的位置
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - lastCommittedPosition > 0) {
            try {
                // 創(chuàng)建共享緩沖區(qū)
                ByteBuffer byteBuffer = writeBuffer.slice();
                // 設(shè)置上一次提交位置
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                // 數(shù)據(jù)寫入fileChannel
                this.fileChannel.write(byteBuffer);
                // 更新寫入的位置
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
}
折疊 

FlushRealTimeService

如果未開啟暫存池,會直接使用FlushRealTimeService進行刷盤,當然如果開啟暫存池,寫入一批數(shù)據(jù)后,同樣會使用FlushRealTimeService進行刷盤,F(xiàn)lushRealTimeService同樣繼承了FlushCommitLogService,是用于執(zhí)行刷盤的線程,處理邏輯與提交刷盤數(shù)據(jù)邏輯相似,只不過不是提交數(shù)據(jù),而是調(diào)用flush方法將提交的數(shù)據(jù)刷入磁盤:

  1. 從配置信息中獲取flush間隔每次flush的最少頁數(shù)兩次flush的最大間隔時間
  2. 如果當前時間大于上次flush時間+兩次flush的最大間隔時間,意味著已經(jīng)有比較長的一段時間沒有進行flush,此時將每次flush的最少頁數(shù)設(shè)置為0不限制flush頁數(shù)
  3. 調(diào)用waitForRunning等待被喚醒
  4. 如果被喚醒,調(diào)用mappedFileQueue的flush方法進行刷盤
class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0; // 上一次flush的時間
        private long printTimes = 0;

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            // 如果未停止
            while (!this.isStopped()) {
                // 
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
                // 獲取flush間隔
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                // flush至少包含的頁數(shù)
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                // 兩次flush的時間間隔
                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;
                long currentTimeMillis = System.currentTimeMillis();
                // 如果當前毫秒數(shù) 大于上次flush時間 + 兩次flush之間的間隔
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis; // 更新flush時間
                    flushPhysicQueueLeastPages = 0; // flush至少包含的頁數(shù)置為0
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    // 
                    if (flushCommitLogTimed) {
                        // 睡眠
                        Thread.sleep(interval);
                    } else {
                        // 等待flush被喚醒
                        this.waitForRunning(interval);
                    }
                    if (printFlushProgress) {
                        // 打印刷盤進程
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    // 進行刷盤
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // 如果服務(wù)停止,確保數(shù)據(jù)被刷盤
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                // 進行刷盤
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            this.printFlushProgress();
            CommitLog.log.info(this.getServiceName() + " service end");
        }

折疊 

刷盤

刷盤的方法在MappedFileQueue的flush方法中實現(xiàn),處理邏輯如下:

  1. 根據(jù) flush的位置偏移量獲取映射文件
  2. 調(diào)用mappedFile的flush方法進行刷盤,并返回刷盤后的位置偏移量
  3. 計算最新的flush偏移量
  4. 更新flushedWhere的值為最新的flush偏移量
public class MappedFileQueue {
    protected long flushedWhere = 0; // flush的位置偏移量
    private long committedWhere = 0; // 提交的位置偏移量
  
    // flush刷盤
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        // 獲取flush的位置偏移量映射文件
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            // 獲取時間戳
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 調(diào)用MappedFile的flush方法進行刷盤,返回刷盤后的偏移量
            int offset = mappedFile.flush(flushLeastPages);
            // 計算最新的flush偏移量
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            // 更新flush偏移量
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        // 返回flush的偏移量
        return result;
    }
}

flush的邏輯也與commit方法的邏輯類似:

  1. 調(diào)用isAbleToFlush判斷是否滿足刷盤條件,獲取上次flush位置偏移量和當前寫入位置偏移量進行如下校驗:
  2. 文件是否已寫滿,即文件大小是否與寫入數(shù)據(jù)位置相等,如果相等說明文件已經(jīng)寫滿需要執(zhí)行刷盤,滿足刷盤條件
  3. 如果最少flush頁數(shù)大于0,計算本次flush的頁數(shù)是否大于或等于最少flush頁數(shù),如果滿足可以進行刷盤
  4. 本次flush數(shù)據(jù)的頁數(shù)計算方法:寫入位置/頁大小 - flush位置/頁大小
  5. 如果寫入位置偏移量是否大于flush位置偏移量,如果大于表示有數(shù)據(jù)未進行刷盤,滿足刷盤條件
  6. 調(diào)用fileChannel的force或者mappedByteBuffer的force方法進行刷盤
  7. 記錄本次flush的位置,并作為結(jié)果返回
public class MappedFile extends ReferenceResource {
  
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
   
    /**
     * 進行刷盤并返回flush后的偏移量
     */
    public int flush(final int flushLeastPages) {
        // 是否可以刷盤
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                try {
                    // 如果writeBuffer不為空
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        // 將數(shù)據(jù)刷到硬盤
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
                // 記錄flush位置
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        // 返回flush位置
        return this.getFlushedPosition();
    }
    
    // 是否可以刷盤
    private boolean isAbleToFlush(final int flushLeastPages) {
        // 獲取上次flush位置
        int flush = this.flushedPosition.get();
        // 寫入位置偏移量
        int write = getReadPosition();
        if (this.isFull()) {
            return true;
        }
        // 如果flush的頁數(shù)大于0,校驗本次flush的頁數(shù)是否滿足條件
        if (flushLeastPages > 0) {
            // 本次flush的頁數(shù):寫入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        } 
        // 寫入位置偏移量是否大于flush位置偏移量
        return write > flush;
    }
    
    // 文件是否已寫滿
    public boolean isFull() {
        // 文件大小是否與寫入數(shù)據(jù)位置相等
        return this.fileSize == this.wrotePosition.get();
    }
  
    /**
     * 返回當前有效數(shù)據(jù)的位置
     */
    public int getReadPosition() {
        // 如果writeBuffer為空使用寫入位置,否則使用提交位置
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }
}
折疊 
「RocketMQ」消息的刷盤機制

 

分享到:
標簽:RocketMQ
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定