日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

Netty 對 JDK 自帶的 NIO 的 API 進行了良好的封裝,解決了如客戶端面臨斷線重連、 網絡閃斷、心跳處理、半包讀寫、 網絡擁塞和異常流的處理等等問題。且Netty擁有高性能、 吞吐量更高,延遲更低,減少資源消耗,最小化不必要的內存復制等優點。Netty 現在都在用的是4.x,5.x版本已經廢棄,Netty 4.x 需要JDK 6以上版本支持。

Netty的使用場景

1)互聯網行業:在分布式系統中,各個節點之間需要遠程服務調用,高性能的 RPC 框架必不可少,Netty 作為異步高性能的通信框架,往往作為基礎通信組件被這些 RPC 框架使用。典型的應用有:阿里分布式服務框架 Dubbo 的RPC 框架使用 Dubbo 協議進行節點間通信,Dubbo 協議默認使用 Netty 作為基礎通信組件,用于實現。各進程節點之間的內部通信。Rocketmq底層也是用的Netty作為基礎通信組件。

2)游戲行業:無論是手游服務端還是大型的網絡游戲,JAVA 語言得到了越來越廣泛的應用。Netty 作為高性能的基礎通信組件,它本身提供了 TCP/UDP 和 HTTP 協議棧。

3)大數據領域:經典的 Hadoop 的高性能通信和序列化組件 Avro 的 RPC 框架,默認采用 Netty 進行跨界點通信,它的 Netty Service 基于 Netty 框架二次封裝實現。

netty相關開源項目:
https://netty.io/wiki/related-projects.html

Netty示例

Netty示例idea中代碼結構:

示例代碼結構

pom文件中添加Netty的maven依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.35.Final</version>
</dependency>

Netty服務端示例代碼:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NIOServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 創建兩個線程組bossGroup和workerGroup, 含有的子線程NioEventLoop的個數默認為cpu核數的兩倍
        // bossGroup只是處理連接請求 ,真正的和客戶端業務處理,會交給workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(3);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // 創建服務器端的啟動對象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用鏈式編程來配置參數
            bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    // 使用NioServerSocketChannel作為服務器的通道實現
                    .channel(NioServerSocketChannel.class)
                    // 初始化服務器連接隊列大小,服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接。
                    // 多個客戶端同時來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//創建通道初始化對象,設置初始化參數,在 SocketChannel 建立起來之前執行

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //對workerGroup的SocketChannel設置處理器,Handler示例見下面
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            // 綁定一個端口并且同步, 生成了一個ChannelFuture異步對象,通過isDone()等方法可以判斷異步事件的執行情況
            // 啟動服務器(并綁定端口),bind是異步操作,sync方法是等待異步操作執行完畢
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // 給cf注冊監聽器,監聽我們關心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("監聽端口9000成功");
                    } else {
                        System.out.println("監聽端口9000失敗");
                    }
                }
            });
            // 等待服務端監聽端口關閉,closeFuture是異步操作
            // 通過sync方法同步等待通道關閉處理完畢,這里會阻塞等待通道關閉完成,內部調用的是Object的wait()方法
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 自定義Handler需要繼承netty規定好的某個HandlerAdapter(規范)
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 當客戶端連接服務器完成就會觸發該方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客戶端連接通道建立完成");
    }

    /**
     * 讀取客戶端發送的數據
     *
     * @param ctx 上下文對象, 含有通道channel,管道pipeline
     * @param msg 就是客戶端發送的數據
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈接, 出站入站
        //將 msg 轉成一個 ByteBuf,類似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到客戶端的消息:" + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 數據讀取完畢處理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * 處理異常, 一般是需要關閉通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

Netty客戶端示例代碼:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客戶端需要一個事件循環組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建客戶端啟動對象
            //注意客戶端使用的不是ServerBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設置相關參數
            bootstrap.group(group) //設置線程組
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實現
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //加入處理器,Handler代碼見下面
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("netty client start。。");
            //啟動客戶端去連接服務器端
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
            //對通道關閉進行監聽
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 當客戶端連接服務器完成就會觸發該方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    //當通道有讀取事件時會觸發,即服務端發送數據給客戶端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服務端的消息:" + buf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty線程模型

Netty的線程模型如下圖所示:

線程模型

模型解釋

1) Netty 抽象出兩組線程池BossGroup和WorkerGroup,BossGroup專門負責接收客戶端的連接, WorkerGroup專門負責網絡的讀寫

2) BossGroup和WorkerGroup類型都是NioEventLoopGroup

3) NioEventLoopGroup 相當于一個事件循環線程組, 這個組中含有多個事件循環線程 , 每一個事件循環線程是NioEventLoop

4) 每個NioEventLoop都有一個selector , 用于監聽注冊在其上的socketChannel的網絡通訊

5) 每個Boss NioEventLoop線程內部循環執行的步驟有 3 步:

處理accept事件 , 與client 建立連接 , 生成 NioSocketChannel

將NioSocketChannel注冊到某個worker NIOEventLoop上的selector

處理任務隊列的任務 , 即runAllTasks

6) 每個worker NIOEventLoop線程循環執行的步驟:

輪詢注冊到自己selector上的所有NioSocketChannel 的read, write事件

處理 I/O 事件, 即read , write 事件, 在對應NioSocketChannel 處理業務

runAllTasks處理任務隊列TaskQueue的任務 ,一些耗時的業務處理一般可以放入

TaskQueue中慢慢處理,這樣不影響數據在 pipeline 中的流動處理

7) 每個worker NIOEventLoop處理NioSocketChannel業務時,會使用 pipeline (管道),管道中維護了很多 handler處理器用來處理 channel 中的數據

Netty模塊組件

【Bootstrap、ServerBootstrap】:

Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程序,串聯各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導類,ServerBootstrap 是服務端啟動引導類。

【Future、ChannelFuture】:

正如前面介紹,在 Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以過一會等它執行完成或者直接注冊一個監聽,具體的實現就是通過 Future 和 ChannelFutures,他們可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發注冊的監聽事件。

【Channel】:

Netty 網絡通信的組件,能夠用于執行網絡 I/O 操作。Channel 為用戶提供:

1)當前網絡連接的通道的狀態(例如是否打開?是否已連接?)

2)網絡連接的配置參數 (例如接收緩沖區大小)

3)提供異步的網絡 I/O 操作(如建立連接,讀寫,綁定端口),異步調用意味著任何 I/O 調用都將立即返回,并且不保證在調用結束時所請求的 I/O 操作已完成。

4)調用立即返回一個 ChannelFuture 實例,通過注冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回調通知調用方。

5)支持關聯 I/O 操作與對應的處理程序。

不同協議、不同的阻塞類型的連接都有不同的 Channel 類型與之對應。

下面是一些常用的 Channel 類型:

  •  NioSocketChannel,異步的客戶端 TCP Socket 連接。
  •  NioServerSocketChannel,異步的服務器端 TCP Socket 連接。
  •  NioDatagramChannel,異步的 UDP 連接。
  •  NioSctpChannel,異步的客戶端 Sctp 連接。
  •  NioSctpServerChannel,異步的 Sctp 服務器端連接。
  •  這些通道涵蓋了 UDP 和 TCP 網絡 IO 以及文件 IO。

【Selector】:

Netty 基于 Selector 對象實現 I/O 多路復用,通過 Selector 一個線程可以監聽多個連接的 Channel 事件。當向一個 Selector 中注冊 Channel 后,Selector 內部的機制就可以自動不斷地查詢(Select) 這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網絡連接完成等),這樣程序就可以很簡單地使用一個線程高效地管理多個 Channel 。

【NioEventLoop】:

NioEventLoop 中維護了一個線程和任務隊列,支持異步提交執行任務,線程啟動時會調用 NioEventLoop 的 run 方法,執行 I/O 任務和非 I/O 任務:

I/O 任務,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法觸發。

非 IO 任務,添加到 taskQueue 中的任務,如 register0、bind0 等任務,由 runAllTasks 方法觸發。

【NioEventLoopGroup】:

NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解為一個線程池,內部維護了一組線程,每個線程(NioEventLoop)負責處理多個 Channel 上的事件,而一個 Channel 只對應于一個線程。

【ChannelHandler】:

ChannelHandler 是一個接口,處理 I/O 事件或攔截 I/O 操作,并將其轉發到其 ChannelPipeline(業務處理鏈)中的下一個處理程序。

ChannelHandler 本身并沒有提供很多方法,因為這個接口有許多的方法需要實現,方便使用期間,可以繼承它的子類:

  • ChannelInboundHandler 用于處理入站 I/O 事件。
  • ChannelOutboundHandler 用于處理出站 I/O 操作。

或者使用以下適配器類:

  • ChannelInboundHandlerAdapter 用于處理入站 I/O 事件。
  •  ChannelOutboundHandlerAdapter 用于處理出站 I/O 操作。

【ChannelHandlerContext】:

保存 Channel 相關的所有上下文信息,同時關聯一個 ChannelHandler 對象。

【ChannelPipline】:

保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作。ChannelPipeline 實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互。

在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關系如下圖:

ChannelPipeline

一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向鏈表,并且每個 ChannelHandlerContext 中又關聯著一個 ChannelHandler。read事件(入站事件)和write事件(出站事件)在一個雙向鏈表中,入站事件會從鏈表 head 往后傳遞到最后一個入站的handler,出站事件會從鏈表 tail 往前傳遞到最前一個出站的 handler,兩種類型的 handler 互不干擾。

ByteBuf詳解

從結構上來說,ByteBuf 由一串字節數組構成。數組中每個字節用來存放信息。ByteBuf 提供了兩個索引,一個用于讀取數據,一個用于寫入數據。這兩個索引通過在字節數組中移動,來定位需要讀或者寫信息的位置。當從 ByteBuf 讀取時,它的 readerIndex(讀索引)將會根據讀取的字節數遞增。同樣,當寫 ByteBuf 時,它的 writerIndex 也會根據寫入的字節數進行遞增。

需要注意的是極限的情況是 readerIndex 剛好讀到了 writerIndex 寫入的地方。如果 readerIndex 超過了 writerIndex 的時候,Netty 會拋出
IndexOutOf-BoundsException 異常。

聲明

本文來自于圖靈學院課堂講義。

分享到:
標簽:Netty
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定