日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

簡介

netty為什么快呢?這是因為netty底層使用了JAVA的NIO技術,并在其基礎上進行了性能的優化,雖然netty不是單純的JAVA nio,但是netty的底層還是基于的是nio技術。

nio是JDK1.4中引入的,用于區別于傳統的IO,所以nio也可以稱之為new io。

nio的三大核心是Selector,channel和Buffer,本文我們將會深入探究NIO和netty之間的關系。

NIO常用用法

在講解netty中的NIO實現之前,我們先來回顧一下JDK中NIO的selector,channel是怎么工作的。對于NIO來說selector主要用來接受客戶端的連接,所以一般用在server端。我們以一個NIO的服務器端和客戶端聊天室為例來講解NIO在JDK中是怎么使用的。

因為是一個簡單的聊天室,我們選擇Socket協議為基礎的ServerSocketChannel,首先就是open這個Server channel:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 9527));
serverSocketChannel.configureBlocking(false);

然后向server channel中注冊selector:

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

雖然是NIO,但是對于Selector來說,它的select方法是阻塞方法,只有找到匹配的channel之后才會返回,為了多次進行select操作,我們需要在一個while循環里面進行selector的select操作:

while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey selectionKey = iter.next();
                if (selectionKey.isAcceptable()) {
                    register(selector, serverSocketChannel);
                }
                if (selectionKey.isReadable()) {
                    serverResponse(byteBuffer, selectionKey);
                }
                iter.remove();
            }
            Thread.sleep(1000);
        }

selector中會有一些SelectionKey,SelectionKey中有一些表示操作狀態的OP Status,根據這個OP Status的不同,selectionKey可以有四種狀態,分別是isReadable,isWritable,isConnectable和isAcceptable。

當SelectionKey處于isAcceptable狀態的時候,表示ServerSocketChannel可以接受連接了,我們需要調用register方法將serverSocketChannel accept生成的socketChannel注冊到selector中,以監聽它的OP READ狀態,后續可以從中讀取數據:

    private static void register(Selector selector, ServerSocketChannel serverSocketChannel)
            throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

當selectionKey處于isReadable狀態的時候,表示可以從socketChannel中讀取數據然后進行處理:

    private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey)
            throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        socketChannel.read(byteBuffer);
        byteBuffer.flip();
        byte[] bytes= new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);
        log.info(new String(bytes).trim());
        if(new String(bytes).trim().equals(BYE_BYE)){
            log.info("說再見不如不見!");
            socketChannel.write(ByteBuffer.wrap("再見".getBytes()));
            socketChannel.close();
        }else {
            socketChannel.write(ByteBuffer.wrap("你是個好人".getBytes()));
        }
        byteBuffer.clear();
    }

上面的serverResponse方法中,從selectionKey中拿到對應的SocketChannel,然后調用SocketChannel的read方法,將channel中的數據讀取到byteBuffer中,要想回復消息到channel中,還是使用同一個socketChannel,然后調用write方法回寫消息給client端,到這里一個簡單的回寫客戶端消息的server端就完成了。

接下來就是對應的NIO客戶端,在NIO客戶端需要使用SocketChannel,首先建立和服務器的連接:

socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527));

然后就可以使用這個channel來發送和接受消息了:

    public String sendMessage(String msg) throws IOException {
        byteBuffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        socketChannel.write(byteBuffer);
        byteBuffer.clear();
        socketChannel.read(byteBuffer);
        byteBuffer.flip();
        byte[] bytes= new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);
        response =new String(bytes).trim();
        byteBuffer.clear();
        return response;
    }

向channel中寫入消息可以使用write方法,從channel中讀取消息可以使用read方法。

這樣一個NIO的客戶端就完成了。

雖然以上是NIO的server和client的基本使用,但是基本上涵蓋了NIO的所有要點。接下來我們來詳細了解一下netty中NIO到底是怎么使用的。

NIO和EventLoopGroup

以netty的ServerBootstrap為例,啟動的時候需要指定它的group,先來看一下ServerBootstrap的group方法:

public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    ...
}

ServerBootstrap可以接受一個EventLoopGroup或者兩個EventLoopGroup,EventLoopGroup被用來處理所有的event和IO,對于ServerBootstrap來說,可以有兩個EventLoopGroup,對于Bootstrap來說只有一個EventLoopGroup。兩個EventLoopGroup表示acceptor group和worker group。

EventLoopGroup只是一個接口,我們常用的一個實現就是NioEventLoopGroup,如下所示是一個常用的netty服務器端代碼:

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NIOServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FirstServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 綁定端口并開始接收連接
            ChannelFuture f = b.bind(port).sync();
            // 等待server socket關閉
            f.channel().closeFuture().sync();

這里和NIO相關的有兩個類,分別是NioEventLoopGroup和NioServerSocketChannel,事實上在他們的底層還有兩個類似的類分別叫做NioEventLoop和NioSocketChannel,接下來我們分別講解一些他們的底層實現和邏輯關系。

NioEventLoopGroup

NioEventLoopGroup和DefaultEventLoopGroup一樣都是繼承自MultithreadEventLoopGroup:

public class NioEventLoopGroup extends MultithreadEventLoopGroup 

他們的不同之處在于newChild方法的不同,newChild用來構建Group中的實際對象,NioEventLoopGroup來說,newChild返回的是一個NioEventLoop對象,先來看下NioEventLoopGroup的newChild方法:

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        SelectorProvider selectorProvider = (SelectorProvider) args[0];
        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = null;
        EventLoopTaskQueueFactory tailTaskQueueFactory = null;

        int argsLength = args.length;
        if (argsLength > 3) {
            taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        }
        if (argsLength > 4) {
            tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
        }
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

這個newChild方法除了固定的executor參數之外,還可以根據NioEventLoopGroup的構造函數傳入的參數來實現更多的功能。

這里參數中傳入了SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler、taskQueueFactory和tailTaskQueueFactory這幾個參數,其中后面的兩個EventLoopTaskQueueFactory并不是必須的。

最后所有的參數都會傳遞給NioEventLoop的構造函數用來構造出一個新的NioEventLoop。

在詳細講解NioEventLoop之前,我們來研讀一下傳入的這幾個參數類型的實際作用。

SelectorProvider

SelectorProvider是JDK中的類,它提供了一個靜態的provider()方法可以從Property或者ServiceLoader中加載對應的SelectorProvider類并實例化。

另外還提供了openDatagramChannel、openPipe、openSelector、openServerSocketChannel和openSocketChannel等實用的NIO操作方法。

SelectStrategyFactory

SelectStrategyFactory是一個接口,里面只定義了一個方法,用來返回SelectStrategy:

public interface SelectStrategyFactory {

    SelectStrategy newSelectStrategy();
}

什么是SelectStrategy呢?

先看下SelectStrategy中定義了哪些Strategy:

    int SELECT = -1;

    int CONTINUE = -2;

    int BUSY_WAIT = -3;

SelectStrategy中定義了3個strategy,分別是SELECT、CONTINUE和BUSY_WAIT。

我們知道一般情況下,在NIO中select操作本身是一個阻塞操作,也就是block操作,這個操作對應的strategy是SELECT,也就是select block狀態。

如果我們想跳過這個block,重新進入下一個event loop,那么對應的strategy就是CONTINUE。

BUSY_WAIT是一個特殊的strategy,是指IO 循環輪詢新事件而不阻塞,這個strategy只有在epoll模式下才支持,NIO和Kqueue模式并不支持這個strategy。

RejectedExecutionHandler

RejectedExecutionHandler是netty自己的類,和
java.util.concurrent.RejectedExecutionHandler類似,但是是特別針對SingleThreadEventExecutor來說的。這個接口定義了一個rejected方法,用來表示因為SingleThreadEventExecutor容量限制導致的任務添加失敗而被拒絕的情況:

void rejected(Runnable task, SingleThreadEventExecutor executor);

EventLoopTaskQueueFactory

EventLoopTaskQueueFactory是一個接口,用來創建存儲提交給EventLoop的taskQueue:

Queue<Runnable> newTaskQueue(int maxCapacity);

這個Queue必須是線程安全的,并且繼承自
java.util.concurrent.BlockingQueue.

講解完這幾個參數,接下來我們就可以詳細查看NioEventLoop的具體NIO實現了。

NioEventLoop

首先NioEventLoop和DefaultEventLoop一樣,都是繼承自SingleThreadEventLoop:

public final class NioEventLoop extends SingleThreadEventLoop

表示的是使用單一線程來執行任務的EventLoop。

首先作為一個NIO的實現,必須要有selector,在NioEventLoop中定義了兩個selector,分別是selector和unwrAppedSelector:

    private Selector selector;
    private Selector unwrappedSelector;

在NioEventLoop的構造函數中,他們是這樣定義的:

        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;

首先調用openSelector方法,然后通過返回的SelectorTuple來獲取對應的selector和unwrappedSelector。

這兩個selector有什么區別呢?

在openSelector方法中,首先通過調用provider的openSelector方法返回一個Selector,這個Selector就是unwrappedSelector:

final Selector unwrappedSelector;
unwrappedSelector = provider.openSelector();

然后檢查
DISABLE_KEY_SET_OPTIMIZATION是否設置,如果沒有設置那么unwrappedSelector和selector實際上是同一個Selector:


DISABLE_KEY_SET_OPTIMIZATION表示的是是否對select key set進行優化:

if (DISABLE_KEY_SET_OPTIMIZATION) {
      return new SelectorTuple(unwrappedSelector);
   }

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

如果
DISABLE_KEY_SET_OPTIMIZATION被設置為false,那么意味著我們需要對select key set進行優化,具體是怎么進行優化的呢?

先來看下最后的返回:

return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));

最后返回的SelectorTuple第二個參數就是selector,這里的selector是一個
SelectedSelectionKeySetSelector對象。


SelectedSelectionKeySetSelector繼承自selector,構造函數傳入的第一個參數是一個delegate,所有的Selector中定義的方法都是通過調用delegate來實現的,不同的是對于select方法來說,會首先調用selectedKeySet的reset方法,下面是以isOpen和select方法為例觀察一下代碼的實現:

    public boolean isOpen() {
        return delegate.isOpen();
    }

    public int select(long timeout) throws IOException {
        selectionKeys.reset();
        return delegate.select(timeout);
    }

selectedKeySet是一個SelectedSelectionKeySet對象,是一個set集合,用來存儲SelectionKey,在openSelector()方法中,使用new來實例化這個對象:

final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

netty實際是想用這個SelectedSelectionKeySet類來管理Selector中的selectedKeys,所以接下來netty用了一個高技巧性的對象替換操作。

首先判斷系統中有沒有sun.nio.ch.SelectorImpl的實現:

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

SelectorImpl中有兩個Set字段:

    private Set<SelectionKey> publicKeys;
    private Set<SelectionKey> publicSelectedKeys;

這兩個字段就是我們需要替換的對象。如果有SelectorImpl的話,首先使用Unsafe類,調用PlatformDependent中的objectFieldOffset方法拿到這兩個字段相對于對象示例的偏移量,然后調用putObject將這兩個字段替換成為前面初始化的selectedKeySet對象:

Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
    // This allows us to also do this in Java9+ without any extra flags.
    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
    long publicSelectedKeysFieldOffset =
            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
        PlatformDependent.putObject(
                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
        PlatformDependent.putObject(
                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
        return null;
    }

如果系統設置不支持Unsafe,那么就用反射再做一次:

 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
 if (cause != null) {
     return cause;
 }
 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
 if (cause != null) {
     return cause;
 }
 selectedKeysField.set(unwrappedSelector, selectedKeySet);
 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

在NioEventLoop中我們需要關注的一個非常重要的重寫方法就是run方法,在run方法中實現了如何執行task的邏輯。

還記得前面我們提到的selectStrategy嗎?run方法通過調用
selectStrategy.calculateStrategy返回了select的strategy,然后通過判斷strategy的值來進行對應的處理。

如果strategy是CONTINUE,這跳過這次循環,進入到下一個loop中。

BUSY_WAIT在NIO中是不支持的,如果是SELECT狀態,那么會在curDeadlineNanos之后再次進行select操作:

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
  switch (strategy) {
  case SelectStrategy.CONTINUE:
      continue;
  case SelectStrategy.BUSY_WAIT:
      // fall-through to SELECT since the busy-wait is not supported with NIO
  case SelectStrategy.SELECT:
      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
      if (curDeadlineNanos == -1L) {
          curDeadlineNanos = NONE; // nothing on the calendar
      }
      nextWakeupNanos.set(curDeadlineNanos);
      try {
          if (!hasTasks()) {
              strategy = select(curDeadlineNanos);
          }
      } finally {
          // This update is just to help block unnecessary selector wakeups
          // so use of lazySet is ok (no race condition)
          nextWakeupNanos.lazySet(AWAKE);
      }
      // fall through
  default:

如果strategy > 0,表示有拿到了SelectedKeys,那么需要調用processSelectedKeys方法對SelectedKeys進行處理:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

上面提到了NioEventLoop中有兩個selector,還有一個selectedKeys屬性,這個selectedKeys存儲的就是Optimized SelectedKeys,如果這個值不為空,就調用
processSelectedKeysOptimized方法,否則就調用processSelectedKeysPlain方法。


processSelectedKeysOptimized和processSelectedKeysPlain這兩個方法差別不大,只是傳入的要處理的selectedKeys不同。

處理的邏輯是首先拿到selectedKeys的key,然后調用它的attachment方法拿到attach的對象:

final SelectionKey k = selectedKeys.keys[i];
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

如果channel還沒有建立連接,那么這個對象可能是一個NioTask,用來處理channelReady和channelUnregistered的事件。

如果channel已經建立好連接了,那么這個對象可能是一個AbstractNioChannel。

針對兩種不同的對象,會去分別調用不同的processSelectedKey方法。

對第一種情況,會調用task的channelReady方法:

task.channelReady(k.channel(), k);

對第二種情況,會根據SelectionKey的readyOps()的各種狀態調用ch.unsafe()中的各種方法,去進行read或者close等操作。

總結

NioEventLoop雖然也是一個SingleThreadEventLoop,但是通過使用NIO技術,可以更好的利用現有資源實現更好的效率,這也就是為什么我們在項目中使用NioEventLoopGroup而不是DefaultEventLoopGroup的原因。

分享到:
標簽:Netty
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定