日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

ServerBoostrap

用戶可以通過 netty 的 ServerBoostrap 啟動服務端,時序圖如下:

netty 服務端啟動流程源碼詳解

 

輸入圖片說明

入門例子

為了便于大家理解,我們把服務端啟動的代碼放在下面:

public void run() throws Exception {
    /**
     * EventLoopGroup 是用來處理I/O操作的多線程事件循環器
     * bossGroup: 用來接收進來的連接
     * workerGroup: 用來處理已經被接收的連接
     * 一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
     */
    EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        /**
         * ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。
         * 你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你并不需要這樣做。
         */
        ServerBootstrap b = new ServerBootstrap(); // (2)
        b.group(bossGroup, workerGroup)
                //指定使用 NIOServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連接。
                .channel(NioServerSocketChannel.class) // (3)
                /**
                 * 這里的事件處理類經常會被用來處理一個最近的已經接收的 Channel。
                 * ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。
                 * 也許你想通過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。
                 * 當你的程序變的復雜時,可能你會增加更多的處理類到 pipeline 上,然后提取這些匿名類到最頂層的類上。
                 */
                .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new DiscardServerHandler());
                    }
                })
                /**
                 * 你可以設置這里指定的 Channel 實現的配置參數。
                 * 我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數選項比如tcpNoDelay 和 keepAlive。
                 * 請參考 ChannelOption 和詳細的 ChannelConfig 實現的接口文檔以此可以對ChannelOption 的有一個大概的認識。
                 *
                 * option() 是提供給 NioServerSocketChannel 用來接收進來的連接。
                 * childOption() 是提供給由父管道 ServerChannel 接收到的連接,在這個例子中也是 NioServerSocketChannel。
                 */
                .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
        /**
         * 剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網卡上的 8080 端口。
         * 當然現在你可以多次調用 bind() 方法(基于不同綁定地址)。
         */
        // 綁定端口,開始接收進來的連接
        ChannelFuture f = b.bind(port).sync(); // (7)
        System.out.println("DiscardServer start...");
        // 等待服務器  socket 關閉 。
        // 在這個例子中,這不會發生,但你可以優雅地關閉你的服務器。
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

淺析

(1)Builder 構建者模式

為了解決參數較多的問題,這里 netty ServerBootstrap 使用了 builder 模式,可以大大降低我們的配置量,也可以靈活指定配置。

(2)EventLoopGroup 線程池

為了提升性能,線程池是一個自然的選擇。

(3)設置并且綁定 channel

可以發現我們源碼中只有一句話 channel(NioServerSocketChannel.class)

這個方法源碼如下:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

很顯然,這里使用了工廠模式。

我們只需要簡單的指定 class 信息,netty 會自動通過反射創建對應的實現類。

(4)初始化 ChannelPipeline

流水線使用了責任鏈模式,用于處理一系列的 ChannelHandler。

(5)添加 ChannelHandler

netty 這里的 initChannel 方法,可以讓我們非常方便的添加 ChannelHandler。

對個人的影響也比較大,我寫的很多工具方法也會采用類似的模式。

(6)綁定并且啟動監聽端口

我們使用時只有非常優雅的一句話 ChannelFuture f = b.bind(port).sync();,實際上 netty 為我們做了一些封裝:

雖然我們只指定了 port,本質上肯定還是 socket 地址,只不過默認 ip 為本地而已。

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

doBind 的實現還是比較多的,暫時不做展開。

(7)selector 輪訓 & 觸發

寫過 JAVA nio 的小伙伴們肯定知道,需要通過 selector 輪訓獲取消息。

實際上 netty 將這些細節封裝了起來。輪訓準備就緒 Channel 之后,將由 Reactor 線程 NioEventLoop 執行 ChannelPipeline 的響應方法,最終調用到 ChannelHandler。

ChannelHandler 中包含了系統內置的處理類,和用戶自定義的處理類。

netty 服務端啟動流程源碼詳解

 

源碼解析

每一個版本的源碼可能有差異,這里老馬的版本是 4.1.17.Final。

EventLoopGroup

EventLoopGroup 就是 Reactor 線程池。

group 方法如下:

/**
 * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
 * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
 * {@link Channel}'s.
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

這里調用了父類的方法 super.group(parentGroup);,實現如下:

/**
 * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
 * {@link Channel}
 */
public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return self();
}

這個方法主要用于設置 IO 線程,執行和調度網絡事件的讀寫。

channel

channel 的方法如下:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

ReflectiveChannelFactory 反射的核心實現如下:

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    if (clazz == null) {
        throw new NullPointerException("clazz");
    }
    this.clazz = clazz;
}

@Override
public T newChannel() {
    try {
        return clazz.getConstructor().newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

實際上就是通過 NioServerSocketChannel 創建了 Channel 對象。

啟動類設置 Handler

啟動類可以為啟動服務類和父類,分別設置 Handler。

這個也是一開始老馬學習 netty 比較迷惑的地方,這兩個有啥區別呢?

netty 服務端啟動流程源碼詳解

 

輸入圖片說明

本質區別:

(1)ServerBoostrap 中的 Handler 是 NioServerSocketChannel 使用的,所有連接這個監聽端口的客戶端都會執行。

(2)父類 AbstractServerBoostrap 中的 Handler 是一個工廠類,會為每一個接入的客戶端都創建一個新的 Handler。

端口綁定

最后,還有服務端的端口綁定。我們上面只是簡單的過了一下,這里做一下展開:

private ChannelFuture doBind(final SocketAddress localAddress) {
    //1. 創建 channel 并注冊
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    //2. 創建完成后,設置對應的附加屬性
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            //3. 添加監聽器
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

創建 channel 并注冊

initAndRegister() 完整實現如下:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //1. 通過 channelFactory 創建新的 channel
        channel = channelFactory.newChannel();
        //2. 初始化相關屬性
        init(channel);
    } catch (Throwable t) {
        // 省略
    }
    // 省略
    return regFuture;
}

init 是一個抽象方法,服務端實現如下:

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();

    //1. 設置 Socket 參數和 NioserverSocketChannel 的 附加屬性
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    // 屬性省略
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();

            //2. 將 AbstractBoostrap 的 Handler 添加到 NioserverSocketChannel 的 pipeline 中
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            //3. 將用于服務端注冊的 ServerBootstrapAcceptor 添加到 pipeline 中
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

到這里,服務端的監聽相關資源已經初始化完畢。

接下來,需要把 NioserverSocketChannel 注冊到 Reactor 線程的多路復用選擇器上,然后輪訓客戶端事件。

NioserverSocketChannel 簡介

構造器如下:

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

這個就是默認 channel 初始化的構造器,實際調用的是:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}

歸根到底,默認的 SelectorProvider 應該是 jdk nio 的 DefaultSelectorProvider。

實際上,還是根據初始化 ServerSocketChannel:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

可以看到,這里默認注冊監聽了 SelectionKey.OP_ACCEPT 事件。

其中 SelectionKey 只有 4 種:

public static final int OP_ACCEPT = 1 << 4;

public static final int OP_CONNECT = 1 << 3;

public static final int OP_WRITE = 1 << 2;

public static final int OP_READ = 1 << 0;

NioserverSocketChannel 注冊

注冊的源碼比較多,看得人云里霧里的。

可以理解,就是首先注冊自己感興趣的事件,發生的時候通知你即可。

注冊的方法如下我們主要看 NioEventLoop 即可,這個類繼承自 SingleThreadEventLoop 類。

實現了 SingleThreadEventExecutor 類的 run 方法,如下:

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            // 省略
            processSelectedKeys();
            // 省略
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        // 省略
    }
}

我們只看核心的部分,這里實際上就是一個死循環,注冊的部分核心如下:

public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    // 省略
    ch.register(selector, interestOps, task);
    // 省略
}

在 AbstractSelectableChannel 中的實現如下:

public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        // 省略
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

這里實際上是注冊感興趣的事件,服務端到這里基本上已經告一段落了。

客戶端接入源碼分析

下面我們看一下 NioEventLoop 是如何處理客戶端請求的。

當多路復用器就緒時,默認執行 processSelectedKeysOptimized() 方法:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        // 這里處理的 attachment 是 AbstractNioChannel
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

這里實際上是根據不同的的類型,執行不同的操作:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // 省略
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

我們來重點關注下 read 方法。

NioUnsafe 是一個接口,有兩個子類:NioByteUnsafe 和 NioMessageUnsafe。

NioServerSocketChannel 繼承自 AbstractNioMessageChannel,使用的是 NioMessageUnsafe 類。

read 方法

實現如下:

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // 核心方法
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        // 處理讀取的信息
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 觸發 channel read 
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();

        // 觸發 read complete
        pipeline.fireChannelReadComplete();
    } finally {
        // 省略
    }
}

doReadMessages 在 NioServerSocketChannel 類中實現如下:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // 省略
    }
    return 0;
}

這里就是 jdk nio 中的接收到一個新的客戶端請求的方法實現。

讀取完成之后,觸發 fireChannelRead,如下:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

如下:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

invokeChannelRead 如下:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

實際上最后就是一個責任鏈去調用各種 ChannelInboundHandler 類。

到此,客戶端接入完成。

可以進行網絡讀寫等 IO 操作。

分享到:
標簽:netty
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定