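Preface
RocketMQ is an excellent open-source message middleware. Its high-performance file storage and retrieval set it apart from many other message middlewares, and the design of its file module is well worth studying and borrowing from. Many developers who use it therefore start to study how its file storage is implemented, but without enough background knowledge they often get no further than the basic principles and the overall architecture, and have only a partial understanding of how it is actually implemented and which techniques it uses. There are many articles online on RocketMQ's principles and source code, but most of them cover the overall architecture, and their source analysis rarely goes beyond the level of code comments. As a result, the big picture and the details never quite come together, which leaves the reader feeling that "one cannot see the true face of Mount Lu because one is standing on the mountain itself."
To address the confusion developers run into when studying RocketMQ, and drawing on my own study of RocketMQ's file storage principles and source code, I used Java NIO file reads and writes to implement a simplified version of the RocketMQ file system by hand. I am sharing it here in the hope that it peels back the layers, helps developers understand the essence of how RocketMQ stores files, and serves as a starting point for further exploration.
This article is not an introduction to the basics of RocketMQ storage. It assumes that the reader already has some understanding of RocketMQ's CommitLog, ConsumeQueue, and IndexFile, and is familiar with Java NIO file reads and writes. It is intended for readers who know something about RocketMQ's file storage principles and want to go further and see how they can be implemented with Java NIO.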
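Core principle
When a message is written to the commitLog file, its offset in the commitLog file (the byte position at which the message starts) must be recorded, and the message is later read back by that offset. The files in which RocketMQ stores the offset are consumeQueue and indexFile.
Figure: RocketMQ file read/write flow
Figure: RocketMQ file storage diagram
Figure: RocketMQ logical file storage structure
Figure: RocketMQ file offset lookup diagram
CommitLog read/write
The commitLog file stores complete messages of variable length, so a message can only be read by its storage offset in the file. The offset itself is stored in the consumeQueue and indexFile files.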
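As a minimal sketch of how start offsets follow from message lengths, assume messages are appended back to back with no gaps. The class and method names below are illustrative only and are not part of RocketMQ.

```java
import java.util.Arrays;

final class CommitLogOffsets {
    /** Returns the start offset of each message when messages are appended back to back. */
    static long[] startOffsets(int[] messageLengths) {
        long[] offsets = new long[messageLengths.length];
        long next = 0L; // offset at which the next message will be written
        for (int i = 0; i < messageLengths.length; i++) {
            offsets[i] = next;
            next += messageLengths[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Three messages of lengths 100, 200 and 350 start at offsets 0, 100 and 300.
        System.out.println(Arrays.toString(startOffsets(new int[] {100, 200, 350})));
    }
}
```

Because the lengths vary, the offset of message n cannot be computed from n alone, which is why it has to be stored somewhere else.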
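consumeQueue read/write
consumeQueue is read when a consumer pulls messages, and the read logic is fairly simple.
Each consumeQueue entry has a fixed length of 20 bytes (8: offset + 4: msgLen + 8: tagCode), and entries are written sequentially: for every message written, the write position advances by 20. On read, entries are addressed by the message sequence number index (which message it is, counting from 0).
Suppose the consumer wants to consume the message with sequence number index=2 (the third message, since index starts at 0). The steps are:
1. Locate the consumeQueue file, set the read position to 40 (2*20), and read the entry.
2. Take the offset value from the entry read in step 1 (this is the commitLog offset stored in consumeQueue).
3. Use the offset obtained in step 2 to locate the commitLog file, then read the whole message from commitLog.
See the RocketMQ file offset lookup diagram.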
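The fixed 20-byte entry layout can be sketched with a heap ByteBuffer standing in for the consumeQueue file. This is only an illustration under that assumption; ConsumeQueueSketch and its methods are hypothetical names, and the offsets and lengths in main are example values.

```java
import java.nio.ByteBuffer;

final class ConsumeQueueSketch {
    static final int ENTRY_SIZE = 20; // 8 (commitLog offset) + 4 (message length) + 8 (tag code)

    /** Writes the entry for the message with the given 0-based index. */
    static void writeEntry(ByteBuffer queue, int index, long commitLogOffset, int msgLength, long tagCode) {
        int pos = index * ENTRY_SIZE;
        queue.putLong(pos, commitLogOffset);
        queue.putInt(pos + 8, msgLength);
        queue.putLong(pos + 12, tagCode);
    }

    /** Reads the commitLog offset stored for the message with the given 0-based index. */
    static long readCommitLogOffset(ByteBuffer queue, int index) {
        return queue.getLong(index * ENTRY_SIZE);
    }

    /** Reads the message length stored for the message with the given 0-based index. */
    static int readMsgLength(ByteBuffer queue, int index) {
        return queue.getInt(index * ENTRY_SIZE + 8);
    }

    public static void main(String[] args) {
        ByteBuffer queue = ByteBuffer.allocate(10 * ENTRY_SIZE); // in-memory stand-in for the file
        writeEntry(queue, 0, 0L, 338, 100L);
        writeEntry(queue, 1, 338L, 338, 100L);
        writeEntry(queue, 2, 676L, 296, 100L);
        // The entry for index 2 starts at byte 40.
        System.out.println(readCommitLogOffset(queue, 2)); // prints 676
        System.out.println(readMsgLength(queue, 2));       // prints 296
    }
}
```

Since every entry has the same size, finding entry n is a single multiplication, and only the commitLog read needs a stored offset.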
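indexFile read/write
An indexFile consists of an indexHead (40 bytes), 5,000,000 hash slots (each slot a fixed 4 bytes), and 20,000,000 indexData entries (20 bytes each).
indexFile is designed to make it easy to read a message by messageId, so a mapping from messageId to the message sequence number index is needed. The hash of the messageId is taken modulo the slot count to get the slot index (which slot), the index (sequence number) of the message for that messageId is stored in that slot, and the data is appended sequentially to the indexData section of the indexFile.
Write process:
1. hash(messageId) % 5,000,000 gives the slot index slot_index (which slot; each slot is a fixed 4 bytes). The message sequence number index is then stored in that slot (to keep the design simple, hash collisions are ignored for now).
2. Store the indexData entry. Its start position is
indexDataOffset = 40 (file header length) + 5,000,000 * 4 + (index - 1) * 20
Here index is counted from 1. The code in this article counts indexCount from 0 and uses 100 slots, so it computes 40 + 100 * 4 + indexCount * 20.
Read process:
1. hash(messageId) % 5,000,000 gives the slot index slot_index (which slot).
2. Compute the slot's offset slot_offset from the slot index (each slot is a fixed 4 bytes):
slot_offset = 40 (file header length) + slot_index * 4
3. Read the message sequence number index stored in the slot at slot_offset.
4. From the message's index, compute the offset indexDataOffset of its entry in the indexData section of the indexFile:
indexDataOffset = 40 (file header length) + 5,000,000 * 4 + (index - 1) * 20
5. Read the indexData entry at indexDataOffset and take the commitLog offset from it; the actual message can then be read.
See the RocketMQ file offset lookup diagram.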
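The write and read steps above can be sketched as follows, again with a heap ByteBuffer in place of the file. This is a simplified illustration, not RocketMQ's implementation: IndexFileSketch is a hypothetical class, the slot count is passed in rather than fixed at 5,000,000, collisions are ignored as in the text, and Math.floorMod is used here as an assumption to keep the slot index non-negative. The index is 1-based so that a slot value of 0 can mean "empty".

```java
import java.nio.ByteBuffer;

final class IndexFileSketch {
    static final int HEADER_SIZE = 40;
    static final int SLOT_SIZE = 4;
    static final int ENTRY_SIZE = 20; // 4 (keyHash) + 8 (commitLog offset) + 4 (timeDiff) + 4 (preIndex)

    final int slotCount;
    final ByteBuffer file; // in-memory stand-in for the mapped index file
    int indexCount = 0;    // number of entries written so far

    IndexFileSketch(int slotCount, int maxEntries) {
        this.slotCount = slotCount;
        this.file = ByteBuffer.allocate(HEADER_SIZE + slotCount * SLOT_SIZE + maxEntries * ENTRY_SIZE);
    }

    /** slot_offset = header length + slot_index * 4 */
    int slotOffset(String messageId) {
        int slotIndex = Math.floorMod(messageId.hashCode(), slotCount); // never negative
        return HEADER_SIZE + slotIndex * SLOT_SIZE;
    }

    /** indexDataOffset = header length + slotCount * 4 + (index - 1) * 20, with a 1-based index */
    int entryOffset(int index) {
        return HEADER_SIZE + slotCount * SLOT_SIZE + (index - 1) * ENTRY_SIZE;
    }

    void put(String messageId, long commitLogOffset) {
        int index = ++indexCount;
        file.putInt(slotOffset(messageId), index); // a colliding key would overwrite the slot
        int pos = entryOffset(index);
        file.putInt(pos, messageId.hashCode());
        file.putLong(pos + 4, commitLogOffset);
        file.putInt(pos + 12, 0); // timeDiff, unused in this sketch
        file.putInt(pos + 16, 0); // preIndex, unused because collisions are ignored
    }

    /** Returns the commitLog offset for messageId, or -1 if the slot is empty or holds another key. */
    long get(String messageId) {
        int index = file.getInt(slotOffset(messageId));
        if (index == 0) {
            return -1L;
        }
        int pos = entryOffset(index);
        if (file.getInt(pos) != messageId.hashCode()) {
            return -1L;
        }
        return file.getLong(pos + 4);
    }

    public static void main(String[] args) {
        IndexFileSketch idx = new IndexFileSketch(100, 10);
        idx.put("msg-a", 0L);
        idx.put("msg-b", 338L);
        System.out.println(idx.get("msg-b"));   // prints 338
        System.out.println(idx.get("missing")); // prints -1
    }
}
```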
Code implementation
1. Generate 10 messages by hand and create the commitLog, consumeQueue, and indexFile files
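import com.alibaba.fastjson.JSON; // assumption: JSON.toJSONString below is taken to be fastjson's

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

// ConsumerQueueData and IndexFileItemData are simple data classes with getters and setters; they are not listed in this article.
public class CommitLogWriteTest {
    private static long commitLogOffset = 0L; // commitLog offset at which the next message is written
    private static List<ConsumerQueueData> consumerQueueDatas = new ArrayList<>();
    private static List<IndexFileItemData> indexFileItemDatas = new ArrayList<>();
    private static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        createCommitLog();
        createConsumerQueue();
        createIndexFile();
    }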
    private static void createCommitLog() throws IOException {
        System.out.println("");
        System.out.println("commitLog file create!");
        // CREATE makes the file if it does not exist yet; the directory c:/123 must already exist
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        fileChannel.close(); // the mapping stays valid after the channel is closed
        Random random = new Random();
        int count = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String topic = "Topic-test";
            String msgId = UUID.randomUUID().toString();
            // "消息内容" means "message content"; a random-length run of "msg" makes the body length vary
            String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48) + 1);
            byte[] bodyBytes = msgBody.getBytes(StandardCharsets.UTF_8);
            long queueOffset = i; // sequence number of the message (its index in the consumeQueue)
            String transactionId = UUID.randomUUID().toString();
            /* Record layout; the field order is fixed:
               long totalSize;        // total record length
               String msgId;          // fixed 64 bytes
               String topic;          // fixed 64 bytes
               long queueOffset;      // message sequence number
               long bodySize;         // body length in bytes
               byte[] body;           // message body
               String transactionId;  // fixed 64 bytes
               long commitLogOffset;  // offset counted from the start of the first file
            */
            int msgTotalLen = 8 // msgTotalLen field
                    + 64 // msgId field
                    + 64 // topic field
                    + 8 // queueOffset field
                    + 8 // bodySize field
                    + bodyBytes.length // body
                    + 64 // transactionId field
                    + 64; // reserved for commitLogOffset: putLong writes only 8 bytes, the other 56 stay unused
            // Move to the start position of this message.
            // For three messages of lengths 100, 200 and 350 the offsets are 0, 100 and 300.
            mappedByteBuffer.position(Math.toIntExact(commitLogOffset));
            mappedByteBuffer.putLong(msgTotalLen); // msgTotalLen
            mappedByteBuffer.put(getBytes(msgId, 64)); // msgId, fixed 64 bytes
            mappedByteBuffer.put(getBytes(topic, 64)); // topic, fixed 64 bytes
            mappedByteBuffer.putLong(queueOffset); // message sequence number
            mappedByteBuffer.putLong(bodyBytes.length); // bodySize
            mappedByteBuffer.put(bodyBytes); // body
            mappedByteBuffer.put(getBytes(transactionId, 64)); // transactionId, fixed 64 bytes
            mappedByteBuffer.putLong(commitLogOffset); // commitLogOffset
            System.out.println("Writing message #" + i);
            System.out.println("msgTotalLen:" + msgTotalLen);
            System.out.println("msgId:" + msgId);
            System.out.println("topic:" + topic);
            System.out.println("msgBody:" + msgBody);
            System.out.println("transactionId:" + transactionId);
            System.out.println("commitLogOffset:" + commitLogOffset);
            // Collect the data for the consumeQueue file
            ConsumerQueueData consumerQueueData = new ConsumerQueueData();
            consumerQueueData.setOffset(commitLogOffset);
            consumerQueueData.setMsgLength(msgTotalLen);
            consumerQueueData.setTagCode(100L);
            consumerQueueDatas.add(consumerQueueData);
            // Collect the data for the indexFile; phyOffset is the start offset of this message
            IndexFileItemData indexFileItemData = new IndexFileItemData();
            indexFileItemData.setKeyHash(msgId.hashCode());
            indexFileItemData.setMessageId(msgId);
            indexFileItemData.setPhyOffset(commitLogOffset);
            indexFileItemDatas.add(indexFileItemData);
            mappedByteBuffer.force(); // flush the mapped region to disk
            commitLogOffset = msgTotalLen + commitLogOffset; // start offset of the next message
            count++;
        }
        System.out.println("commitLog data saved, totalSize:" + count);
    }
    public static void createConsumerQueue() throws IOException {
        System.out.println("");
        System.out.println("ConsumerQueue file create!");
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        fileChannel.close();
        int count = 0;
        for (int i = 0; i < consumerQueueDatas.size(); i++) {
            ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);
            // Each entry is 20 bytes, so entry i starts at i * 20
            mappedByteBuffer.position(i * 20);
            mappedByteBuffer.putLong(consumerQueueData.getOffset()); // 8 bytes (commitLog offset)
            mappedByteBuffer.putInt(consumerQueueData.getMsgLength()); // 4 bytes (msgLength)
            mappedByteBuffer.putLong(consumerQueueData.getTagCode()); // 8 bytes (tagCode)
            count++;
            System.out.println("consumerQueue entry written:" + JSON.toJSONString(consumerQueueData));
            mappedByteBuffer.force();
        }
        System.out.println("ConsumerQueue data saved, count:" + count);
    }
    public static void createIndexFile() throws IOException {
        System.out.println("");
        System.out.println("IndexFile file create!");
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        // View over the same mapped bytes, used for the 40-byte header at the start of the file
        ByteBuffer headerByteBuffer = mappedByteBuffer.slice();
        // File creation time, taken when the first message is written
        long firstDataTime = System.currentTimeMillis();
        fileChannel.close();
        // The hash slots are written right after the header.
        /* Number of slots that have an index entry attached
           (not every slot has one, and with hash collisions several entries share a slot) */
        int hashSlotCount = 0;
        /* Number of index entries in this indexFile (summed over all slots).
           Without hash collisions, hashSlotCount == indexCount */
        int indexCount = 0;
        // Assume 100 slots (400 bytes in total)
        int soltNum = 100;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);
            int keyHash = indexFileItemData.getKeyHash();
            // Modulo gives the slot index; Math.abs folds a negative remainder into 0..99
            int slotPos = Math.abs(keyHash % soltNum);
            // File offset of the slot, in bytes
            int absSlotPos = 40 + slotPos * 4;
            // File offset of the index entry, in bytes
            int absIndexPos =
                    40 + soltNum * 4
                            + indexCount * 20;
            // Store indexCount in the hash slot. It starts at 0 here, so an empty slot
            // cannot be told apart from the slot of the first entry.
            mappedByteBuffer.putInt(absSlotPos, indexCount);
            // Write the entry (the data section of the indexFile)
            mappedByteBuffer.putInt(absIndexPos, indexFileItemData.getKeyHash()); // 4 bytes (keyHash)
            mappedByteBuffer.putLong(absIndexPos + 4, indexFileItemData.getPhyOffset()); // 8 bytes (commitLog offset)
            mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) (System.currentTimeMillis() - firstDataTime)); // 4 bytes (timeDiff)
            mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, 0); // 4 bytes (preIndex); hash collisions are ignored for now
            // First message: write the "begin" fields of the header
            if (i == 0) {
                // Storage time of the first message in this indexFile
                headerByteBuffer.putLong(0, firstDataTime);
                // commitLog offset of the first message in this indexFile
                headerByteBuffer.putLong(16, indexFileItemData.getPhyOffset());
            }
            // Last message: write the "end" fields of the header
            if (i == MESSAGE_COUNT - 1) {
                // Storage time of the last message in this indexFile
                headerByteBuffer.putLong(8, System.currentTimeMillis());
                // commitLog offset of the last message in this indexFile
                headerByteBuffer.putLong(24, indexFileItemData.getPhyOffset());
            }
            // Number of slots that hold an index
            headerByteBuffer.putInt(32, hashSlotCount + 1);
            // Number of index entries in this indexFile
            headerByteBuffer.putInt(36, indexCount + 1);
            mappedByteBuffer.force();
            System.out.println("msgId:" + indexFileItemData.getMessageId() + ",keyHash:" + keyHash + ",slot:" + slotPos + ",absSlotPos=" + absSlotPos + ",index=" + indexCount + ",absIndexPos:" + absIndexPos + ",commit-phyOffset:" + indexFileItemData.getPhyOffset());
            indexCount++;
            hashSlotCount++;
        }
    }
    // Converts a variable-length string to a fixed-length byte[] so that it can be read back at a known width
    private static byte[] getBytes(String s, int length) {
        byte[] raw = s.getBytes(StandardCharsets.UTF_8);
        // Arrays.copyOf zero-pads shorter input and truncates longer input, so the result is always `length` bytes
        return java.util.Arrays.copyOf(raw, length);
    }
}
Output:
commitLog file create!
Writing message #0
msgTotalLen:338
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:874605e6-69d2-4301-a65e-01e63de75a4d
commitLogOffset:0
Writing message #1
msgTotalLen:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3c
commitLogOffset:338
Writing message #2
msgTotalLen:296
msgId:a0c7c833-9811-4f17-800b-847766aef7dd
topic:Topic-test
msgBody:消息内容msgm
transactionId:9a836d21-704f-46ae-926c-b7933efe06a5
commitLogOffset:676
Writing message #3
msgTotalLen:299
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356
topic:Topic-test
msgBody:消息内容msgmsgm
transactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44
commitLogOffset:972
Writing message #4
msgTotalLen:306
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgms
transactionId:09f3b762-159e-4486-8820-0bce0ef7972d
commitLogOffset:1271
Writing message #5
msgTotalLen:313
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsg
transactionId:42dce613-6aaf-466b-b185-02a3f7917579
commitLogOffset:1577
Writing message #6
msgTotalLen:321
msgId:05be27f8-fb7a-4662-904f-2263e8899086
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:6c7db927-911c-4d19-a240-a951fad957bd
commitLogOffset:1890
Writing message #7
msgTotalLen:318
msgId:9a508d90-30f6-4a25-812f-25d750736afe
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgms
transactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897
commitLogOffset:2211
Writing message #8
msgTotalLen:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724
commitLogOffset:2529
Writing message #9
msgTotalLen:329
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:e9078205-15be-42b1-ad7e-55b9f5e229eb
commitLogOffset:2834
commitLog data saved, totalSize:10
ConsumerQueue file create!
consumerQueue entry written:{"msgLength":338,"offset":0,"tagCode":100}
consumerQueue entry written:{"msgLength":338,"offset":338,"tagCode":100}
consumerQueue entry written:{"msgLength":296,"offset":676,"tagCode":100}
consumerQueue entry written:{"msgLength":299,"offset":972,"tagCode":100}
consumerQueue entry written:{"msgLength":306,"offset":1271,"tagCode":100}
consumerQueue entry written:{"msgLength":313,"offset":1577,"tagCode":100}
consumerQueue entry written:{"msgLength":321,"offset":1890,"tagCode":100}
consumerQueue entry written:{"msgLength":318,"offset":2211,"tagCode":100}
consumerQueue entry written:{"msgLength":305,"offset":2529,"tagCode":100}
consumerQueue entry written:{"msgLength":329,"offset":2834,"tagCode":100}
ConsumerQueue data saved, count:10
IndexFile file create!
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05,keyHash:-358470777,slot:77,absSlotPos=348,index=0,absIndexPos:440,commit-phyOffset:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e,keyHash:466366793,slot:93,absSlotPos=412,index=1,absIndexPos:460,commit-phyOffset:676
msgId:a0c7c833-9811-4f17-800b-847766aef7dd,keyHash:1237522456,slot:56,absSlotPos=264,index=2,absIndexPos:480,commit-phyOffset:972
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356,keyHash:-1115509881,slot:81,absSlotPos=364,index=3,absIndexPos:500,commit-phyOffset:1271
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5,keyHash:1219778974,slot:74,absSlotPos=336,index=4,absIndexPos:520,commit-phyOffset:1577
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1,keyHash:460184183,slot:83,absSlotPos=372,index=5,absIndexPos:540,commit-phyOffset:1890
msgId:05be27f8-fb7a-4662-904f-2263e8899086,keyHash:-339624012,slot:12,absSlotPos=88,index=6,absIndexPos:560,commit-phyOffset:2211
msgId:9a508d90-30f6-4a25-812f-25d750736afe,keyHash:403329587,slot:87,absSlotPos=388,index=7,absIndexPos:580,commit-phyOffset:2529
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be,keyHash:-1569335572,slot:72,absSlotPos=328,index=8,absIndexPos:600,commit-phyOffset:2834
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489,keyHash:597856342,slot:42,absSlotPos=208,index=9,absIndexPos:620,commit-phyOffset:3163
Note: each commit-phyOffset in this sample run is the offset of the following message (338 for the first message, whose commitLogOffset is 0). This is what results when phyOffset is recorded after commitLogOffset has been advanced. In the createCommitLog listing, setPhyOffset is called before commitLogOffset is advanced, so running that listing would record 0, 338, 676 and so on.
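The writer stores msgId, topic, and transactionId as fixed 64-byte fields, so a reader has to strip the zero padding again. The following is a minimal sketch of that round trip, assuming UTF-8 text that contains no NUL characters. FixedWidthField is a hypothetical helper, not something from RocketMQ, and truncating an over-long value may cut a multi-byte character in half.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class FixedWidthField {
    /** Encodes s as UTF-8 into exactly `width` bytes: zero-padded if shorter, truncated if longer. */
    static byte[] encode(String s, int width) {
        byte[] raw = s.getBytes(StandardCharsets.UTF_8);
        return Arrays.copyOf(raw, width); // copyOf pads with zeros or truncates
    }

    /** Decodes a zero-padded field back into a string, dropping the padding. */
    static String decode(byte[] field) {
        int end = 0;
        while (end < field.length && field[end] != 0) {
            end++;
        }
        return new String(field, 0, end, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] field = encode("Topic-test", 64);
        System.out.println(field.length);  // prints 64
        System.out.println(decode(field)); // prints Topic-test
    }
}
```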
2. Read the consumeQueue file and, using the offset, read a complete message from commitLog
import java.io.IOException;
import java.net.URI;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ConsumeQueueMessageReadTest {
    public static MappedByteBuffer mappedByteBuffer = null; // commitLog mapping, created on first use
    private static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        // Same size as the writer used; a larger READ_WRITE mapping would grow the file
        MappedByteBuffer consumeQueueBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        fileChannel.close();
        // Read entries by index. In practice the index is the consumer's latest consumed position
        // (the loop variable i here), which the broker keeps in its offset file.
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            consumeQueueBuffer.position(i * 20);
            long commitlogOffset = consumeQueueBuffer.getLong();
            int msgLen = consumeQueueBuffer.getInt();
            long tag = consumeQueueBuffer.getLong();
            // Read the commitLog at that offset
            System.out.println("=================commitlog reading message #" + i + ", offset " + commitlogOffset + "===================");
            readCommitLogByOffset(Math.toIntExact(commitlogOffset));
        }
    }

    public static MappedByteBuffer initFileChannel() throws IOException {
        if (mappedByteBuffer == null) {
            FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);
            mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }
        return mappedByteBuffer;
    }

    /*
     * Reads one message from the commitLog at the given offset
     */
    public static void readCommitLogByOffset(int offset) throws IOException {
        /* Write order; the read order below must match it:
           b.putLong(totalSize);                // totalSize
           b.put(getBytes(msgId, 64));          // msgId
           b.put(getBytes(topic, 64));          // topic, fixed 64 bytes
           b.putLong(queueOffset);              // message sequence number
           b.putLong(bodyBytes.length);         // bodySize
           b.put(bodyBytes);                    // body
           b.put(getBytes(transactionId, 64));  // transactionId
           b.putLong(commitLogOffset);          // commitLogOffset
        */
        MappedByteBuffer commitLog = initFileChannel();
        commitLog.position(offset);
        long totalSize = commitLog.getLong(); // total record length
        byte[] msgIdByte = new byte[64]; // UUID, fixed 64 bytes
        commitLog.get(msgIdByte);
        byte[] topicByte = new byte[64]; // fixed 64 bytes
        commitLog.get(topicByte);
        long queueOffset = commitLog.getLong();
        long bodySize = commitLog.getLong();
        byte[] bodyByte = new byte[Math.toIntExact(bodySize)]; // body length varies
        commitLog.get(bodyByte);
        byte[] transactionIdByte = new byte[64]; // UUID, fixed 64 bytes
        commitLog.get(transactionIdByte);
        long commitLogOffset = commitLog.getLong(); // offset
        // trim() drops the trailing zero bytes used as padding in the fixed-width fields
        System.out.println("totalSize:" + totalSize);
        System.out.println("msgId:" + new String(msgIdByte, StandardCharsets.UTF_8).trim());
        System.out.println("topic:" + new String(topicByte, StandardCharsets.UTF_8).trim());
        System.out.println("queueOffset:" + queueOffset);
        System.out.println("bodySize:" + bodySize);
        System.out.println("body:" + new String(bodyByte, StandardCharsets.UTF_8));
        System.out.println("transactionId:" + new String(transactionIdByte, StandardCharsets.UTF_8).trim());
        System.out.println("commitLogOffset:" + commitLogOffset);
    }
}
Output:
=================commitlog reading message #0, offset 0===================
totalSize:338
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05
topic:Topic-test
queueOffset:0
bodySize:58
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:874605e6-69d2-4301-a65e-01e63de75a4d
commitLogOffset:0
=================commitlog reading message #1, offset 338===================
totalSize:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e
topic:Topic-test
queueOffset:1
bodySize:58
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3c
commitLogOffset:338
=================commitlog reading message #2, offset 676===================
totalSize:296
msgId:a0c7c833-9811-4f17-800b-847766aef7dd
topic:Topic-test
queueOffset:2
bodySize:16
body:消息内容msgm
transactionId:9a836d21-704f-46ae-926c-b7933efe06a5
commitLogOffset:676
=================commitlog reading message #3, offset 972===================
totalSize:299
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356
topic:Topic-test
queueOffset:3
bodySize:19
body:消息内容msgmsgm
transactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44
commitLogOffset:972
=================commitlog reading message #4, offset 1271===================
totalSize:306
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5
topic:Topic-test
queueOffset:4
bodySize:26
body:消息内容msgmsgmsgmsgms
transactionId:09f3b762-159e-4486-8820-0bce0ef7972d
commitLogOffset:1271
=================commitlog reading message #5, offset 1577===================
totalSize:313
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1
topic:Topic-test
queueOffset:5
bodySize:33
body:消息内容msgmsgmsgmsgmsgmsgmsg
transactionId:42dce613-6aaf-466b-b185-02a3f7917579
commitLogOffset:1577
=================commitlog reading message #6, offset 1890===================
totalSize:321
msgId:05be27f8-fb7a-4662-904f-2263e8899086
topic:Topic-test
queueOffset:6
bodySize:41
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:6c7db927-911c-4d19-a240-a951fad957bd
commitLogOffset:1890
=================commitlog reading message #7, offset 2211===================
totalSize:318
msgId:9a508d90-30f6-4a25-812f-25d750736afe
topic:Topic-test
queueOffset:7
bodySize:38
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgms
transactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897
commitLogOffset:2211
=================commitlog reading message #8, offset 2529===================
totalSize:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be
topic:Topic-test
queueOffset:8
bodySize:25
body:消息内容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724
commitLogOffset:2529
=================commitlog reading message #9, offset 2834===================
totalSize:329
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489
topic:Topic-test
queueOffset:9
bodySize:49
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:e9078205-15be-42b1-ad7e-55b9f5e229eb
commitLogOffset:2834
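The listings in this article use a fixed Windows path. As a portable sketch of the same idea (write variable-length records into a memory-mapped file, then read one back by its offset), the example below maps a temporary file instead. MappedLogSketch and its 4-byte length prefix are assumptions made for this illustration and do not match the record layout used above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedLogSketch {
    /** Maps a fresh temporary file of the given size for reading and writing. */
    static MappedByteBuffer mapTempFile(int size) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // The mapping stays valid after the channel is closed.
                return ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a length-prefixed record at `offset` and returns the offset of the next record. */
    static int append(MappedByteBuffer log, int offset, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        log.position(offset);
        log.putInt(bytes.length);
        log.put(bytes);
        return log.position();
    }

    /** Reads the record that starts at `offset`. */
    static String read(MappedByteBuffer log, int offset) {
        log.position(offset);
        byte[] bytes = new byte[log.getInt()];
        log.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        MappedByteBuffer log = mapTempFile(4096);
        int second = append(log, 0, "first message");
        append(log, second, "second, longer message");
        log.force(); // flush the mapped region to disk
        System.out.println(read(log, second)); // prints: second, longer message
    }
}
```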
3. Read the indexFile by messageId, then use the offset to read a complete message from CommitLog
import java.io.IOException;
import java.net.URI;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class IndexFileMessageReadTest {
    public static MappedByteBuffer mappedByteBuffer = null; // commitLog mapping, created on first use

    public static void main(String[] args) throws IOException {
        // Must be a msgId written by CommitLogWriteTest; this one is taken from the sample run in step 1
        String msgId = "9a508d90-30f6-4a25-812f-25d750736afe";
        readByMessageId(msgId);
    }

    private static void readByMessageId(String messageId) throws IOException {
        FileChannel indexFileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer indexMappedByteBuffer = indexFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        indexFileChannel.close();
        // The 40-byte header is read sequentially from position 0
        System.out.println("============get indexFile header===============");
        System.out.println("beginTimestampIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("endTimestampIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("beginPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("endPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("hashSlotcountIndex:" + indexMappedByteBuffer.getInt());
        System.out.println("indexCountIndex:" + indexMappedByteBuffer.getInt());
        System.out.println("");
        int keyHash = messageId.hashCode();
        // Modulo gives the slot index; this must match the calculation used by the writer
        int slotPos = Math.abs(keyHash % 100);
        System.out.println("messageId:" + messageId + ", slot (modulo):" + slotPos);
        // File offset of the slot, in bytes
        int absSlotPos = 40 + slotPos * 4;
        System.out.println("byte position of the hash slot: (40+" + slotPos + "*4)=" + absSlotPos);
        // Read the entry index stored in the hash slot (which entry it is)
        int index = indexMappedByteBuffer.getInt(absSlotPos);
        // File offset of the entry, in bytes
        int absIndexPos =
                40 + 100 * 4
                        + index * 20;
        System.out.println("entry index=" + index + ", byte position of the stored data: (40 + 100 * 4 + index * 20)=" + absIndexPos);
        int keyHash1 = indexMappedByteBuffer.getInt(absIndexPos); // stored keyHash (not checked in this simplified version)
        long pyhOffset = indexMappedByteBuffer.getLong(absIndexPos + 4);
        int timeDiff = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int preIndexNo = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
        System.out.println("commitLog offset read from index:" + pyhOffset);
        System.out.println("");
        readCommitLogByOffset((int) pyhOffset);
    }
    public static MappedByteBuffer initFileChannel() throws IOException {
        if (mappedByteBuffer == null) {
            FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);
            mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }
        return mappedByteBuffer;
    }

    /*
     * Reads one message from the commitLog at the given offset
     */
    public static void readCommitLogByOffset(int offset) throws IOException {
        /* Write order; the read order below must match it:
           b.putLong(totalSize);                // totalSize
           b.put(getBytes(msgId, 64));          // msgId
           b.put(getBytes(topic, 64));          // topic, fixed 64 bytes
           b.putLong(queueOffset);              // message sequence number
           b.putLong(bodyBytes.length);         // bodySize
           b.put(bodyBytes);                    // body
           b.put(getBytes(transactionId, 64));  // transactionId
           b.putLong(commitLogOffset);          // commitLogOffset
        */
        System.out.println("=================commitlog reading message at offset " + offset + "===================");
        MappedByteBuffer commitLog = initFileChannel();
        commitLog.position(offset);
        long totalSize = commitLog.getLong(); // total record length
        byte[] msgIdByte = new byte[64]; // UUID, fixed 64 bytes
        commitLog.get(msgIdByte);
        byte[] topicByte = new byte[64]; // fixed 64 bytes
        commitLog.get(topicByte);
        long queueOffset = commitLog.getLong();
        long bodySize = commitLog.getLong();
        byte[] bodyByte = new byte[Math.toIntExact(bodySize)]; // body length varies
        commitLog.get(bodyByte);
        byte[] transactionIdByte = new byte[64]; // UUID, fixed 64 bytes
        commitLog.get(transactionIdByte);
        long commitLogOffset = commitLog.getLong(); // offset
        // trim() drops the trailing zero bytes used as padding in the fixed-width fields
        System.out.println("totalSize:" + totalSize);
        System.out.println("msgId:" + new String(msgIdByte, StandardCharsets.UTF_8).trim());
        System.out.println("topic:" + new String(topicByte, StandardCharsets.UTF_8).trim());
        System.out.println("queueOffset:" + queueOffset);
        System.out.println("bodySize:" + bodySize);
        System.out.println("body:" + new String(bodyByte, StandardCharsets.UTF_8));
        System.out.println("transactionId:" + new String(transactionIdByte, StandardCharsets.UTF_8).trim());
        System.out.println("commitLogOffset:" + commitLogOffset);
    }

    // Converts a long to an 8-byte big-endian array (not used by the code above)
    public static byte[] toByteArray(long number) {
        byte length = Long.BYTES;
        byte[] bytes = new byte[length];
        for (byte i = 0; i < length; i++) {
            bytes[length - 1 - i] = (byte) number;
            number >>= 8;
        }
        return bytes;
    }
}
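Output:
============get indexFile header===============
beginTimestampIndex:1669602898169
endTimestampIndex:1669602898176
beginPhyoffsetIndex:338
endPhyoffsetIndex:3163
hashSlotcountIndex:10
indexCountIndex:10
messageId:9a508d90-30f6-4a25-812f-25d750736afe, slot (modulo):87
byte position of the hash slot: (40+87*4)=388
entry index=7, byte position of the stored data: (40 + 100 * 4 + index * 20)=580
commitLog offset read from index:2529
=================commitlog reading message at offset 2529===================
totalSize:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be
topic:Topic-test
queueOffset:8
bodySize:25
body:消息内容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724
commitLogOffset:2529
Note: in this sample run the message read back (msgId 63249e08-..., queueOffset 8) is the one stored after the message that was looked up (msgId 9a508d90-..., queueOffset 7), because the index entry holds offset 2529 rather than that message's own offset 2211. An index entry must hold the start offset of its own message for the lookup to return the right record.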
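Conclusion
This article implements the most minimal functionality of RocketMQ's file system on top of Java NIO. I hope it helps developers deepen their understanding of how the RocketMQ file system works under the hood and become proficient at reading and writing files with Java NIO. Discussion is welcome, and so are corrections where the article falls short.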