1. 協議的作用
TCP/IP 中消息傳輸基于流的方式,沒有邊界
協議的目的就是劃定消息的邊界,制定通信雙方要共同遵守的通信規則
2. redis 協議
如果我們要向 Redis 服務器發送一條 set name Nyima 的指令,需要遵守如下協議
// 該指令一共有3部分,每條指令之后都要添加回車與換行符
*3rn
// 第一個指令的長度是3
$3rn
// 第一個指令是set指令
setrn
// 下面的指令以此類推
$4rn
namern
$5rn
Nyimarn
復制代碼
客戶端代碼如下
public class RedisClient {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NIOSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回車與換行符
final byte[] LINE = {'r','n'};
// 獲得ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 連接建立后,向Redis中發送一條指令,注意添加回車與換行
// set name Nyima
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$5".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("Nyima".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new .NETSocketAddress("localhost", 6379));
channelFuture.sync();
// 關閉channel
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 關閉group
group.shutdownGracefully();
}
}
}
復制代碼
控制臺打印結果
1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a |.. |
+--------+-------------------------------------------------+----------------+
復制代碼
Redis 中查詢執行結果
3. HTTP 協議
HTTP 協議在請求行請求頭中都有很多的內容,自己實現較為困難,可以使用 HttpServerCodec 作為服務器端的解碼器與編碼器,來處理 HTTP 請求
// HttpServerCodec 中既有請求的解碼器 HttpRequestDecoder 又有響應的編碼器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表該類既作為 編碼器 又作為 解碼器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
復制代碼
服務器代碼
public class HttpServer {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 作為服務器,使用 HttpServerCodec 作為編碼器與解碼器
ch.pipeline().addLast(new HttpServerCodec());
// 服務器只處理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
// 獲得請求uri
log.debug(msg.uri());
// 獲得完整響應,設置版本號與狀態碼
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 設置響應內容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 設置響應體長度,避免瀏覽器一直接收響應內容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 設置響應體
response.content().writeBytes(bytes);
// 寫回響應
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080);
}
}
復制代碼
服務器負責處理請求并響應瀏覽器。所以只需要處理 HTTP 請求即可
// 服務器只處理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
復制代碼
獲得請求后,需要返回響應給瀏覽器。需要創建響應對象 DefaultFullHttpResponse,設置 HTTP 版本號及狀態碼,為避免瀏覽器獲得響應后,因為獲得 CONTENT_LENGTH 而一直空轉,需要添加 CONTENT_LENGTH 字段,表明響應體中數據的具體長度
// 獲得完整響應,設置版本號與狀態碼
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 設置響應內容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 設置響應體長度,避免瀏覽器一直接收響應內容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 設置響應體
response.content().writeBytes(bytes);
復制代碼
運行結果
瀏覽器
控制臺
// 請求內容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....
// 響應內容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> |
+--------+-------------------------------------------------+----------------+
復制代碼
4. 自定義協議
組成要素
- 魔數:用來在第一時間判定接收的數據是否為無效數據包
- 版本號:可以支持協議的升級
- 序列化算法
- :消息正文到底采用哪種序列化反序列化方式
- 如:json、protobuf、hessian、jdk
- 指令類型:是登錄、注冊、單聊、群聊… 跟業務相關
- 請求序號:為了雙工通信,提供異步能力
- 正文長度
- 消息正文
編碼器與解碼器
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 設置魔數 4個字節
out.writeBytes(new byte[]{'N','Y','I','M'});
// 設置版本號 1個字節
out.writeByte(1);
// 設置序列化方式 1個字節
out.writeByte(1);
// 設置指令類型 1個字節
out.writeByte(msg.getMessageType());
// 設置請求序號 4個字節
out.writeInt(msg.getSequenceId());
// 為了補齊為16個字節,填充1個字節的數據
out.writeByte(0xff);
// 獲得序列化后的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 獲得并設置正文長度 長度用4個字節標識
out.writeInt(bytes.length);
// 設置消息正文
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 獲取魔數
int magic = in.readInt();
// 獲取版本號
byte version = in.readByte();
// 獲得序列化方式
byte seqType = in.readByte();
// 獲得指令類型
byte messageType = in.readByte();
// 獲得請求序號
int sequenceId = in.readInt();
// 移除補齊字節
in.readByte();
// 獲得正文長度
int length = in.readInt();
// 獲得正文
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 將信息放入List中,傳遞給下一個handler
out.add(message);
// 打印獲得的信息正文
System.out.println("===========魔數===========");
System.out.println(magic);
System.out.println("===========版本號===========");
System.out.println(version);
System.out.println("===========序列化方法===========");
System.out.println(seqType);
System.out.println("===========指令類型===========");
System.out.println(messageType);
System.out.println("===========請求序號===========");
System.out.println(sequenceId);
System.out.println("===========正文長度===========");
System.out.println(length);
System.out.println("===========正文===========");
System.out.println(message);
}
}
復制代碼
- 編碼器與解碼器方法源于父類 ByteToMessageCodec,通過該類可以自定義編碼器與解碼器, 泛型類型為被編碼與被解碼的類。此處使用了自定義類 Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>
復制代碼
- 編碼器負責將附加信息與正文信息寫入到 ByteBuf 中,其中附加信息總字節數最好為 2n,不足需要補齊。正文內容如果為對象,需要通過序列化將其放入到 ByteBuf 中
- 解碼器負責將 ByteBuf 中的信息取出,并放入 List 中,該 List 用于將信息傳遞給下一個 handler
編寫測試類
public class TestCodec {
static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel();
// 添加解碼器,避免粘包半包問題
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
channel.pipeline().addLast(new MessageCodec());
LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");
// 測試編碼與解碼
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, user, byteBuf);
channel.writeInbound(byteBuf);
}
}
復制代碼
- 測試類中用到了 LengthFieldBasedFrameDecoder,避免粘包半包問題
- 通過 MessageCodec 的 encode 方法將附加信息與正文寫入到 ByteBuf 中,通過 channel 執行入站操作。入站時會調用 decode 方法進行解碼
運行結果
@Sharable 注解
為了提高 handler 的復用率,可以將 handler 創建為 handler 對象,然后在不同的 channel 中使用該 handler 對象進行處理操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一個handler對象,提高復用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
復制代碼
但是并不是所有的 handler 都能通過這種方法來提高復用率的,例如
LengthFieldBasedFrameDecoder。如果多個 channel 中使用同一個 LengthFieldBasedFrameDecoder 對象,則可能發生如下問題
- channel1 中收到了一個半包,LengthFieldBasedFrameDecoder 發現不是一條完整的數據,則沒有繼續向下傳播
- 此時 channel2 中也收到了一個半包,因為兩個 channel 使用了同一個 LengthFieldBasedFrameDecoder,存入其中的數據剛好拼湊成了一個完整的數據包。LengthFieldBasedFrameDecoder 讓該數據包繼續向下傳播,最終引發錯誤
為了提高 handler 的復用率,同時又避免出現一些并發問題,Netty 中原生的 handler 中用 @Sharable 注解來標明,該 handler 能否在多個 channel 中共享。
只有帶有該注解,才能通過對象的方式被共享,否則無法被共享
自定義編解碼器能否使用 @Sharable 注解
這需要根據自定義的 handler 的處理邏輯進行分析
我們的 MessageCodec 本身接收的是
LengthFieldBasedFrameDecoder 處理之后的數據,那么數據肯定是完整的,按分析來說是可以添加 @Sharable 注解的
但是實際情況我們并不能添加該注解,會拋出異常信息 ChannelHandler
cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
- 因為 MessageCodec 繼承自 ByteToMessageCodec,ByteToMessageCodec 類的注解如下
這就意味著 ByteToMessageCodec 不能被多個 channel 所共享的
- 原因:因為該類的目標是:將 ByteBuf 轉化為 Message,意味著傳進該 handler 的數據還未被處理過。所以傳過來的 ByteBuf 可能并不是完整的數據,如果共享則會出現問題
如果想要共享,需要怎么辦呢?
繼承 MessageToMessageDecoder 即可。 該類的目標是:將已經被處理的完整數據再次被處理。傳過來的 Message 如果是被處理過的完整數據,那么被共享也就不會出現問題了,也就可以使用 @Sharable 注解了。實現方式與 ByteToMessageCodec 類似
@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
...
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
...
}
}