Netty自己的ByteBuf
ByteBuf是為了解決ByteBuffer的問題和滿足網(wǎng)絡(luò)應(yīng)用程序開發(fā)人員的日常需求而設(shè)計的.
JDK中ByteBuffer的缺點:
- 無法動態(tài)擴容 長度是固定的,不能動態(tài)擴展和收縮,當數(shù)據(jù)大于ByteBuffer容量時,會發(fā)生索引越界異常.
- API使用復(fù)雜 讀寫的時候需要手動調(diào)用flip()和rewind()等方法,使用時需要非常謹慎的使用這些api,否則容易出現(xiàn)錯誤.
ByteBuf做了哪些增強?
- API操作便捷性
- 動態(tài)擴容
- 多種ByteBuf實現(xiàn)
- 內(nèi)存復(fù)用機制
- 零拷貝機制
ByteBuf的操作
三個重要屬性:
- capacity容量
- readerIndex讀取位置
- writerIndex寫入位置
提供了兩個指針變量來支持順序讀和寫操作,分別是readerIndex和writeInDex,也就把緩沖區(qū)分成了三個部分:
0[ --已讀可丟棄區(qū)域-- ]reaerIndex[ --可讀區(qū)域-- ]writerIndex[ --待寫區(qū)域-- ]capacity
常用方法定義:
- 隨機訪問索引getByte
- 順序讀read*
- 順序?qū)憌rite*
- 清除已讀內(nèi)容discardReadBytes
- 清除緩沖區(qū)clear
- 搜索操作
- 標記和重置
- 引用計數(shù)和釋放
我們可以對這些api做一些測試,如下:
package io.netty.example.echo;
import JAVA.util.Arrays;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* @author daniel
* @version 1.0.0
* @date 2021/12/20
*/
public class ApiTest {
public static void main(String[] args) {
//1.創(chuàng)建一個非池化的ByteBuf,大小為10字節(jié)
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf為:" + buf.toString());
System.out.println("1.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//2.寫入一段內(nèi)容
byte[] bytes = {1,2,3,4,5};
buf.writeBytes(bytes);
System.out.println("寫入的bytes為:" + Arrays.toString(bytes));
System.out.println("寫入一段內(nèi)容后ByteBuf為:" + buf);
System.out.println("2.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//3.讀取一段內(nèi)容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("讀取的bytes為:" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("讀取一段內(nèi)容后ByteBuf為:" + buf);
System.out.println("3.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//4.將讀取的內(nèi)容丟棄
buf.discardReadBytes();
System.out.println("丟棄已讀取的內(nèi)容后ByteBuf為:" + buf);
System.out.println("4.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//5.清空讀寫指針
buf.clear();
System.out.println("清空讀寫指針后ByteBuf為:" + buf);
System.out.println("5.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//6.再次寫入一段內(nèi)容,比第一段內(nèi)容少
byte[] bytes2 = {1,2,3};
buf.writeBytes(bytes2);
System.out.println("再寫入的bytes2為:" + Arrays.toString(bytes2));
System.out.println("再寫入一段內(nèi)容后ByteBuf為:" + buf);
System.out.println("6.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//7.將ByteBuf清空
buf.setZero(0, buf.capacity());
System.out.println("內(nèi)容清空后ByteBuf為:" + buf);
System.out.println("7.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
//8.再次寫入一段超過容量的內(nèi)容
byte[] bytes3 = {1,2,3,4,5,6,7,8,9,10,11};
buf.writeBytes(bytes3);
System.out.println("寫入超量的bytes3為:" + Arrays.toString(bytes3));
System.out.println("寫入超量內(nèi)容后ByteBuf為:" + buf);
System.out.println("8.ByteBuf中的內(nèi)容為:" + Arrays.toString(buf.array()));
System.out.println();
}
}
從這些api的使用中就可以體會到ByteBuf比ByteBuffer的強大之處,我們可以深入研究一下它在寫入超量數(shù)據(jù)時的擴容機制,也就是buf.writeBytes(byte[])方法
ByteBuf動態(tài)擴容
容量默認值為256字節(jié),最大值為Integer.MAX_VALUE,也就是2GB
實際調(diào)用
AbstractByteBuf.writeBytes,如下:
AbstractByteBuf.writeBytes
@Override
public ByteBuf writeBytes(byte[] src) {
writeBytes(src, 0, src.length);
return this;
}
AbstractByteBuf.writeBytes(src, 0, src.length);
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length); //檢查是否有足夠的可寫空間,是否需要擴容
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
AbstractByteBuf.ensureWritable(length);
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
ensureWritable0(checkPositiveOrZero(minWritableBytes, "minWritableBytes"));
return this;
}
AbstractByteBuf.ensureWritable0(checkPositiveOrZero(minWritableBytes, "minWritableBytes"));
final void ensureWritable0(int minWritableBytes) {
final int writerIndex = writerIndex(); //獲取當前寫下標
final int targetCapacity = writerIndex + minWritableBytes; //計算最少需要的容量
// using non-short-circuit & to reduce branching - this is a hot path and targetCapacity should rarely overflow
if (targetCapacity >= 0 & targetCapacity <= capacity()) { //判斷當前容量是否夠用
ensureAccessible(); //檢查ByteBuf的引用計數(shù),如果為0則不允許繼續(xù)操作
return;
}
if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) { //判斷需要的容量是否是合法值,不合法為true直接拋出越界異常
ensureAccessible();//檢查ByteBuf的引用計數(shù),如果為0則不允許繼續(xù)操作
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the target capacity to the power of 2.(標準化為2的次冪)
final int fastWritable = maxFastWritableBytes();
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity); //計算擴容后容量(只要擴容最小64)
// Adjust to the new capacity.
capacity(newCapacity); //設(shè)置新的容量
}
alloc().calculateNewCapacity(targetCapacity, maxCapacity) -> AbstractByteBufAllocator
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity"); //最小所需容量
if (minNewCapacity > maxCapacity) { //判斷最小所需容量是否合法
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page 閾值超過4M以其他方式計算
if (minNewCapacity == threshold) { //等于4M直接返回4M
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) { //大于4M,不需要加倍,只需要擴大閾值即可
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// 64 <= newCapacity is a power of 2 <= threshold
final int newCapacity = MathUtil.findNextPositivePowerOfTwo(Math.max(minNewCapacity, 64)); //計算不少于所需容量的最小的2次冪的值
return Math.min(newCapacity, maxCapacity); //取容量所允許的最大值和計算的2次冪的最小值,當然在這兒就是newCapacity=64
}
總結(jié)一下就是最小所需容量是否等于閾值,如果是直接返回閾值此后直接擴大閾值,否則以64為最小2次冪為基礎(chǔ)每次擴大二倍直到閾值.
選擇合適的ByteBuf實現(xiàn)
netty針對ByteBuf提供了8中具體的實現(xiàn)方式,如下:
堆內(nèi)/堆外 |
是否池化 |
訪問方式 |
具體實現(xiàn)類 |
備注 |
heap堆內(nèi) |
unpool |
safe |
UnpooledHeapByteBuf |
數(shù)組實現(xiàn) |
heap堆內(nèi) |
unpool |
unsafe |
UnpooledUnsafeHeapByteBuf |
Unsafe類直接操作內(nèi)存 |
heap堆內(nèi) |
pool |
safe |
PooledHeapByteBuf |
|
heap堆內(nèi) |
pool |
unsafe |
PooledUnsafeHeapByteBuf |
~ |
direct堆外 |
unpool |
safe |
UnpooledDirectByteBuf |
NIO DirectByteBuffer |
direct堆外 |
unpool |
unsafe |
UnpooleUnsafedDirectByteBuf |
~ |
direct堆外 |
pool |
safe |
PooledDirectByteBuf |
~ |
direct堆外 |
pool |
unsafe |
PooledUnsafeDirectByteBuf |
~ |
在使用時,都是通過ByteBufAllocator分配器進行申請,同時分配器具有內(nèi)存管理的功能。
在這兒堆內(nèi)和堆外沒有什么區(qū)別,對api的使用時一樣的,僅僅是通過Unpooled申請的不一樣.
那個safe和unsafe有什么區(qū)別呢?
以UnpooledHeapByteBuf和UnpooledUnsafeHeapByteBuf中的getByte(int index)方法為例進行分析
UnpooledHeapByteBuf
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index); //真正的獲取字節(jié)的方法
}
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index); //通過HeapByteBufUtil工具類獲取數(shù)據(jù)
}
HeapByteBufUtil
static byte getByte(byte[] memory, int index) {
return memory[index];
}
UnpooledHeapByteBuf從堆內(nèi)數(shù)組中獲取數(shù)據(jù),這是安全的
UnpooledUnsafeHeapByteBuf
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
PlatformDependent0
static byte getByte(byte[] data, int index) {
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
UnpooledUnsafeHeapByteBuf是通過UNSAFE來操作內(nèi)存的
現(xiàn)在我們來研究一下Unsafe
Unsafe的實現(xiàn)
Unsafe意味著不安全的操作,但是更底層的操作會帶來性能提升和特殊功能,Netty中會盡力使用unsafe以提升系統(tǒng)性能
Java語言很重要的特性就是一次編譯到處運行,所以它針對底層的內(nèi)存或者其他操作做了很多封裝,而unsafe提供了一系列我們操作底層的方法,可能會導(dǎo)致不兼容或不可知的異常.
比如:
- 返回一些低級的內(nèi)存信息
- addressSize
- pageSize
- 提供用于操作對象及其字段的方法
- allocateInstance
- objectFieldOffset
- 提供用于操作類及其靜態(tài)字段的方法
- staticFieldOffset
- defineClass
- defineAnonymousClass
- ensureClassInitialized
- 低級的同步原語
- monitorEnter
- tryMonitorEnter
- monitorExit
- compareAndSwapInt
- putOrderedInt
- 直接訪問內(nèi)存的方法 allocateMomery copyMemory freeMemory getAddress getInt putInt
- 操作數(shù)組
- arrayBaseoffset
- arrayIndexScale
既然這些東西都是jdk封裝好的,而是netty也是直接使用的,所以我們無論在使用safe還是unsafe的時候都是無感知的,我們無需關(guān)系底層的操作邏輯,因為api都是一樣的,只是實現(xiàn)不一樣
是否還有一個疑問,池化和非池化是什么意思?
池化和非池化
比如在使用Unpooled.buffer(10)申請一個緩存區(qū)的時候,默認非池化申請的一個緩沖區(qū).
池化和非池化的區(qū)別主要是申請內(nèi)存緩存空間以及緩存空間的使用上,體現(xiàn)為內(nèi)存復(fù)用.
- 在申請內(nèi)存緩存空間方面:
- pool:池化申請的時候會申請一個比當前所需內(nèi)存空間更大的內(nèi)存空間,這就好比一個快遞柜,為此netty提供了buf分配管理器專門用來處理這種事情,來創(chuàng)建或復(fù)用ByteBuf.
- unpool:非池化申請只會申請?zhí)囟ù笮∧軌蚴褂玫膬?nèi)存緩存空間,使用完之后立刻釋放,這就像直接把快遞放到你的手中,你所在的位置就是開辟的內(nèi)存空間.
- 在緩存空間使用方面:
- pool:池化申請的內(nèi)存空間有一定擴容容積,也就是這個快遞柜可以存放多個快遞,只需要找到對應(yīng)的方格即可存放,同樣buf分配管理器來復(fù)用已經(jīng)創(chuàng)建好的內(nèi)存空間,在創(chuàng)建ByteBuf的時候已經(jīng)開辟3中大小的內(nèi)存塊 normal:16MN small:8KB tiny:512B
- unpool:毫無疑問,非池化的方式必然是每次都會再去開辟內(nèi)存空間的.
理論如此,netty中是如何做到內(nèi)存復(fù)用的?
在netty中每一個EventLoopThread由PooledBytebufAllocator內(nèi)存分配器實力維護了一個線程變量叫做PoolThreadCache,在這個變量中維護了3種規(guī)格的MemoryRegionCache數(shù)組用作內(nèi)存緩存,MemoryRegionCache內(nèi)部是鏈表,隊列里面存Chunk.
首先內(nèi)存內(nèi)存分配器會尋找合適的ByteBuf對象進行復(fù)用;
之后從內(nèi)存數(shù)組中找到合適的內(nèi)存空間進行復(fù)用;
PoolChunk里面維護了內(nèi)存引用,內(nèi)存復(fù)用的做法就是把ByteBuf的memory指向chunk的memory.
如果沒有找到對應(yīng)的緩存空間,則直接向內(nèi)存申請unpool的緩存空間.
netty中默認(池化)也是這樣做的,這也是netty性能高效的一個原因,但是就像example例子一樣,如果我們自己創(chuàng)建的話,netty推薦我們使用unpool.
==需要注意的是即使創(chuàng)建了可復(fù)用的ByteBuf,但是使用過后一直沒有被release,也就是沒有被回收也是不能被復(fù)用的,這是應(yīng)用設(shè)計時應(yīng)該注意的.==
說了半天的廢話,總算是要說到零拷貝機制了
零拷貝機制
Netty的零拷貝機制是一種應(yīng)用層的表現(xiàn),和底層JVM/操作系統(tǒng)內(nèi)存機制并無過多關(guān)聯(lián),你可認為netty就是一個軟件,我們在用這個軟件來創(chuàng)造另一個軟件.
- CompositeByteBuf,將多個ByteBuf合并為一個邏輯上的ByteBuf,避免了各個ByteBuf之間的拷貝 什么意思呢?這是一個虛擬的ByteBuf,這個ByteBuf并不是一個,而是一個復(fù)合緩沖區(qū),有多個獨立的ByteBuf CompositeByteBuf compositeByteBuf = Unpooled.CompositeBuffer(); ByteBuf byteBuf = compositeByteBuf.addComponents(true,buffer1,buffer2); 復(fù)制代碼
- wrapedBuffer()方法,將byte[]數(shù)組包裝成ByteBuf對象 什么意思呢?這也是一個虛擬的ByteBuf,這個新創(chuàng)建的ByteBuf只是通過memory對此字節(jié)數(shù)組做了一個引用,避免了復(fù)制帶來的性能損耗. ByteBuf byteBuf = Unpooled.wrAppedBuffer(new byte[]{1,2,3,4,5}); 復(fù)制代碼
- slice()方法,將一個ByteBuf對象切分成多個ByteBuf對象 什么意思呢?這還是一個虛擬的ByteBuf,只不過拆分出去的ByteBuf中的memory引用的只是拆分出去的字節(jié)位置,并且會以unwarp保留一個對原ByteBuf的引用. ByteBuf byteBuf = Unpooled.wrappedBuffer("hello".getBytes()); ByteBuf newByteBuf = byteBuf.slice(1,2); 復(fù)制代碼
ByteBuf的零拷貝機制也是Netty高性能的一個原因.
作者:梧桐小站
鏈接:
https://juejin.cn/post/7049754668653608997
來源:稀土掘金