日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

1. 協議的作用

TCP/IP 中消息傳輸基于流的方式,沒有邊界

協議的目的就是劃定消息的邊界,制定通信雙方要共同遵守的通信規則

2. redis 協議

如果我們要向 Redis 服務器發送一條 set name Nyima 的指令,需要遵守如下協議

// 該指令一共有3部分,每條指令之后都要添加回車與換行符
*3rn
// 第一個指令的長度是3
$3rn
// 第一個指令是set指令
setrn
// 下面的指令以此類推
$4rn
namern
$5rn
Nyimarn
復制代碼

客戶端代碼如下

public class RedisClient {
    static final Logger log = LoggerFactory.getLogger(StudyServer.class);
    public static void main(String[] args) {
        NioEventLoopGroup group =  new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NIOSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // 打印日志
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    // 回車與換行符
                                    final byte[] LINE = {'r','n'};
                                    // 獲得ByteBuf
                                    ByteBuf buffer = ctx.alloc().buffer();
                                    // 連接建立后,向Redis中發送一條指令,注意添加回車與換行
                                    // set name Nyima
                                    buffer.writeBytes("*3".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$3".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("set".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$4".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("name".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$5".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("Nyima".getBytes());
                                    buffer.writeBytes(LINE);
                                    ctx.writeAndFlush(buffer);
                                }

                            });
                        }
                    })
                    .connect(new .NETSocketAddress("localhost", 6379));
            channelFuture.sync();
            // 關閉channel
            channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 關閉group
            group.shutdownGracefully();
        }
    }
}
復制代碼

控制臺打印結果

1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
復制代碼

Redis 中查詢執行結果

 

3. HTTP 協議

HTTP 協議在請求行請求頭中都有很多的內容,自己實現較為困難,可以使用 HttpServerCodec 作為服務器端的解碼器與編碼器,來處理 HTTP 請求

// HttpServerCodec 中既有請求的解碼器 HttpRequestDecoder 又有響應的編碼器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表該類既作為 編碼器 又作為 解碼器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
        implements HttpServerUpgradeHandler.SourceCodec
復制代碼

服務器代碼

public class HttpServer {
    static final Logger log = LoggerFactory.getLogger(StudyServer.class);

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        new ServerBootstrap()
                .group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        // 作為服務器,使用 HttpServerCodec 作為編碼器與解碼器
                        ch.pipeline().addLast(new HttpServerCodec());
                        // 服務器只處理HTTPRequest
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
                                // 獲得請求uri
                                log.debug(msg.uri());

                                // 獲得完整響應,設置版本號與狀態碼
                                DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                                // 設置響應內容
                                byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
                                // 設置響應體長度,避免瀏覽器一直接收響應內容
                                response.headers().setInt(CONTENT_LENGTH, bytes.length);
                                // 設置響應體
                                response.content().writeBytes(bytes);

                                // 寫回響應
                                ctx.writeAndFlush(response);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}
復制代碼

服務器負責處理請求并響應瀏覽器。所以只需要處理 HTTP 請求即可

// 服務器只處理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
復制代碼

獲得請求后,需要返回響應給瀏覽器。需要創建響應對象 DefaultFullHttpResponse,設置 HTTP 版本號及狀態碼,為避免瀏覽器獲得響應后,因為獲得 CONTENT_LENGTH 而一直空轉,需要添加 CONTENT_LENGTH 字段,表明響應體中數據的具體長度

// 獲得完整響應,設置版本號與狀態碼
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 設置響應內容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 設置響應體長度,避免瀏覽器一直接收響應內容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 設置響應體
response.content().writeBytes(bytes);
復制代碼

運行結果

瀏覽器

 

控制臺

// 請求內容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....

// 響應內容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e          |, World!</h1>   |
+--------+-------------------------------------------------+----------------+

復制代碼

4. 自定義協議

組成要素

  • 魔數:用來在第一時間判定接收的數據是否為無效數據包
  • 版本號:可以支持協議的升級
  • 序列化算法
  • :消息正文到底采用哪種序列化反序列化方式
    • 如:json、protobuf、hessian、jdk
  • 指令類型:是登錄、注冊、單聊、群聊… 跟業務相關
  • 請求序號:為了雙工通信,提供異步能力
  • 正文長度
  • 消息正文

編碼器與解碼器

public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 設置魔數 4個字節
        out.writeBytes(new byte[]{'N','Y','I','M'});
        // 設置版本號 1個字節
        out.writeByte(1);
        // 設置序列化方式 1個字節
        out.writeByte(1);
        // 設置指令類型 1個字節
        out.writeByte(msg.getMessageType());
        // 設置請求序號 4個字節
        out.writeInt(msg.getSequenceId());
        // 為了補齊為16個字節,填充1個字節的數據
        out.writeByte(0xff);

        // 獲得序列化后的msg
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();

        // 獲得并設置正文長度 長度用4個字節標識
        out.writeInt(bytes.length);
        // 設置消息正文
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 獲取魔數
        int magic = in.readInt();
        // 獲取版本號
        byte version = in.readByte();
        // 獲得序列化方式
        byte seqType = in.readByte();
        // 獲得指令類型
        byte messageType = in.readByte();
        // 獲得請求序號
        int sequenceId = in.readInt();
        // 移除補齊字節
        in.readByte();
        // 獲得正文長度
        int length = in.readInt();
        // 獲得正文
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
		// 將信息放入List中,傳遞給下一個handler
        out.add(message);
        
        // 打印獲得的信息正文
        System.out.println("===========魔數===========");
        System.out.println(magic);
        System.out.println("===========版本號===========");
        System.out.println(version);
        System.out.println("===========序列化方法===========");
        System.out.println(seqType);
        System.out.println("===========指令類型===========");
        System.out.println(messageType);
        System.out.println("===========請求序號===========");
        System.out.println(sequenceId);
        System.out.println("===========正文長度===========");
        System.out.println(length);
        System.out.println("===========正文===========");
        System.out.println(message);
    }
}

復制代碼
  • 編碼器與解碼器方法源于父類 ByteToMessageCodec,通過該類可以自定義編碼器與解碼器, 泛型類型為被編碼與被解碼的類。此處使用了自定義類 Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>
復制代碼
  • 編碼器負責將附加信息與正文信息寫入到 ByteBuf 中,其中附加信息總字節數最好為 2n,不足需要補齊。正文內容如果為對象,需要通過序列化將其放入到 ByteBuf 中
  • 解碼器負責將 ByteBuf 中的信息取出,并放入 List 中,該 List 用于將信息傳遞給下一個 handler

編寫測試類

public class TestCodec {
    static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel();
        // 添加解碼器,避免粘包半包問題
        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
        channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        channel.pipeline().addLast(new MessageCodec());
        LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");

        // 測試編碼與解碼
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, user, byteBuf);
        channel.writeInbound(byteBuf);
    }
}
復制代碼
  • 測試類中用到了 LengthFieldBasedFrameDecoder,避免粘包半包問題
  • 通過 MessageCodec 的 encode 方法將附加信息與正文寫入到 ByteBuf 中,通過 channel 執行入站操作。入站時會調用 decode 方法進行解碼

運行結果

 


 

@Sharable 注解

為了提高 handler 的復用率,可以將 handler 創建為 handler 對象,然后在不同的 channel 中使用該 handler 對象進行處理操作

LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一個handler對象,提高復用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
復制代碼

但是并不是所有的 handler 都能通過這種方法來提高復用率的,例如
LengthFieldBasedFrameDecoder。如果多個 channel 中使用同一個 LengthFieldBasedFrameDecoder 對象,則可能發生如下問題

  • channel1 中收到了一個半包,LengthFieldBasedFrameDecoder 發現不是一條完整的數據,則沒有繼續向下傳播
  • 此時 channel2 中也收到了一個半包,因為兩個 channel 使用了同一個 LengthFieldBasedFrameDecoder,存入其中的數據剛好拼湊成了一個完整的數據包。LengthFieldBasedFrameDecoder 讓該數據包繼續向下傳播,最終引發錯誤

為了提高 handler 的復用率,同時又避免出現一些并發問題,Netty 中原生的 handler 中用 @Sharable 注解來標明,該 handler 能否在多個 channel 中共享。

只有帶有該注解,才能通過對象的方式被共享,否則無法被共享

自定義編解碼器能否使用 @Sharable 注解

這需要根據自定義的 handler 的處理邏輯進行分析

我們的 MessageCodec 本身接收的是
LengthFieldBasedFrameDecoder 處理之后的數據,那么數據肯定是完整的,按分析來說是可以添加 @Sharable 注解的

但是實際情況我們并不能添加該注解,會拋出異常信息 ChannelHandler
cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared

  • 因為 MessageCodec 繼承自 ByteToMessageCodec,ByteToMessageCodec 類的注解如下
  •  

這就意味著 ByteToMessageCodec 不能被多個 channel 所共享的

  • 原因:因為該類的目標是:將 ByteBuf 轉化為 Message,意味著傳進該 handler 的數據還未被處理過。所以傳過來的 ByteBuf 可能并不是完整的數據,如果共享則會出現問題

如果想要共享,需要怎么辦呢?

繼承 MessageToMessageDecoder 即可。 該類的目標是:將已經被處理的完整數據再次被處理。傳過來的 Message 如果是被處理過的完整數據,那么被共享也就不會出現問題了,也就可以使用 @Sharable 注解了。實現方式與 ByteToMessageCodec 類似

@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        ...
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		...
    }
}

分享到:
標簽:Netty
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定