ServerBoostrap
用戶可以通過 netty 的 ServerBoostrap 啟動服務端,時序圖如下:
輸入圖片說明
入門例子
為了便于大家理解,我們把服務端啟動的代碼放在下面:
public void run() throws Exception {
/**
* EventLoopGroup 是用來處理I/O操作的多線程事件循環器
* bossGroup: 用來接收進來的連接
* workerGroup: 用來處理已經被接收的連接
* 一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/**
* ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。
* 你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你并不需要這樣做。
*/
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
//指定使用 NIOServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連接。
.channel(NioServerSocketChannel.class) // (3)
/**
* 這里的事件處理類經常會被用來處理一個最近的已經接收的 Channel。
* ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。
* 也許你想通過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。
* 當你的程序變的復雜時,可能你會增加更多的處理類到 pipeline 上,然后提取這些匿名類到最頂層的類上。
*/
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
/**
* 你可以設置這里指定的 Channel 實現的配置參數。
* 我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數選項比如tcpNoDelay 和 keepAlive。
* 請參考 ChannelOption 和詳細的 ChannelConfig 實現的接口文檔以此可以對ChannelOption 的有一個大概的認識。
*
* option() 是提供給 NioServerSocketChannel 用來接收進來的連接。
* childOption() 是提供給由父管道 ServerChannel 接收到的連接,在這個例子中也是 NioServerSocketChannel。
*/
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
/**
* 剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網卡上的 8080 端口。
* 當然現在你可以多次調用 bind() 方法(基于不同綁定地址)。
*/
// 綁定端口,開始接收進來的連接
ChannelFuture f = b.bind(port).sync(); // (7)
System.out.println("DiscardServer start...");
// 等待服務器 socket 關閉 。
// 在這個例子中,這不會發生,但你可以優雅地關閉你的服務器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
淺析
(1)Builder 構建者模式
為了解決參數較多的問題,這里 netty ServerBootstrap 使用了 builder 模式,可以大大降低我們的配置量,也可以靈活指定配置。
(2)EventLoopGroup 線程池
為了提升性能,線程池是一個自然的選擇。
(3)設置并且綁定 channel
可以發現我們源碼中只有一句話 channel(NioServerSocketChannel.class)
這個方法源碼如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
很顯然,這里使用了工廠模式。
我們只需要簡單的指定 class 信息,netty 會自動通過反射創建對應的實現類。
(4)初始化 ChannelPipeline
流水線使用了責任鏈模式,用于處理一系列的 ChannelHandler。
(5)添加 ChannelHandler
netty 這里的 initChannel 方法,可以讓我們非常方便的添加 ChannelHandler。
對個人的影響也比較大,我寫的很多工具方法也會采用類似的模式。
(6)綁定并且啟動監聽端口
我們使用時只有非常優雅的一句話 ChannelFuture f = b.bind(port).sync();,實際上 netty 為我們做了一些封裝:
雖然我們只指定了 port,本質上肯定還是 socket 地址,只不過默認 ip 為本地而已。
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
doBind 的實現還是比較多的,暫時不做展開。
(7)selector 輪訓 & 觸發
寫過 JAVA nio 的小伙伴們肯定知道,需要通過 selector 輪訓獲取消息。
實際上 netty 將這些細節封裝了起來。輪訓準備就緒 Channel 之后,將由 Reactor 線程 NioEventLoop 執行 ChannelPipeline 的響應方法,最終調用到 ChannelHandler。
ChannelHandler 中包含了系統內置的處理類,和用戶自定義的處理類。
源碼解析
每一個版本的源碼可能有差異,這里老馬的版本是 4.1.17.Final。
EventLoopGroup
EventLoopGroup 就是 Reactor 線程池。
group 方法如下:
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
這里調用了父類的方法 super.group(parentGroup);,實現如下:
/**
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
* {@link Channel}
*/
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
這個方法主要用于設置 IO 線程,執行和調度網絡事件的讀寫。
channel
channel 的方法如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
ReflectiveChannelFactory 反射的核心實現如下:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
實際上就是通過 NioServerSocketChannel 創建了 Channel 對象。
啟動類設置 Handler
啟動類可以為啟動服務類和父類,分別設置 Handler。
這個也是一開始老馬學習 netty 比較迷惑的地方,這兩個有啥區別呢?
輸入圖片說明
本質區別:
(1)ServerBoostrap 中的 Handler 是 NioServerSocketChannel 使用的,所有連接這個監聽端口的客戶端都會執行。
(2)父類 AbstractServerBoostrap 中的 Handler 是一個工廠類,會為每一個接入的客戶端都創建一個新的 Handler。
端口綁定
最后,還有服務端的端口綁定。我們上面只是簡單的過了一下,這里做一下展開:
private ChannelFuture doBind(final SocketAddress localAddress) {
//1. 創建 channel 并注冊
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//2. 創建完成后,設置對應的附加屬性
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
//3. 添加監聽器
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
創建 channel 并注冊
initAndRegister() 完整實現如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1. 通過 channelFactory 創建新的 channel
channel = channelFactory.newChannel();
//2. 初始化相關屬性
init(channel);
} catch (Throwable t) {
// 省略
}
// 省略
return regFuture;
}
init 是一個抽象方法,服務端實現如下:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
//1. 設置 Socket 參數和 NioserverSocketChannel 的 附加屬性
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 屬性省略
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//2. 將 AbstractBoostrap 的 Handler 添加到 NioserverSocketChannel 的 pipeline 中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//3. 將用于服務端注冊的 ServerBootstrapAcceptor 添加到 pipeline 中
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
到這里,服務端的監聽相關資源已經初始化完畢。
接下來,需要把 NioserverSocketChannel 注冊到 Reactor 線程的多路復用選擇器上,然后輪訓客戶端事件。
NioserverSocketChannel 簡介
構造器如下:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
這個就是默認 channel 初始化的構造器,實際調用的是:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}
歸根到底,默認的 SelectorProvider 應該是 jdk nio 的 DefaultSelectorProvider。
實際上,還是根據初始化 ServerSocketChannel:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
可以看到,這里默認注冊監聽了 SelectionKey.OP_ACCEPT 事件。
其中 SelectionKey 只有 4 種:
public static final int OP_ACCEPT = 1 << 4;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_WRITE = 1 << 2;
public static final int OP_READ = 1 << 0;
NioserverSocketChannel 注冊
注冊的源碼比較多,看得人云里霧里的。
可以理解,就是首先注冊自己感興趣的事件,發生的時候通知你即可。
注冊的方法如下我們主要看 NioEventLoop 即可,這個類繼承自 SingleThreadEventLoop 類。
實現了 SingleThreadEventExecutor 類的 run 方法,如下:
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
// 省略
processSelectedKeys();
// 省略
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
// 省略
}
}
我們只看核心的部分,這里實際上就是一個死循環,注冊的部分核心如下:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
// 省略
ch.register(selector, interestOps, task);
// 省略
}
在 AbstractSelectableChannel 中的實現如下:
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
// 省略
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
這里實際上是注冊感興趣的事件,服務端到這里基本上已經告一段落了。
客戶端接入源碼分析
下面我們看一下 NioEventLoop 是如何處理客戶端請求的。
當多路復用器就緒時,默認執行 processSelectedKeysOptimized() 方法:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 這里處理的 attachment 是 AbstractNioChannel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
這里實際上是根據不同的的類型,執行不同的操作:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我們來重點關注下 read 方法。
NioUnsafe 是一個接口,有兩個子類:NioByteUnsafe 和 NioMessageUnsafe。
NioServerSocketChannel 繼承自 AbstractNioMessageChannel,使用的是 NioMessageUnsafe 類。
read 方法
實現如下:
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 核心方法
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
// 處理讀取的信息
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 觸發 channel read
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 觸發 read complete
pipeline.fireChannelReadComplete();
} finally {
// 省略
}
}
doReadMessages 在 NioServerSocketChannel 類中實現如下:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// 省略
}
return 0;
}
這里就是 jdk nio 中的接收到一個新的客戶端請求的方法實現。
讀取完成之后,觸發 fireChannelRead,如下:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
invokeChannelRead 如下:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
實際上最后就是一個責任鏈去調用各種 ChannelInboundHandler 類。
到此,客戶端接入完成。
可以進行網絡讀寫等 IO 操作。