Netty 對 JDK 自帶的 NIO 的 API 進行了良好的封裝,解決了如客戶端面臨斷線重連、 網絡閃斷、心跳處理、半包讀寫、 網絡擁塞和異常流的處理等等問題。且Netty擁有高性能、 吞吐量更高,延遲更低,減少資源消耗,最小化不必要的內存復制等優點。Netty 現在都在用的是4.x,5.x版本已經廢棄,Netty 4.x 需要JDK 6以上版本支持。
Netty的使用場景
1)互聯網行業:在分布式系統中,各個節點之間需要遠程服務調用,高性能的 RPC 框架必不可少,Netty 作為異步高性能的通信框架,往往作為基礎通信組件被這些 RPC 框架使用。典型的應用有:阿里分布式服務框架 Dubbo 的RPC 框架使用 Dubbo 協議進行節點間通信,Dubbo 協議默認使用 Netty 作為基礎通信組件,用于實現。各進程節點之間的內部通信。Rocketmq底層也是用的Netty作為基礎通信組件。
2)游戲行業:無論是手游服務端還是大型的網絡游戲,JAVA 語言得到了越來越廣泛的應用。Netty 作為高性能的基礎通信組件,它本身提供了 TCP/UDP 和 HTTP 協議棧。
3)大數據領域:經典的 Hadoop 的高性能通信和序列化組件 Avro 的 RPC 框架,默認采用 Netty 進行跨界點通信,它的 Netty Service 基于 Netty 框架二次封裝實現。
netty相關開源項目:
https://netty.io/wiki/related-projects.html
Netty示例
Netty示例idea中代碼結構:
示例代碼結構
pom文件中添加Netty的maven依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency>
Netty服務端示例代碼:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NIOServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
// 創建兩個線程組bossGroup和workerGroup, 含有的子線程NioEventLoop的個數默認為cpu核數的兩倍
// bossGroup只是處理連接請求 ,真正的和客戶端業務處理,會交給workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 創建服務器端的啟動對象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用鏈式編程來配置參數
bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
// 使用NioServerSocketChannel作為服務器的通道實現
.channel(NioServerSocketChannel.class)
// 初始化服務器連接隊列大小,服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接。
// 多個客戶端同時來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//創建通道初始化對象,設置初始化參數,在 SocketChannel 建立起來之前執行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//對workerGroup的SocketChannel設置處理器,Handler示例見下面
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
// 綁定一個端口并且同步, 生成了一個ChannelFuture異步對象,通過isDone()等方法可以判斷異步事件的執行情況
// 啟動服務器(并綁定端口),bind是異步操作,sync方法是等待異步操作執行完畢
ChannelFuture cf = bootstrap.bind(9000).sync();
// 給cf注冊監聽器,監聽我們關心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("監聽端口9000成功");
} else {
System.out.println("監聽端口9000失敗");
}
}
});
// 等待服務端監聽端口關閉,closeFuture是異步操作
// 通過sync方法同步等待通道關閉處理完畢,這里會阻塞等待通道關閉完成,內部調用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定義Handler需要繼承netty規定好的某個HandlerAdapter(規范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 當客戶端連接服務器完成就會觸發該方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客戶端連接通道建立完成");
}
/**
* 讀取客戶端發送的數據
*
* @param ctx 上下文對象, 含有通道channel,管道pipeline
* @param msg 就是客戶端發送的數據
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈接, 出站入站
//將 msg 轉成一個 ByteBuf,類似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客戶端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 數據讀取完畢處理方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 處理異常, 一般是需要關閉通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
Netty客戶端示例代碼:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個事件循環組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創建客戶端啟動對象
//注意客戶端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設置相關參數
bootstrap.group(group) //設置線程組
.channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實現
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入處理器,Handler代碼見下面
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("netty client start。。");
//啟動客戶端去連接服務器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
//對通道關閉進行監聽
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 當客戶端連接服務器完成就會觸發該方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
//當通道有讀取事件時會觸發,即服務端發送數據給客戶端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服務端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Netty線程模型
Netty的線程模型如下圖所示:
線程模型
模型解釋:
1) Netty 抽象出兩組線程池BossGroup和WorkerGroup,BossGroup專門負責接收客戶端的連接, WorkerGroup專門負責網絡的讀寫
2) BossGroup和WorkerGroup類型都是NioEventLoopGroup
3) NioEventLoopGroup 相當于一個事件循環線程組, 這個組中含有多個事件循環線程 , 每一個事件循環線程是NioEventLoop
4) 每個NioEventLoop都有一個selector , 用于監聽注冊在其上的socketChannel的網絡通訊
5) 每個Boss NioEventLoop線程內部循環執行的步驟有 3 步:
處理accept事件 , 與client 建立連接 , 生成 NioSocketChannel
將NioSocketChannel注冊到某個worker NIOEventLoop上的selector
處理任務隊列的任務 , 即runAllTasks
6) 每個worker NIOEventLoop線程循環執行的步驟:
輪詢注冊到自己selector上的所有NioSocketChannel 的read, write事件
處理 I/O 事件, 即read , write 事件, 在對應NioSocketChannel 處理業務
runAllTasks處理任務隊列TaskQueue的任務 ,一些耗時的業務處理一般可以放入
TaskQueue中慢慢處理,這樣不影響數據在 pipeline 中的流動處理
7) 每個worker NIOEventLoop處理NioSocketChannel業務時,會使用 pipeline (管道),管道中維護了很多 handler處理器用來處理 channel 中的數據
Netty模塊組件
【Bootstrap、ServerBootstrap】:
Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程序,串聯各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導類,ServerBootstrap 是服務端啟動引導類。
【Future、ChannelFuture】:
正如前面介紹,在 Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以過一會等它執行完成或者直接注冊一個監聽,具體的實現就是通過 Future 和 ChannelFutures,他們可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發注冊的監聽事件。
【Channel】:
Netty 網絡通信的組件,能夠用于執行網絡 I/O 操作。Channel 為用戶提供:
1)當前網絡連接的通道的狀態(例如是否打開?是否已連接?)
2)網絡連接的配置參數 (例如接收緩沖區大小)
3)提供異步的網絡 I/O 操作(如建立連接,讀寫,綁定端口),異步調用意味著任何 I/O 調用都將立即返回,并且不保證在調用結束時所請求的 I/O 操作已完成。
4)調用立即返回一個 ChannelFuture 實例,通過注冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回調通知調用方。
5)支持關聯 I/O 操作與對應的處理程序。
不同協議、不同的阻塞類型的連接都有不同的 Channel 類型與之對應。
下面是一些常用的 Channel 類型:
- NioSocketChannel,異步的客戶端 TCP Socket 連接。
- NioServerSocketChannel,異步的服務器端 TCP Socket 連接。
- NioDatagramChannel,異步的 UDP 連接。
- NioSctpChannel,異步的客戶端 Sctp 連接。
- NioSctpServerChannel,異步的 Sctp 服務器端連接。
- 這些通道涵蓋了 UDP 和 TCP 網絡 IO 以及文件 IO。
【Selector】:
Netty 基于 Selector 對象實現 I/O 多路復用,通過 Selector 一個線程可以監聽多個連接的 Channel 事件。當向一個 Selector 中注冊 Channel 后,Selector 內部的機制就可以自動不斷地查詢(Select) 這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網絡連接完成等),這樣程序就可以很簡單地使用一個線程高效地管理多個 Channel 。
【NioEventLoop】:
NioEventLoop 中維護了一個線程和任務隊列,支持異步提交執行任務,線程啟動時會調用 NioEventLoop 的 run 方法,執行 I/O 任務和非 I/O 任務:
I/O 任務,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法觸發。
非 IO 任務,添加到 taskQueue 中的任務,如 register0、bind0 等任務,由 runAllTasks 方法觸發。
【NioEventLoopGroup】:
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解為一個線程池,內部維護了一組線程,每個線程(NioEventLoop)負責處理多個 Channel 上的事件,而一個 Channel 只對應于一個線程。
【ChannelHandler】:
ChannelHandler 是一個接口,處理 I/O 事件或攔截 I/O 操作,并將其轉發到其 ChannelPipeline(業務處理鏈)中的下一個處理程序。
ChannelHandler 本身并沒有提供很多方法,因為這個接口有許多的方法需要實現,方便使用期間,可以繼承它的子類:
- ChannelInboundHandler 用于處理入站 I/O 事件。
- ChannelOutboundHandler 用于處理出站 I/O 操作。
或者使用以下適配器類:
- ChannelInboundHandlerAdapter 用于處理入站 I/O 事件。
- ChannelOutboundHandlerAdapter 用于處理出站 I/O 操作。
【ChannelHandlerContext】:
保存 Channel 相關的所有上下文信息,同時關聯一個 ChannelHandler 對象。
【ChannelPipline】:
保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作。ChannelPipeline 實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互。
在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關系如下圖:
ChannelPipeline
一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向鏈表,并且每個 ChannelHandlerContext 中又關聯著一個 ChannelHandler。read事件(入站事件)和write事件(出站事件)在一個雙向鏈表中,入站事件會從鏈表 head 往后傳遞到最后一個入站的handler,出站事件會從鏈表 tail 往前傳遞到最前一個出站的 handler,兩種類型的 handler 互不干擾。
ByteBuf詳解
從結構上來說,ByteBuf 由一串字節數組構成。數組中每個字節用來存放信息。ByteBuf 提供了兩個索引,一個用于讀取數據,一個用于寫入數據。這兩個索引通過在字節數組中移動,來定位需要讀或者寫信息的位置。當從 ByteBuf 讀取時,它的 readerIndex(讀索引)將會根據讀取的字節數遞增。同樣,當寫 ByteBuf 時,它的 writerIndex 也會根據寫入的字節數進行遞增。
需要注意的是極限的情況是 readerIndex 剛好讀到了 writerIndex 寫入的地方。如果 readerIndex 超過了 writerIndex 的時候,Netty 會拋出
IndexOutOf-BoundsException 異常。
聲明
本文來自于圖靈學院課堂講義。