NIO的背景
為什么一個已經(jīng)存在10年的增強包還是JAVA的新I/O包呢?原因是對于大多數(shù)的Java程序員而言,基本的I/O操作都能夠勝任。在日常工作中,大部分的Java開發(fā)者沒有必要去學習NIO。更進一步,NIO不僅僅是一個性能提升包。相反,它是一個和Java I/O相關(guān)的不同功能的集合。NIO通過使得Java應用的性能“更加接近實質(zhì)”來達到性能提升的效果,也就是意味著NIO和NIO.2的API暴露了低層次的系統(tǒng)操作的入口。NIO的代價就是它在提供更強大的I/O控制能力的同時,也要求我們比使用基本的I/O編程更加細心地使用和練習。NIO的另一特點是它對于應用程序的表現(xiàn)力的關(guān)注,這個我們會在下面的練習中看到。
Java NIO和IO的主要區(qū)別
- 面向流與面向緩沖. Java NIO和IO之間第一個最大的區(qū)別是,IO是面向流的,NIO是面向緩沖區(qū)的。Java IO面向流意味著每次從流中讀一個或多個字節(jié),直至讀取所有字節(jié),它們沒有被緩存在任何地方。此外,它不能前后移動流中的數(shù)據(jù)。如果需要前后移動從流中讀取的數(shù)據(jù),需要先將它緩存到一個緩沖區(qū)。 Java NIO的緩沖導向方法略有不同。數(shù)據(jù)讀取到一個它稍后處理的緩沖區(qū),需要時可在緩沖區(qū)中前后移動。這就增加了處理過程中的靈活性。
- 阻塞與非阻塞IO Java IO的各種流是阻塞的。這意味著,當一個線程調(diào)用read() 或 write()時,該線程被阻塞,直到有一些數(shù)據(jù)被讀取,或數(shù)據(jù)完全寫入。該線程在此期間不能再干任何事情了。 Java NIO的非阻塞模式,使一個線程從某通道發(fā)送請求讀取數(shù)據(jù),但是它僅能得到目前可用的數(shù)據(jù),如果目前沒有數(shù)據(jù)可用時,該線程可以繼續(xù)做其他的事情。 非阻塞寫也是如此。一個線程請求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。線程通常將非阻塞IO的空閑時間用于在其它通道上執(zhí)行IO操作,所以一個單獨的線程現(xiàn)在可以管理多個輸入和輸出通道(channel)。
- 選擇器(Selectors) Java NIO的選擇器允許一個單獨的線程來監(jiān)視多個輸入通道,你可以注冊多個通道使用一個選擇器,然后使用一個單獨的線程來“選擇”通道:這些通道里已經(jīng)有可以處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的線程很容易來管理多個通道。
最佳practice
SelectionKey.OP_WRITE訂閱時機
現(xiàn)象: cpu占用超高
原因: 訂閱了SelectionKey.OP_WRITE事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); }
分析: 當socket緩沖區(qū)可寫入時就會觸發(fā)OP_WRITE事件. 而socket緩沖區(qū)大多時間都可寫入(網(wǎng)絡不擁堵),由于nio水平觸發(fā)的特性OP_WRITE會一直觸發(fā)導致while()一直空轉(zhuǎn)
水平觸發(fā): 簡單解釋為只要滿足條件就一直觸發(fā),而不是發(fā)生狀態(tài)改變時才觸發(fā)(有點主動和被動觸發(fā)的感覺)
最佳實踐:
方案一: 當有寫數(shù)據(jù)需求時訂閱OP_WRITE事件,數(shù)據(jù)發(fā)送完成取消訂閱.
while (channel.isOpen()) { if (channel.isConnected() && writeBuffer.isReadable()) { //writeBuffer可讀 注冊write事件 channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } //當采用臨時訂閱OP_WRITE方式 必須使用select(ms)進行超時返回 // 因為很有可能當select()前極短時間內(nèi)writeBuffer有數(shù)據(jù),而此時沒有訂閱OP_WRITE事件,會使select()一直阻塞 int ready = selector.select(300); if (ready > 0) { SelectionKey selectionKey = iterator.next(); iterator.remove(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); if (selectionKey.isWritable()) { writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear(); socketChannel.register(selector, SelectionKey.OP_READ); } } }
當使用臨時訂閱OP_WRITE事件方式時,必須使用selector.select(long),進行超時返回. 因為很有可能當select()前極短時間內(nèi)writeBuffer有數(shù)據(jù),而此時沒有訂閱OP_WRITE事件,會使select()一直阻塞
方案二: 不訂閱OP_WRITE事件,直接通過socketChannel.write()寫數(shù)據(jù).
Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); channel.connect(new InetSocketAddress("localhost", 5555)); while (channel.isOpen()) { if (channel.isConnected()) { writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear(); } int ready = selector.select(500); ...各種事件處理 }
方案三: 一直訂閱OP_WRITE,socketChannel主動寫
while (channel.isOpen()) { //這里與方案一有區(qū)別 可以直接阻塞 int ready = selector.select(); if (ready > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { ...緩沖區(qū)已寫數(shù)據(jù)清理 SelectionKey selectionKey = iterator.next(); iterator.remove(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); if (selectionKey.isConnectable()) { if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } //訂閱讀/寫事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } if (selectionKey.isReadable()) { ...讀事件處理 } if (selectionKey.isWritable()) { //改為主動讀取式 ByteBuffer byteBuffer = awaitGetWrite(writeBuffer, 30, 50); if (byteBuffer != null) { int write = channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + write); if (write != byteBuffer.limit()) { System.out.print("a"); } } } } } } /** * 等待獲取寫緩存 * @param byteBuf 緩沖區(qū) * @param ms 緩沖時間 防止空轉(zhuǎn) * @param cap 閾值:超過則直接返回,沒超過等待ms后判斷是否超過閾值 * @return */ public ByteBuffer awaitGetWrite(ByteBuf byteBuf, long ms, int cap) { //緩沖大小 不要過大就行 自己調(diào)整 int socketCap = 1024 * 30; if (byteBuf.readableBytes() >= cap) {//>=cap直接返回 return ByteBuffer.allocate(byteBuf.readableBytes() > socketCap ? socketCap : byteBuf.readableBytes()); } else {//<cap時等待 CountDownLatch countDownLatch = new CountDownLatch(1); try { countDownLatch.await(ms, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if (byteBuf.readableBytes() > 0) { return ByteBuffer.allocate(byteBuf.readableBytes() > socketCap ? socketCap : byteBuf.readableBytes()); } else { return null; } } }
優(yōu)點缺點方案1當網(wǎng)絡擁堵時,不嘗試寫數(shù)據(jù)需要自己控制訂閱/取消訂閱的時機方案2不關(guān)心網(wǎng)絡擁堵,只要有數(shù)據(jù)就嘗試寫,當網(wǎng)絡擁堵時做大量無用功編寫方便,無需關(guān)心OP_WRITE事件訂閱時機方案3相比方案1 編碼復雜度下降
綜合上述個人覺得還是方案3比較好
channel.write()寫數(shù)據(jù)問題
現(xiàn)象: 網(wǎng)絡擁堵時,cpu占用超高
原因: 網(wǎng)絡擁堵時, channel.write()一直寫不進去,導致while()空轉(zhuǎn)
采取上一問題方案3可以避免該問題
writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear();
分析: 當網(wǎng)絡擁堵時,channel.write()可能寫入0數(shù)據(jù),而這里采用死循環(huán)寫入數(shù)據(jù),假如一直寫不進去就會導致空轉(zhuǎn)
最佳實踐:
while (writeBuffer.isReadable()) { //這里使用的是netty的ByteBuf ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//無法全部寫入到socket緩沖區(qū)中,說明socket緩沖區(qū)已滿,可能發(fā)生空轉(zhuǎn) break System.err.print("a"); //防止空轉(zhuǎn) 依賴外層循環(huán)重新進入 break; } }
結(jié)合OP_WRITE訂閱時機問題,可以得知方案一的臨時訂閱OP_WRITE事件方式,能更好的防止channel.write(byteBuffer)空轉(zhuǎn)
TCP斷開判斷
現(xiàn)象: 當TCP一方斷開時,另一方cpu占用超高
原因: 當TCP一方斷開時,一直會觸發(fā)OP_READ,導致空轉(zhuǎn).
分析: 當TCP一方斷開時,觸發(fā)OP_READ,socketChannel.read(readBuffer)返回-1,表示對方連接已斷開,自己也需要斷開連接socketChannel.close(),否則會一直觸發(fā)OP_READ,導致空轉(zhuǎn)
while (true) { int ready = selector.select(); if (ready > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); //The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream int read = socketChannel.read(readBuffer); readBuffer.flip(); //讀到-1 沒有處理 導致空轉(zhuǎn) if (read > 0) { System.out.print(new String(readBuffer.array(), 0, read)); } } ... } } } 復制代碼
最佳實踐:
if (selectionKey.isReadable()) { ByteBuffer readBuffer = Server.SocketContext.get(socketChannel).getReadBuffer(); int read = socketChannel.read(readBuffer); readBuffer.flip(); if (read > 0) { System.out.print(new String(readBuffer.array(), 0, read)); } else if (read == -1) {//對面已斷開 close System.out.println("斷開..." + socketChannel.socket().getRemoteSocketAddress()); socketChannel.close(); } }
ByteBuf使用
ByteBuf,ByteBuffer對比
特性ByteBuffer1.有position,limit屬性,通過flip()切換讀寫模式 ,不支持同時讀/寫 2.定長 3.直接內(nèi)存ByteBuf1.有rix,wix,cap,maxCap屬性,支持同時讀/寫 2.自動擴容 3.直接內(nèi)存,堆內(nèi)存,組合
建議使用ByteBuf
ByteBuf 的clear()和discardReadBytes()對比
現(xiàn)象: 使用clear()導致丟數(shù)據(jù)
原因: clear()實現(xiàn)通過 rix=wix=0,假如此時同時有數(shù)據(jù)寫入,該部分數(shù)據(jù)則丟失
if (selectionKey.isWritable()) { while (writeBuffer.isReadable()) { ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//無法一次性寫入到緩沖區(qū)中,可能發(fā)生空轉(zhuǎn) break ... break; } else { //清理已發(fā)送數(shù)據(jù) writeBuffer.clear(); } } ... } 復制代碼
最佳實踐:
使用discardReadBytes(),其通過arrayCopy方式并且線程安全,能夠防止數(shù)據(jù)丟失.但頻繁的arrayCopy會有性能問題. 可以使用clear()和discardReadBytes()的組合
if (selectionKey.isWritable()) { while (writeBuffer.isReadable()) { //當緩沖區(qū)使用>2/3事 且wix-rix< (maxCap*1/3) 對緩沖區(qū)進行整理 if (writeBuffer.writerIndex() > (writeBuffer.maxCapacity() / 3 * 2) && writeBuffer.writerIndex() - writeBuffer .readerIndex() < (writeBuffer.maxCapacity() / 3)) { System.out.println(String.format("緩沖區(qū)使用超過2/3 discardReadBytes writerIndex:%d " + "readerIndex:%d", writeBuffer .writerIndex(), writeBuffer.readerIndex())); writeBuffer.discardReadBytes(); } ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//無法一次性寫入到緩沖區(qū)中,可能發(fā)生空轉(zhuǎn) break ... //防止空轉(zhuǎn) 等待下次write事件 break; } else { //注意clear()的使用 因為writeBuffer一直在寫入 writerIndex可能>readIndex if (writeBuffer.writerIndex() == writeBuffer.readerIndex()) { //TODO 因為不是原子過程 理論上會有問題 但實際驗證中卻沒問題 待驗證 writeBuffer.clear(); System.out.println("clear"); } } } ... }
使用快速收斂
在GunNetty中,快速收斂確保Selector中所有的key均為有效key,不包含失效key,該方法一般使用在關(guān)閉channel之后
@Override public int fastLimit() throws IOException { bootSelector.wakeup(); return bootSelector.select(0); }
1.如果正在阻塞輪訓,立刻終止,使用wakeup函數(shù)
2.立刻select(0)刪除已經(jīng)失效的key