日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

編者注:Netty是JAVA領域有名的開源網絡庫,特點是高性能和高擴展性,因此很多流行的框架都是基于它來構建的,比如我們熟知的Dubbo、Rocketmq、Hadoop等,針對高性能RPC,一般都是基于Netty來構建,比如soft-bolt??傊痪湓?,Java小伙伴們需要且有必要學會使用Netty并理解其實現原理。
關于Netty的入門講解可參考:Netty 入門,這一篇文章就夠了

Netty的啟動流程(ServerBootstrap),就是創建NioEventLoopGroup(內部可能包含多個NioEventLoop,每個eventLoop是一個線程,內部包含一個FIFO的taskQueue和Selector)和ServerBootstrap實例,并進行bind的過程(bind流程涉及到channel的創建和注冊),之后就可以對外提供服務了。

Netty的啟動流程中,涉及到多個操作,比如register、bind、注冊對應事件等,為了不影響main線程執行,這些工作以task的形式提交給NioEventLoop,由NioEventLoop來執行這些task,也就是register、bind、注冊事件等操作。

NioEventLoop(準確來說是SingleThreadEventExecutor)中包含了private volatile Thread thread,該thread變量的初始化是在new的線程第一次執行run方式時才賦值的,這種形式挺新穎的。

Netty啟動流程剖析

 

Netty啟動流程圖如下所示:

Netty啟動流程剖析

 

大致了解了Netty啟動流程之后,下面就按照Netty啟動流程中涉及到的源碼來進行分析。

netty啟動流程分為server端和client端,不同之處就是前者監聽端口,對外提供服務(socket->bind->listen操作),對應類ServerBootstrap;后者主動去連接遠端端口(socket->connect),對應類Bootstrap。

server端啟動流程

server端啟動流程可以理解成創建ServerBootstrap實例的過程,就以下面代碼為例進行分析(echo服務):

public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // bossGroup處理connect事件 // workerGroup處理read/write事件 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NIOServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() {
 @Override
 public void initChannel(SocketChannel ch) throws Exception {
 // 當連接建立后(register到childWorkerGroup前)初始化channel.pipeline
 ch.pipeline().addLast(serverHandler);
 }
 });

 // Start the server.
 ChannelFuture f = b.bind(PORT).sync();
 // Wait until the server socket is closed.
 f.channel().closeFuture().sync();
 } finally {
 // Shut down all event loops to terminate all threads.
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
}

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 ctx.write(msg);
 }

 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) {
 ctx.flush();
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 // Close the connection when an exception is raised.
 cause.printStackTrace();
 ctx.close();
 }
}

EventLoopGroup創建

EventLoopGroup中可能包含了多個EventLoop,EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理客戶端請求和內部任務,內部任務如ServerSocketChannel注冊和ServerSocket綁定操作等。關于NioEventLoop,后續專門寫一篇文章分析,這里就不再展開,只需知道個大概即可,其架構圖如下:

Netty啟動流程剖析

 

EventLoopGroup創建本質就是創建多個NioEventLoop,這里創建NioEventLoop就是初始化一個Reactor,包括selector和taskQueue。主要邏輯如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 創建NioEventLoop實例 children = new EventExecutor[nThreads]; // 初始化NioEventLoop,實際調用的是NioEventLoopGroup.newChild方法 for (int i = 0; i < nThreads; i ++) {
 children[i] = newChild(executor, args);
 }

 // 多個NioEventLoop中選擇策略
 chooser = chooserFactory.newChooser(children);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
 // 創建taskQueue
 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
 // 是不是很熟悉,java nio selector操作
 provider = selectorProvider;
 final SelectorTuple selectorTuple = openSelector();
 selector = selectorTuple.selector;
 unwrAppedSelector = selectorTuple.unwrappedSelector;
 selectStrategy = strategy;
}

EventLoopGroup創建OK后,啟動的第一步就算完成了,接下來該進行bind、listen操作了。

ServerBootstrap流程

bind操作

bind操作是ServerBootstrap流程重要的一環,bind流程涉及到NioChannel的創建、初始化和注冊(到Selector),啟動NioEventLoop,之后就可以對外提供服務了。

public ChannelFuture bind(SocketAddress localAddress) {
 validate(); // 參數校驗
 return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
 // 1. 初始化注冊操作
 final ChannelFuture regFuture = initAndRegister();
 final Channel channel = regFuture.channel();
 if (regFuture.cause() != null) {
 return regFuture;
 }
 
 // 2. doBind0操作
 if (regFuture.isDone()) {
 // register已完成,這里直接調用doBind0
 ChannelPromise promise = channel.newPromise();
 doBind0(regFuture, channel, localAddress, promise);
 return promise;
 } else {
 // register還未完成,注冊listener回調,在回調中調用doBind0
 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
 regFuture.addListener(new ChannelFutureListener() {
 /**
 * channel register完成(注冊到Selector并且調用了invokeHandlerAddedIfNeeded)之后,
 * 會調用safeSetSuccess,觸發各個ChannelFutureListener,最終會調用到這里的operationComplete方法
 */
 @Override
 public void operationComplete(ChannelFuture future) throws Exception {
 Throwable cause = future.cause();
 if (cause != null) {
 promise.setFailure(cause);
 } else {
 promise.registered();
 doBind0(regFuture, channel, localAddress, promise);
 }
 }
 });
 return promise;
 }
}

這里涉及到2個操作,一個是channel的創建、初始化、注冊操作,另一個是bind操作,下面兵分兩路,分別來講。

注意,這里如果main線程執行到regFuture.isDone()時,register還未完成,那么main線程是不會直接調用bind操作的,而是往regFuture上注冊一個Listenner,這樣channel register完成(注冊到Selector并且調用了invokeHandlerAddedIfNeeded)之后,會調用safeSetSuccess,觸發各個ChannelFutureListener,最終會調用到這里的operationComplete方法,進而在執行bind操作。

channel初始化、注冊操作

final ChannelFuture initAndRegister() {
 Channel channel = null;
 try {
 // 1.創建(netty自定義)Channel實例,并初始化
 // channel為 NioServerSocketChannel 實例,NioServerSocketChannel的父類AbstractNioChannel保存有nio的ServerSocketChannel
 channel = channelFactory.newChannel();
 // 2.初始化channel()
 init(channel);
 } catch (Throwable t) {
 }
 
 // 3.向Selector注冊channel
 ChannelFuture regFuture = config().group().register(channel);
 if (regFuture.cause() != null) {
 if (channel.isRegistered()) {
 channel.close();
 } else {
 channel.unsafe().closeForcibly();
 }
 }
 
 return regFuture;
}

這里重點關注下初始化channel流程,主要操作是設置channel屬性、設置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注冊到selector之后被回調的。

/** * 初始channel屬性,也就是ChannelOption對應socket的各種屬性。 * 比如 SO_KEEPALIVE SO_RCVBUF ... 可以與linux中的setsockopt函數對應起來。 * 最后將ServerBootstrapAcceptor添加到對應channel的ChannelPipeline中。 */@Overridevoid init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); }  ChannelPipeline p = channel.pipeline(); // 獲取childGroup和childHandler,傳遞給ServerBootstrapAcceptor final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); }  p.addLast(new ChannelInitializer() {
 /**
 * 在register0中,將channel注冊到Selector之后,會調用invokeHandlerAddedIfNeeded,
 * 進而調用到這里的initChannel方法
 */
 @Override
 public void initChannel(final Channel ch) throws Exception {
 final ChannelPipeline pipeline = ch.pipeline();
 ChannelHandler handler = config.handler();
 if (handler != null) {
 pipeline.addLast(handler);
 }
 
 // 這里注冊一個添加ServerBootstrapAcceptor的任務
 ch.eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 // 添加ServerBootstrapAcceptor
 pipeline.addLast(new ServerBootstrapAcceptor(
 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
 }
 });
 }
 });
}

channel初始化之后就該將其注冊到selector,即下面的register流程:

public ChannelFuture register(Channel channel) {
 // next()挑選一個EventLoop,默認輪詢選擇某個NioEventLoop
 return next().register(channel);
}
public ChannelFuture register(final ChannelPromise promise) {
 promise.channel().unsafe().register(this, promise);
 return promise;
}
// AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 AbstractChannel.this.eventLoop = eventLoop;
 
 // 直接執行register0或者以任務方式提交執行
 // 啟動時,首先執行到這里的是main線程,所以是以任務的方式來提交執行的。
 // 也就是說,該任務是NioEventLoop第一次執行的任務,即調用register0
 if (eventLoop.inEventLoop()) {
 register0(promise);
 } else {
 // 往NioEventLoop中(任務隊列)添加任務時,如果NioEventLoop線程還未啟動,則啟動該線程
 eventLoop.execute(new Runnable() {
 @Override
 public void run() {
 register0(promise);
 }
 });
 }
}

register操作

register操作之后伴隨著多個回調及listener的觸發:

// AbstractChannel$AbstractUnsafe
private void register0(ChannelPromise promise) {
 boolean firstRegistration = neverRegistered;
 // 這里調用的是AbstractNioChannel.doRegister
 // 這里將channel注冊上去,并沒有關注對應的事件(read/write事件)
 doRegister();
 neverRegistered = false;
 registered = true;
 
 // 調用handlerAdd事件,這里就會調用initChannel方法,設置channel.pipeline,也就是添加 ServerBootstrapAcceptor
 pipeline.invokeHandlerAddedIfNeeded();
 
 // 調用operationComplete回調
 safeSetSuccess(promise);
 // 回調fireChannelRegistered
 pipeline.fireChannelRegistered();
 // Only fire a channelActive if the channel has never been registered. This prevents firing
 // multiple channel actives if the channel is deregistered and re-registered.
 if (isActive()) {
 if (firstRegistration) {
 // 回調fireChannelActive
 pipeline.fireChannelActive();
 } else if (config().isAutoRead()) {
 beginRead();
 }
 }
}

上面代碼中的initChannel回調也就是設置對外監聽channel的channelHanlder為ServerBootstrapAcceptor;operationComplete回調也就是觸發ChannelFutureListener.operationComplete,這里會進行后續的doBind操作。

// AbstractBootstrap
private static void doBind0(
 final ChannelFuture regFuture, final Channel channel,
 final SocketAddress localAddress, final ChannelPromise promise) {
 // doBind0向EventLoop任務隊列中添加一個bind任務來完成后續操作。
 channel.eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 if (regFuture.isSuccess()) {
 // bind操作
 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 }
 }
 });
}

bind操作

在回顧上面的bind操作代碼,bind操作是在register之后進行的,因為register0是由NioEventLoop執行的,所以main線程需要先判斷下future是否完成,如果完成直接進行doBind即可,否則添加listener回調進行doBind。

Netty啟動流程剖析

 

bind操作及后續初始化操作(channelActive回調、設置監聽事件)

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
 boolean wasActive = isActive();
 try {
 // 調用底層bind操作
 doBind(localAddress);
 } catch (Throwable t) {
 safeSetFailure(promise, t);
 closeIfClosed();
 return;
 }

 if (!wasActive && isActive()) {
 invokeLater(new Runnable() {
 @Override
 public void run() {
 pipeline.fireChannelActive();
 }
 });
 }
 safeSetSuccess(promise);
}

// 最后底層bind邏輯bind入參包括了backlog,也就是底層會進行listen操作
// DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
 if (PlatformDependent.javaVersion() >= 7) {
 javaChannel().bind(localAddress, config.getBacklog());
 } else {
 javaChannel().socket().bind(localAddress, config.getBacklog());
 }
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
 // 回調fireChannelActive
 ctx.fireChannelActive();
 
 // 設置selectKey監聽事件,對于監聽端口就是SelectionKey.OP_ACCEPT,對于新建連接就是SelectionKey.OP_READ
 readIfIsAutoRead();
}

到這里為止整個netty啟動流程就基本接近尾聲,可以對外提供服務了。

分享到:
標簽:Netty
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定