一、目錄介紹
- 前置知識點
- NIO
- Channel
- Callback
- Future 和 Promise
- 事件和 ChannelHandler
二、前置知識點
1、NIO
首先我們需要回顧一下,同步、異步、阻塞、非阻塞的相關概念。
- 同步:調用 API 后,調用者能“馬上”就知道操作的結果。
- 異步:相對于同步,調用 API 后,調用者不能“馬上”知道操作的結果,要等被調用方 回調 通知結果。
- 阻塞:等待全部數據讀取(寫入)完成后,才返回。
- 非阻塞:讀取時,讀多少返回多少;寫入時,寫入多少返回多少。不用等待,全部數據完成操作后,才返回。
NIO 是一種 同步非阻塞 的 I/O模型。
- 同步是指線程不斷輪詢 I/O 事件是否就緒。
- 非阻塞是指線程在等待 I/O 的時候,可以同時做其他任務。
同步的核心是 選擇器,選擇器代替了線程本身輪詢 I/O 事件,避免了阻塞同時減少了不必要的線程消耗;非阻塞的核心就是 通道和緩沖區,當 I/O 事件就緒時,可以通過寫到緩沖區,保證 I/O 的成功,而無需線程阻塞式地等待。
NIO主要有三大核心部分:
- Channel(通道)
- Buffer(緩沖區)
- Selector(選擇器)
傳統 I/O 基于 字節流和字符流 進行操作,而 NIO 基于 Channel 和 Buffer 進行操作,數據總是從通道讀取到緩沖區中,或者從緩沖區寫入到通道中。Selector 用于監聽多個通道的事件(連接打開,數據到達等)。因此,單個線程可以監聽多個數據通道,如下圖所示:
NIO
三、Netty 的核心組件
1、Channel
Channel 是一個通道,用于連接字節緩沖區 Buffer 和另一端的實體。在 NIO 網絡編程模型中,服務端和客戶端進行 I/O 數據交互(得到彼此推送的信息)的媒介就是 Channel。
Netty 對 JDK 原生的 ServerSocketChannel 進行了封裝和增強。
Netty的Channel增加了如下的組件:
- id 標識唯一身份信息
- 可能存在的 parent Channel
- 管道 pepiline
- 用于數據讀寫的 unsafe 內部類
- 事件循環執行器 NioEventLoop
Channel可以分成兩類:
- 服務端: NIOServerSocketChannel
- 客戶端: NioSocketChannel
具體依賴關系如下圖所示:
服務端: NioServerSocketChannel
NioServerSocketChannel
客戶端: NioSocketChannel
NioSocketChannel
2、Callback
callback 就是回調,一個方法可以在適當的時候回過頭來調用這個 callback 方法。callback 是用于通知相關方某個操作已經完成最常用的方法之一。
Netty 在處理事件時內部使用了 callback。當一個 callback 被觸發,事件可以被 ChannelHandler 的接口實現處理。
一個簡單的例子如下所示:
public class ConnectHandler extends ChannelInboundHandlerAdapter {
// 當一個新的連接建立時,channelActive 被調用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress());
}
}
當一個新的連接建立后,ChannelHandler 的 callback 方法 channelActive() 會被調用,然后打印一條消息。
這個 ConnectHandler 實例(相當于被調用者)以參數的形式傳入創建 Channel 連接的函數(調用者)中,之后這個函數創建新連接后,就會回來調用這個 ConnectHandler 的 channelActive 方法,這個過程就叫回調。
3、Future 和 Promise
Future 和 Promise 起源于函數式編程,目的是將值(Future)與其計算方式(Promise)分離,從而允許更靈活地進行計算,特別是通過并行化。
Future 表示目標計算的返回值,Promise 表示計算的方式,這個模型將返回結果和計算邏輯分離,目的是為了讓計算邏輯不影響返回結果,從而抽象出一套異步編程模型。它們之間的紐帶就是 Callback。
簡單來說:Future 表示一個 異步任務的結果,針對這個結果可以添加 Callback 方法以便在任務 執行成功或失敗后做出對應的操作,而 Promise 交由任務執行者,任務執行者通過 Promise 可以標記任務完成或者失敗。
在 Netty 中:
- Future 接口定義了 isSuccess(),isCancellable(),cause() 等方法,這些判斷異步執行狀態的方法都是只讀的。
- Promise 接口在 extends Future 的基礎上增加了 setSuccess(),setFailure() 等方法,這些方法是可寫的,即 Promise 是可寫的 Future。
4、事件(event) 和 ChannelHandler
ChannelHandler
Netty 是一個事件驅動的框架,所有的 event(事件) 都是由 Handler 來進行處理。
ChannelHandler 可以處理 I/O、攔截 I/O 或者將 event 傳遞給 ChannelPipeline 中的下一個 Handler 進行處理。
ChannelHandler 的結構很簡單,只有三個方法,分別是:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
event
Netty 用細分的 event(事件) 來通知我們狀態的變化或者操作的狀況。這讓我們可以基于發的 event 來觸發適當的行為。這類行為可能包括:
- 日志記錄
- 數據傳送
- 流控制
- 應用邏輯
event 按輸入或者輸出數據流的關系來分類。可能被輸入數據或者相關狀態改變觸發的 event 包括:
- 活躍或者停用的連接
- 讀數據
- 用戶 event
- 錯誤 event
而輸出 event 則是會觸發將來行為的操作的結果,可能會是:
- 打開或者關閉到遠端的連接
- 寫或者刷數據到一個 socket
每一個 event 都可以被分派到一個用戶實現的 handler 對象的方法。
Hello World
一個簡單的 websocket 服務端,如下所示:
Server 代碼:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
public class Server {
public static void main(String[] args) throws InterruptedException {
// 用來接收客戶端傳進來的連接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 用來處理已被接收的連接
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 創建 netty 服務
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup)
// 設置 NIO 模式
.channel(NioServerSocketChannel.class)
// 設置 tcp 緩沖區
.option(ChannelOption.SO_BACKLOG, 1024)
// 設置發送緩沖區數據大小
.childOption(ChannelOption.SO_SNDBUF, 64 * 1024)
// 設置接收緩沖區數據大小
.option(ChannelOption.SO_RCVBUF, 64 * 1024)
// 保持長連接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// HttpClient編解碼器
pipeline.addLast(new HttpServerCodec());
// 設置最大內容長度
pipeline.addLast(new HttpObjectAggregator(65536));
// WebSocket 數據壓縮擴展
pipeline.addLast(new WebSocketServerCompressionHandler());
// WebSocket 握手、控制幀處理
pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));
// 通道的初始化,數據傳輸過來進行攔截及執行
pipeline.addLast(new ServerHandler());
}
});
// 綁定端口啟動服務
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
ServerHandler 代碼:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道激活(回調)");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 僅處理 TextWebSocketFrame
if (msg instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) msg).text();
System.out.println("收到請求:" + request);
ctx.writeAndFlush(new TextWebSocketFrame("PONG"));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("數據讀取完成");
}
}
pom 依賴
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
</dependencies>
然后運行 Server 即可。
接下來我們來測試一下程序是否正常,這里使用到一個在線測試網站:
http://www.easyswoole.com/wstool.html
連接上我們的服務,如下圖所示:
連接websocket
如果出現 OPENED => 127.0.0.1:8080 的提示,則表示連接成功。否則請排查是否程序和示例代碼一致。
然后我們點擊開始發送按鈕,如果出現以下提示則表示,消息發送成功啦。
發送消息1
發送消息2
好了到這里,我們的 Hello World 已經完成了。