一、目錄介紹
- 前置知識(shí)點(diǎn)
- NIO
- Channel
- Callback
- Future 和 Promise
- 事件和 ChannelHandler
二、前置知識(shí)點(diǎn)
1、NIO
首先我們需要回顧一下,同步、異步、阻塞、非阻塞的相關(guān)概念。
- 同步:調(diào)用 API 后,調(diào)用者能“馬上”就知道操作的結(jié)果。
- 異步:相對(duì)于同步,調(diào)用 API 后,調(diào)用者不能“馬上”知道操作的結(jié)果,要等被調(diào)用方 回調(diào) 通知結(jié)果。
- 阻塞:等待全部數(shù)據(jù)讀取(寫入)完成后,才返回。
- 非阻塞:讀取時(shí),讀多少返回多少;寫入時(shí),寫入多少返回多少。不用等待,全部數(shù)據(jù)完成操作后,才返回。
NIO 是一種 同步非阻塞 的 I/O模型。
- 同步是指線程不斷輪詢 I/O 事件是否就緒。
- 非阻塞是指線程在等待 I/O 的時(shí)候,可以同時(shí)做其他任務(wù)。
同步的核心是 選擇器,選擇器代替了線程本身輪詢 I/O 事件,避免了阻塞同時(shí)減少了不必要的線程消耗;非阻塞的核心就是 通道和緩沖區(qū),當(dāng) I/O 事件就緒時(shí),可以通過寫到緩沖區(qū),保證 I/O 的成功,而無需線程阻塞式地等待。
NIO主要有三大核心部分:
- Channel(通道)
- Buffer(緩沖區(qū))
- Selector(選擇器)
傳統(tǒng) I/O 基于 字節(jié)流和字符流 進(jìn)行操作,而 NIO 基于 Channel 和 Buffer 進(jìn)行操作,數(shù)據(jù)總是從通道讀取到緩沖區(qū)中,或者從緩沖區(qū)寫入到通道中。Selector 用于監(jiān)聽多個(gè)通道的事件(連接打開,數(shù)據(jù)到達(dá)等)。因此,單個(gè)線程可以監(jiān)聽多個(gè)數(shù)據(jù)通道,如下圖所示:
NIO
三、Netty 的核心組件
1、Channel
Channel 是一個(gè)通道,用于連接字節(jié)緩沖區(qū) Buffer 和另一端的實(shí)體。在 NIO 網(wǎng)絡(luò)編程模型中,服務(wù)端和客戶端進(jìn)行 I/O 數(shù)據(jù)交互(得到彼此推送的信息)的媒介就是 Channel。
Netty 對(duì) JDK 原生的 ServerSocketChannel 進(jìn)行了封裝和增強(qiáng)。
Netty的Channel增加了如下的組件:
- id 標(biāo)識(shí)唯一身份信息
- 可能存在的 parent Channel
- 管道 pepiline
- 用于數(shù)據(jù)讀寫的 unsafe 內(nèi)部類
- 事件循環(huán)執(zhí)行器 NioEventLoop
Channel可以分成兩類:
- 服務(wù)端: NIOServerSocketChannel
- 客戶端: NioSocketChannel
具體依賴關(guān)系如下圖所示:
服務(wù)端: NioServerSocketChannel
NioServerSocketChannel
客戶端: NioSocketChannel
NioSocketChannel
2、Callback
callback 就是回調(diào),一個(gè)方法可以在適當(dāng)?shù)臅r(shí)候回過頭來調(diào)用這個(gè) callback 方法。callback 是用于通知相關(guān)方某個(gè)操作已經(jīng)完成最常用的方法之一。
Netty 在處理事件時(shí)內(nèi)部使用了 callback。當(dāng)一個(gè) callback 被觸發(fā),事件可以被 ChannelHandler 的接口實(shí)現(xiàn)處理。
一個(gè)簡(jiǎn)單的例子如下所示:
public class ConnectHandler extends ChannelInboundHandlerAdapter {
// 當(dāng)一個(gè)新的連接建立時(shí),channelActive 被調(diào)用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress());
}
}
當(dāng)一個(gè)新的連接建立后,ChannelHandler 的 callback 方法 channelActive() 會(huì)被調(diào)用,然后打印一條消息。
這個(gè) ConnectHandler 實(shí)例(相當(dāng)于被調(diào)用者)以參數(shù)的形式傳入創(chuàng)建 Channel 連接的函數(shù)(調(diào)用者)中,之后這個(gè)函數(shù)創(chuàng)建新連接后,就會(huì)回來調(diào)用這個(gè) ConnectHandler 的 channelActive 方法,這個(gè)過程就叫回調(diào)。
3、Future 和 Promise
Future 和 Promise 起源于函數(shù)式編程,目的是將值(Future)與其計(jì)算方式(Promise)分離,從而允許更靈活地進(jìn)行計(jì)算,特別是通過并行化。
Future 表示目標(biāo)計(jì)算的返回值,Promise 表示計(jì)算的方式,這個(gè)模型將返回結(jié)果和計(jì)算邏輯分離,目的是為了讓計(jì)算邏輯不影響返回結(jié)果,從而抽象出一套異步編程模型。它們之間的紐帶就是 Callback。
簡(jiǎn)單來說:Future 表示一個(gè) 異步任務(wù)的結(jié)果,針對(duì)這個(gè)結(jié)果可以添加 Callback 方法以便在任務(wù) 執(zhí)行成功或失敗后做出對(duì)應(yīng)的操作,而 Promise 交由任務(wù)執(zhí)行者,任務(wù)執(zhí)行者通過 Promise 可以標(biāo)記任務(wù)完成或者失敗。
在 Netty 中:
- Future 接口定義了 isSuccess(),isCancellable(),cause() 等方法,這些判斷異步執(zhí)行狀態(tài)的方法都是只讀的。
- Promise 接口在 extends Future 的基礎(chǔ)上增加了 setSuccess(),setFailure() 等方法,這些方法是可寫的,即 Promise 是可寫的 Future。
4、事件(event) 和 ChannelHandler
ChannelHandler
Netty 是一個(gè)事件驅(qū)動(dòng)的框架,所有的 event(事件) 都是由 Handler 來進(jìn)行處理。
ChannelHandler 可以處理 I/O、攔截 I/O 或者將 event 傳遞給 ChannelPipeline 中的下一個(gè) Handler 進(jìn)行處理。
ChannelHandler 的結(jié)構(gòu)很簡(jiǎn)單,只有三個(gè)方法,分別是:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
event
Netty 用細(xì)分的 event(事件) 來通知我們狀態(tài)的變化或者操作的狀況。這讓我們可以基于發(fā)的 event 來觸發(fā)適當(dāng)?shù)男袨椤_@類行為可能包括:
- 日志記錄
- 數(shù)據(jù)傳送
- 流控制
- 應(yīng)用邏輯
event 按輸入或者輸出數(shù)據(jù)流的關(guān)系來分類。可能被輸入數(shù)據(jù)或者相關(guān)狀態(tài)改變觸發(fā)的 event 包括:
- 活躍或者停用的連接
- 讀數(shù)據(jù)
- 用戶 event
- 錯(cuò)誤 event
而輸出 event 則是會(huì)觸發(fā)將來行為的操作的結(jié)果,可能會(huì)是:
- 打開或者關(guān)閉到遠(yuǎn)端的連接
- 寫或者刷數(shù)據(jù)到一個(gè) socket
每一個(gè) event 都可以被分派到一個(gè)用戶實(shí)現(xiàn)的 handler 對(duì)象的方法。
Hello World
一個(gè)簡(jiǎn)單的 websocket 服務(wù)端,如下所示:
Server 代碼:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
public class Server {
public static void main(String[] args) throws InterruptedException {
// 用來接收客戶端傳進(jìn)來的連接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 用來處理已被接收的連接
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 創(chuàng)建 netty 服務(wù)
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup)
// 設(shè)置 NIO 模式
.channel(NioServerSocketChannel.class)
// 設(shè)置 tcp 緩沖區(qū)
.option(ChannelOption.SO_BACKLOG, 1024)
// 設(shè)置發(fā)送緩沖區(qū)數(shù)據(jù)大小
.childOption(ChannelOption.SO_SNDBUF, 64 * 1024)
// 設(shè)置接收緩沖區(qū)數(shù)據(jù)大小
.option(ChannelOption.SO_RCVBUF, 64 * 1024)
// 保持長(zhǎng)連接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// HttpClient編解碼器
pipeline.addLast(new HttpServerCodec());
// 設(shè)置最大內(nèi)容長(zhǎng)度
pipeline.addLast(new HttpObjectAggregator(65536));
// WebSocket 數(shù)據(jù)壓縮擴(kuò)展
pipeline.addLast(new WebSocketServerCompressionHandler());
// WebSocket 握手、控制幀處理
pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));
// 通道的初始化,數(shù)據(jù)傳輸過來進(jìn)行攔截及執(zhí)行
pipeline.addLast(new ServerHandler());
}
});
// 綁定端口啟動(dòng)服務(wù)
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
ServerHandler 代碼:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道激活(回調(diào))");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 僅處理 TextWebSocketFrame
if (msg instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) msg).text();
System.out.println("收到請(qǐng)求:" + request);
ctx.writeAndFlush(new TextWebSocketFrame("PONG"));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("數(shù)據(jù)讀取完成");
}
}
pom 依賴
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
</dependencies>
然后運(yùn)行 Server 即可。
接下來我們來測(cè)試一下程序是否正常,這里使用到一個(gè)在線測(cè)試網(wǎng)站:
http://www.easyswoole.com/wstool.html
連接上我們的服務(wù),如下圖所示:
連接websocket
如果出現(xiàn) OPENED => 127.0.0.1:8080 的提示,則表示連接成功。否則請(qǐng)排查是否程序和示例代碼一致。
然后我們點(diǎn)擊開始發(fā)送按鈕,如果出現(xiàn)以下提示則表示,消息發(fā)送成功啦。
發(fā)送消息1
發(fā)送消息2
好了到這里,我們的 Hello World 已經(jīng)完成了。