簡介
netty為什么快呢?這是因為netty底層使用了JAVA的NIO技術,并在其基礎上進行了性能的優化,雖然netty不是單純的JAVA nio,但是netty的底層還是基于的是nio技術。
nio是JDK1.4中引入的,用于區別于傳統的IO,所以nio也可以稱之為new io。
nio的三大核心是Selector,channel和Buffer,本文我們將會深入探究NIO和netty之間的關系。
NIO常用用法
在講解netty中的NIO實現之前,我們先來回顧一下JDK中NIO的selector,channel是怎么工作的。對于NIO來說selector主要用來接受客戶端的連接,所以一般用在server端。我們以一個NIO的服務器端和客戶端聊天室為例來講解NIO在JDK中是怎么使用的。
因為是一個簡單的聊天室,我們選擇Socket協議為基礎的ServerSocketChannel,首先就是open這個Server channel:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 9527));
serverSocketChannel.configureBlocking(false);
然后向server channel中注冊selector:
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
雖然是NIO,但是對于Selector來說,它的select方法是阻塞方法,只有找到匹配的channel之后才會返回,為了多次進行select操作,我們需要在一個while循環里面進行selector的select操作:
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();
if (selectionKey.isAcceptable()) {
register(selector, serverSocketChannel);
}
if (selectionKey.isReadable()) {
serverResponse(byteBuffer, selectionKey);
}
iter.remove();
}
Thread.sleep(1000);
}
selector中會有一些SelectionKey,SelectionKey中有一些表示操作狀態的OP Status,根據這個OP Status的不同,selectionKey可以有四種狀態,分別是isReadable,isWritable,isConnectable和isAcceptable。
當SelectionKey處于isAcceptable狀態的時候,表示ServerSocketChannel可以接受連接了,我們需要調用register方法將serverSocketChannel accept生成的socketChannel注冊到selector中,以監聽它的OP READ狀態,后續可以從中讀取數據:
private static void register(Selector selector, ServerSocketChannel serverSocketChannel)
throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
當selectionKey處于isReadable狀態的時候,表示可以從socketChannel中讀取數據然后進行處理:
private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey)
throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes= new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
log.info(new String(bytes).trim());
if(new String(bytes).trim().equals(BYE_BYE)){
log.info("說再見不如不見!");
socketChannel.write(ByteBuffer.wrap("再見".getBytes()));
socketChannel.close();
}else {
socketChannel.write(ByteBuffer.wrap("你是個好人".getBytes()));
}
byteBuffer.clear();
}
上面的serverResponse方法中,從selectionKey中拿到對應的SocketChannel,然后調用SocketChannel的read方法,將channel中的數據讀取到byteBuffer中,要想回復消息到channel中,還是使用同一個socketChannel,然后調用write方法回寫消息給client端,到這里一個簡單的回寫客戶端消息的server端就完成了。
接下來就是對應的NIO客戶端,在NIO客戶端需要使用SocketChannel,首先建立和服務器的連接:
socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527));
然后就可以使用這個channel來發送和接受消息了:
public String sendMessage(String msg) throws IOException {
byteBuffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
socketChannel.write(byteBuffer);
byteBuffer.clear();
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes= new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
response =new String(bytes).trim();
byteBuffer.clear();
return response;
}
向channel中寫入消息可以使用write方法,從channel中讀取消息可以使用read方法。
這樣一個NIO的客戶端就完成了。
雖然以上是NIO的server和client的基本使用,但是基本上涵蓋了NIO的所有要點。接下來我們來詳細了解一下netty中NIO到底是怎么使用的。
NIO和EventLoopGroup
以netty的ServerBootstrap為例,啟動的時候需要指定它的group,先來看一下ServerBootstrap的group方法:
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
...
}
ServerBootstrap可以接受一個EventLoopGroup或者兩個EventLoopGroup,EventLoopGroup被用來處理所有的event和IO,對于ServerBootstrap來說,可以有兩個EventLoopGroup,對于Bootstrap來說只有一個EventLoopGroup。兩個EventLoopGroup表示acceptor group和worker group。
EventLoopGroup只是一個接口,我們常用的一個實現就是NioEventLoopGroup,如下所示是一個常用的netty服務器端代碼:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NIOServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口并開始接收連接
ChannelFuture f = b.bind(port).sync();
// 等待server socket關閉
f.channel().closeFuture().sync();
這里和NIO相關的有兩個類,分別是NioEventLoopGroup和NioServerSocketChannel,事實上在他們的底層還有兩個類似的類分別叫做NioEventLoop和NioSocketChannel,接下來我們分別講解一些他們的底層實現和邏輯關系。
NioEventLoopGroup
NioEventLoopGroup和DefaultEventLoopGroup一樣都是繼承自MultithreadEventLoopGroup:
public class NioEventLoopGroup extends MultithreadEventLoopGroup
他們的不同之處在于newChild方法的不同,newChild用來構建Group中的實際對象,NioEventLoopGroup來說,newChild返回的是一個NioEventLoop對象,先來看下NioEventLoopGroup的newChild方法:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
這個newChild方法除了固定的executor參數之外,還可以根據NioEventLoopGroup的構造函數傳入的參數來實現更多的功能。
這里參數中傳入了SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler、taskQueueFactory和tailTaskQueueFactory這幾個參數,其中后面的兩個EventLoopTaskQueueFactory并不是必須的。
最后所有的參數都會傳遞給NioEventLoop的構造函數用來構造出一個新的NioEventLoop。
在詳細講解NioEventLoop之前,我們來研讀一下傳入的這幾個參數類型的實際作用。
SelectorProvider
SelectorProvider是JDK中的類,它提供了一個靜態的provider()方法可以從Property或者ServiceLoader中加載對應的SelectorProvider類并實例化。
另外還提供了openDatagramChannel、openPipe、openSelector、openServerSocketChannel和openSocketChannel等實用的NIO操作方法。
SelectStrategyFactory
SelectStrategyFactory是一個接口,里面只定義了一個方法,用來返回SelectStrategy:
public interface SelectStrategyFactory {
SelectStrategy newSelectStrategy();
}
什么是SelectStrategy呢?
先看下SelectStrategy中定義了哪些Strategy:
int SELECT = -1;
int CONTINUE = -2;
int BUSY_WAIT = -3;
SelectStrategy中定義了3個strategy,分別是SELECT、CONTINUE和BUSY_WAIT。
我們知道一般情況下,在NIO中select操作本身是一個阻塞操作,也就是block操作,這個操作對應的strategy是SELECT,也就是select block狀態。
如果我們想跳過這個block,重新進入下一個event loop,那么對應的strategy就是CONTINUE。
BUSY_WAIT是一個特殊的strategy,是指IO 循環輪詢新事件而不阻塞,這個strategy只有在epoll模式下才支持,NIO和Kqueue模式并不支持這個strategy。
RejectedExecutionHandler
RejectedExecutionHandler是netty自己的類,和
java.util.concurrent.RejectedExecutionHandler類似,但是是特別針對SingleThreadEventExecutor來說的。這個接口定義了一個rejected方法,用來表示因為SingleThreadEventExecutor容量限制導致的任務添加失敗而被拒絕的情況:
void rejected(Runnable task, SingleThreadEventExecutor executor);
EventLoopTaskQueueFactory
EventLoopTaskQueueFactory是一個接口,用來創建存儲提交給EventLoop的taskQueue:
Queue<Runnable> newTaskQueue(int maxCapacity);
這個Queue必須是線程安全的,并且繼承自
java.util.concurrent.BlockingQueue.
講解完這幾個參數,接下來我們就可以詳細查看NioEventLoop的具體NIO實現了。
NioEventLoop
首先NioEventLoop和DefaultEventLoop一樣,都是繼承自SingleThreadEventLoop:
public final class NioEventLoop extends SingleThreadEventLoop
表示的是使用單一線程來執行任務的EventLoop。
首先作為一個NIO的實現,必須要有selector,在NioEventLoop中定義了兩個selector,分別是selector和unwrAppedSelector:
private Selector selector;
private Selector unwrappedSelector;
在NioEventLoop的構造函數中,他們是這樣定義的:
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
首先調用openSelector方法,然后通過返回的SelectorTuple來獲取對應的selector和unwrappedSelector。
這兩個selector有什么區別呢?
在openSelector方法中,首先通過調用provider的openSelector方法返回一個Selector,這個Selector就是unwrappedSelector:
final Selector unwrappedSelector;
unwrappedSelector = provider.openSelector();
然后檢查
DISABLE_KEY_SET_OPTIMIZATION是否設置,如果沒有設置那么unwrappedSelector和selector實際上是同一個Selector:
DISABLE_KEY_SET_OPTIMIZATION表示的是是否對select key set進行優化:
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
如果
DISABLE_KEY_SET_OPTIMIZATION被設置為false,那么意味著我們需要對select key set進行優化,具體是怎么進行優化的呢?
先來看下最后的返回:
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
最后返回的SelectorTuple第二個參數就是selector,這里的selector是一個
SelectedSelectionKeySetSelector對象。
SelectedSelectionKeySetSelector繼承自selector,構造函數傳入的第一個參數是一個delegate,所有的Selector中定義的方法都是通過調用delegate來實現的,不同的是對于select方法來說,會首先調用selectedKeySet的reset方法,下面是以isOpen和select方法為例觀察一下代碼的實現:
public boolean isOpen() {
return delegate.isOpen();
}
public int select(long timeout) throws IOException {
selectionKeys.reset();
return delegate.select(timeout);
}
selectedKeySet是一個SelectedSelectionKeySet對象,是一個set集合,用來存儲SelectionKey,在openSelector()方法中,使用new來實例化這個對象:
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
netty實際是想用這個SelectedSelectionKeySet類來管理Selector中的selectedKeys,所以接下來netty用了一個高技巧性的對象替換操作。
首先判斷系統中有沒有sun.nio.ch.SelectorImpl的實現:
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
SelectorImpl中有兩個Set字段:
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
這兩個字段就是我們需要替換的對象。如果有SelectorImpl的話,首先使用Unsafe類,調用PlatformDependent中的objectFieldOffset方法拿到這兩個字段相對于對象示例的偏移量,然后調用putObject將這兩個字段替換成為前面初始化的selectedKeySet對象:
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
如果系統設置不支持Unsafe,那么就用反射再做一次:
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
在NioEventLoop中我們需要關注的一個非常重要的重寫方法就是run方法,在run方法中實現了如何執行task的邏輯。
還記得前面我們提到的selectStrategy嗎?run方法通過調用
selectStrategy.calculateStrategy返回了select的strategy,然后通過判斷strategy的值來進行對應的處理。
如果strategy是CONTINUE,這跳過這次循環,進入到下一個loop中。
BUSY_WAIT在NIO中是不支持的,如果是SELECT狀態,那么會在curDeadlineNanos之后再次進行select操作:
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
如果strategy > 0,表示有拿到了SelectedKeys,那么需要調用processSelectedKeys方法對SelectedKeys進行處理:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
上面提到了NioEventLoop中有兩個selector,還有一個selectedKeys屬性,這個selectedKeys存儲的就是Optimized SelectedKeys,如果這個值不為空,就調用
processSelectedKeysOptimized方法,否則就調用processSelectedKeysPlain方法。
processSelectedKeysOptimized和processSelectedKeysPlain這兩個方法差別不大,只是傳入的要處理的selectedKeys不同。
處理的邏輯是首先拿到selectedKeys的key,然后調用它的attachment方法拿到attach的對象:
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
如果channel還沒有建立連接,那么這個對象可能是一個NioTask,用來處理channelReady和channelUnregistered的事件。
如果channel已經建立好連接了,那么這個對象可能是一個AbstractNioChannel。
針對兩種不同的對象,會去分別調用不同的processSelectedKey方法。
對第一種情況,會調用task的channelReady方法:
task.channelReady(k.channel(), k);
對第二種情況,會根據SelectionKey的readyOps()的各種狀態調用ch.unsafe()中的各種方法,去進行read或者close等操作。
總結
NioEventLoop雖然也是一個SingleThreadEventLoop,但是通過使用NIO技術,可以更好的利用現有資源實現更好的效率,這也就是為什么我們在項目中使用NioEventLoopGroup而不是DefaultEventLoopGroup的原因。