前言
我們可以從JAVA.io.InputStream類中看到,抽象出一個read方法,用來讀取已經打開的InputStream實例中的字節,每次調用read方法,會讀取一個字節數據,該方法抽象定義,如下所示:public abstract int read() throws IOException;Hadoop的DFSClient.DFSInputStream類實現了該抽象邏輯,如果我們清楚了如何從HDFS中讀取一個文件的一個block的一個字節的原理,更加抽象的頂層只需要迭代即可獲取到該文件的全部數據。
從HDFS讀文件過程分析:獲取文件對應的Block列表中,我們已經獲取到一個文件對應的Block列表信息,打開一個文件,接下來就要讀取實際的物理塊數據,我們從下面的幾個方面來詳細說明讀取數據的過程。
Client從Datanode讀取文件的一個字節
下面,我們通過分析DFSClient.DFSInputStream中實現的代碼,讀取HDFS上文件的內容。首先從下面的方法開始:
@Override
public synchronized int read() throws IOException {
int ret = read( oneByteBuf, 0, 1 );
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
}
上面調用read(oneByteBuf, 0, 1)讀取一個字節到單字節緩沖區oneByteBuf中,具體實現見如下方法:
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
checkOpen(); // 檢查Client是否正在運行
if (closed) {
throw new IOException("Stream closed");
}
failures = 0;
if (pos < getFileLength()) { // getFileLength()獲取文件所包含的總字節數,pos表示讀取當前文件的第(pos+1)個字節
int retries = 2;
while (retries > 0) {
try {
if (pos > blockEnd) { // blockEnd表示文件的長度(字節數)
currentNode = blockSeekTo(pos); // 找到第pos個字節數據所在的Datanode(實際根據該字節數據所在的block元數據來定位)
}
int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
int result = readBuffer(buf, off, realLen); // 讀取一個字節到緩沖區中
if (result >= 0) {
pos += result; // 每成功讀取result個字節,pos增加result
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
if (stats != null && result != -1) {
stats.incrementBytesRead(result);
}
return result;
} catch (ChecksumException ce) {
throw ce;
} catch (IOException e) {
if (retries == 1) {
LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
}
blockEnd = -1;
if (currentNode != null) { addToDeadNodes(currentNode); }
if (--retries == 0) {
throw e;
}
}
}
}
return -1;
}
讀取文件數據的一個字節,具體過程如下:
- 檢查流對象是否處于打開狀態(前面已經獲取到文件對應的block列表的元數據,并打開一個InputStream對象)
- 從文件的第一個block開始讀取,首先需要找到第一個block對應的數據塊所在的Datanode,可以從緩存的block列表中查詢到(如果查找不到,則會與Namenode進行一次RPC通信請求獲取到)
- 打開一個到該讀取的block所在Datanode節點的流,準備讀取block數據
- 建立了到Datanode的連接后,讀取一個字節數據到字節緩沖區中,返回讀取的字節數(1個字節)
在讀取的過程中,以字節為單位,通過判斷某個偏移位置的字節屬于哪個block(根據block元數據所限定的字節偏移范圍),在根據這個block去定位某一個Datanode節點,這樣就可連續地讀取一個文件的全部數據(組成文件的、連續的多個block數據塊)。
查找待讀取的一個字節所在的Datanode節點
上面public synchronized int read(byte buf[], int off, int len) throws IOException方法,調用了blockSeekTo方法來獲取,文件某個字節索引位置的數據所在的Datanode節點。其實,很容易就能想到,想要獲取到數據所在的Datanode節點,一定是從block元數據中計算得到,然后根據Client緩存的block映射列表,找到block對應的Datanode列表,我們看一下blockSeekTo方法的代碼實現:
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
... ...
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
while (true) {
LocatedBlock targetBlock = getBlockAt(target, true); // 獲取字節偏移位置為target的字節數據所在的block元數據對象
assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
DNAddrPair retval = chooseDataNode(targetBlock); // 選擇一個Datanode去讀取數據
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
// 先嘗試從本地讀取數據,如果數據不在本地,則正常去讀取遠程的Datanode節點
Block blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
if (shouldTryShortCircuitRead(targetAddr)) {
try {
blockReader = getLocalBlockReader(conf, src, blk, accessToken,
chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); // 創建一個用來讀取本地數據的BlockReader對象
return chosenNode;
} catch (AccessControlException ex) {
LOG.warn("Short circuit access failed ", ex);
//Disable short circuit reads
shortCircuitLocalReads = false;
} catch (IOException ex) {
if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
/* Get a new access token and retry. */
refetchToken--;
fetchBlockAt(target);
continue;
} else {
LOG.info("Failed to read " + targetBlock.getBlock()
+ " on local machine" + StringUtils.stringifyException(ex));
LOG.info("Try reading via the datanode on " + targetAddr);
}
}
}
// 本地讀取失敗,按照更一般的方式去讀取遠程的Datanode節點來獲取數據
try {
s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
accessToken,
blk.getGenerationStamp(),
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, clientName); // 創建一個遠程的BlockReader對象
return chosenNode;
} catch (IOException ex) {
if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
fetchBlockAt(target);
} else {
LOG.warn("Failed to connect to " + targetAddr
+ ", add to deadNodes and continue" + ex);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure", ex);
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode); // 讀取失敗,會將選擇的Datanode加入到Client的dead node列表,為下次讀取選擇合適的Datanode讀取文件數據提供參考元數據信息
}
if (s != null) {
try {
s.close();
} catch (IOException iex) { }
}
s = null;
}
}
}
上面代碼中,主要包括如下幾個要點:
- 選擇合適的Datanode節點,提高讀取效率
在讀取文件的時候,首先會從Namenode獲取文件對應的block列表元數據,返回的block列表是按照Datanode的網絡拓撲結構進行排序過的(本地節點優先,其次是同一機架節點),而且,Client還維護了一個dead node列表,只要此時bock對應的Datanode列表中節點不出現在dead node列表中就會被返回,用來作為讀取數據的Datanode節點。
- 如果Client為集群Datanode節點,嘗試從本地讀取block
通過調用chooseDataNode方法返回一個Datanode結點,通過判斷,如果該節點地址是本地地址,并且該節點上對應的block元數據信息的狀態不是正在創建的狀態,則滿足從本地讀取數據塊的條件,然后會創建一個LocalBlockReader對象,直接從本地讀取。在創建LocalBlockReader對象的過程中,會先從緩存中查找一個本地Datanode相關的LocalDatanodeInfo對象,該對象定義了與從本地Datanode讀取數據的重要信息,以及緩存了待讀取block對應的本地路徑信息,可以從LocalDatanodeInfo類定義的屬性來說明:
private ClientDatanodeProtocol proxy = null;
private final Map<Block, BlockLocalPathInfo> cache;
如果緩存中存在待讀取的block的相關信息,可以直接進行讀取;否則,會創建一個proxy對象,以及計算待讀取block的路徑信息BlockLocalPathInfo,最后再加入到緩存,為后續可能的讀取加速。我們看一下如果沒有從緩存中找到LocalDatanodeInfo信息(尤其是BlockLocalPathInfo),則會執行如下邏輯:
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
上面proxy為ClientDatanodeProtocol類型,Client與Datanode進行RPC通信的協議,RPC調用getBlockLocalPathInfo獲取block對應的本地路徑信息,可以在Datanode類中查看具體實現,如下所示:
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
Datanode調用FSDataset(實現接口FSDatasetInterface)的getBlockLocalPathInfo,如下所示:
@Override //FSDatasetInterface
public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
throws IOException {
File datafile = getBlockFile(block); // 獲取本地block在本地Datanode文件系統中的文件路徑
File metafile = getMetaFile(datafile, block); // 獲取本地block在本地Datanode文件系統中的元數據的文件路徑
BlockLocalPathInfo info = new BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}
接著可以直接去讀取該block文件(如果需要檢查校驗和文件,會讀取block的元數據文件metafile):
... // BlockReaderLocal類的newBlockReader靜態方法
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);
if (!skipChecksum) { // 如果檢查block的校驗和
// get the metadata file
File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile);
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, checksumIn);
} else {
localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
}
在上面代碼中,返回了BlockLocalPathInfo,但是很可能在這個過程中block被刪除了,在刪除block的時候,Namenode會調度指派該Datanode刪除該block,恰好在這個時間間隔內block對應的BlockLocalPathInfo信息已經失效(文件已經被刪除),所以上面這段代碼再try中會拋出異常,并在catch中捕獲到IO異常,會從緩存中再清除掉失效的block到BlockLocalPathInfo的映射信息。
- 如果Client非集群Datanode節點,遠程讀取block
如果Client不是Datanode本地節點,則只能跨網絡節點遠程讀取,首先創建Socket連接:
s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);
建立Client到目標Datanode(targetAddr)的連接,然后同樣也是創建一個遠程BlockReader對象RemoteBlockReader來輔助讀取block數據。創建RemoteBlockReader過程中,首先向目標Datanode發送RPC請求:
// in and out will be closed when sock is closed (by the caller)
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client與Datanode之間傳輸數據的版本號
out.write( DataTransferProtocol.OP_READ_BLOCK ); // 傳輸操作類型:讀取block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // 時間戳信息
out.writeLong( startOffset ); // block起始偏移量
out.writeLong( len ); // block長度
Text.writeString(out, clientName); // 客戶端標識
accessToken.write(out);
out.flush();
然后獲取到DataInputStream對象來讀取Datanode的響應信息:
DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));
最后,返回一個對象RemoteBlockReader:
return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);
借助BlockReader來讀取block字節
我們再回到blockSeekTo方法中,待讀取block所在的Datanode信息、BlockReader信息都已經具備,接著就可以從包含輸入流(InputStream)對象的BlockReader中讀取數據塊中一個字節數據:
int result = readBuffer(buf, off, realLen);
將block數據中一個字節讀取到buf中,如下所示:
private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
IOException ioe;
boolean retryCurrentNode = true;
while (true) {
// retry as many times as seekToNewSource allows.
try {
return blockReader.read(buf, off, len); // 調用blockReader的read方法讀取字節數據到buf中
} catch ( ChecksumException ce ) {
LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode.getName() + " at " + ce.getPos());
reportChecksumFailure(src, currentBlock, currentNode);
ioe = ce;
retryCurrentNode = false; // 只嘗試讀取當前選擇的Datanode一次,失敗的話就會被加入到Client的dead node列表中
} catch ( IOException e ) {
if (!retryCurrentNode) {
LOG.warn("Exception while reading from " + currentBlock + " of " + src + " from " + currentNode + ": " + StringUtils.stringifyException(e));
}
ioe = e;
}
boolean sourceFound = false;
if (retryCurrentNode) {
/* possibly retry the same node so that transient errors don't
* result in Application level failures (e.g. Datanode could have
* closed the connection because the client is idle for too long).
*/
sourceFound = seekToBlockSource(pos);
} else {
addToDeadNodes(currentNode); // 加入到Client的dead node列表中
sourceFound = seekToNewSource(pos); // 從當前選擇的Datanode上讀取數據失敗,會再次選擇一個Datanode,這里seekToNewSource方法內部調用了blockSeekTo方法去選擇一個Datanode
}
if (!sourceFound) {
throw ioe;
}
retryCurrentNode = false;
}
}
通過BlockReaderLocal或者RemoteBlockReader來讀取block數據,邏輯非常類似,主要是控制讀取字節的偏移量,記錄偏移量的狀態信息,詳細可以查看它們的源碼。(原創時延軍(包含鏈接:http://shiyanjun.cn))
DataNode節點處理讀文件Block請求
我們可以在DataNode端看一下,如何處理一個讀取Block的請求。如果Client與DataNode不是同一個節點,則為遠程讀取文件Block,首先Client需要發送一個請求頭信息,代碼如下所示:
//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client與Datanode之間傳輸數據的版本號
out.write( DataTransferProtocol.OP_READ_BLOCK ); // 傳輸操作類型:讀取block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // 時間戳信息
out.writeLong( startOffset ); // block起始偏移量
out.writeLong( len ); // block長度
Text.writeString(out, clientName); // 客戶端標識
accessToken.write(out);
out.flush();
DataNode節點端通過驗證數據傳輸版本號(
DataTransferProtocol.DATA_TRANSFER_VERSION)一致以后,會判斷傳輸操作類型,如果是讀操作DataTransferProtocol.OP_READ_BLOCK,則會通過Client建立的Socket來創建一個OutputStream對象,然后通過BlockSender向Client發送Block數據,代碼如下所示:
try {
blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); // 創建BlockSender對象
} catch(IOException e) {
out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
throw e;
}
out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 回復一個響應Header信息:成功狀態
long read = blockSender.sendBlock(out, baseStream, null); // 發送請求的Block數據