前提
最近學習Netty的時候想做一個基于redis服務協議的編碼解碼模塊,過程中順便閱讀了Redis服務序列化協議RESP,結合自己的理解對文檔進行了翻譯并且簡單實現了RESP基于JAVA語言的解析。編寫本文的使用使用的JDK版本為[8+]。
RESP簡介
Redis客戶端與Redis服務端基于一個稱作RESP的協議進行通信,RESP全稱為Redis Serialization Protocol,也就是Redis序列化協議。雖然RESP為Redis設計,但是它也可以應用在其他客戶端-服務端(Client-Server)的軟件項目中。RESP在設計的時候折中考慮了如下幾點:
- 易于實現。
- 快速解析。
- 可讀性高。
RESP可以序列化不同的數據類型,如整型、字符串、數組還有一種特殊的Error類型。需要執行的Redis命令會封裝為類似于字符串數組的請求然后通過Redis客戶端發送到Redis服務端。Redis服務端會基于特定的命令類型選擇對應的一種數據類型進行回復(這一句是意譯,原文是:Redis replies with a command-specific data type)。
RESP是二進制安全的(binary-safe),并且在RESP下不需要處理從一個進程傳輸到另一個進程的批量數據,因為它使用了前綴長度(prefixed-length,后面會分析,就是在每個數據塊的前綴已經定義好數據塊的個數,類似于Netty里面的定長編碼解碼)來傳輸批量數據。
注意:此處概述的協議僅僅使用在客戶端-服務端通信,Redis Cluster使用不同的二進制協議在多個節點之間交換消息(也就是Redis集群中的節點之間并不使用RESP通信)。
網絡層
Redis客戶端通過創建一個在6379端口的TCP連接,連接到Redis服務端。
雖然RESP在底層通信協議技術上是非TCP特定的,但在Redis的上下文中,RESP僅用于TCP連接(或類似的面向流的連接,如Unix套接字)。
請求-響應模型
Redis服務端接收由不同參數組成的命令,接收到命令并將其處理之后會把回復發送回Redis客戶端。這是最簡單的模型,但是有兩種例外的情況:
- Redis支持管道(Pipelining,流水線,多數情況下習慣稱為管道)操作。使用管道的情況下,Redis客戶端可以一次發送多個命令,然后等待一次性的回復(文中的回復是replies,理解為Redis服務端會一次性返回一個批量回復結果)。
- 當Redis客戶端訂閱Pub/Sub信道時,該協議會更改語義并成為推送協議(push protocol),也就是說,客戶端不再需要發送命令,因為Redis服務端將自動向客戶端(訂閱了改信道的客戶端)發送新消息(這里的意思是:在訂閱/發布模式下,消息是由Redis服務端主動推送給訂閱了特定信道的Redis客戶端)。
除了上述兩個特例之外,Redis協議是一種簡單的請求-響應協議。
RESP支持的數據類型
RESP在Redis 1.2中引入,在Redis 2.0,RESP正式成為與Redis服務端通信的標準方案。也就是如果需要編寫Redis客戶端,你就必須在客戶端中實現此協議。
RESP本質上是一種序列化協議,它支持的數據類型如下:單行字符串、錯誤消息、整型數字、定長字符串和RESP數組。
RESP在Redis中用作請求-響應協議的方式如下:
- Redis客戶端將命令封裝為RESP的數組類型(數組元素都是定長字符串類型,注意這一點,很重要)發送到Redis服務器。
- Redis服務端根據命令實現選擇對應的RESP數據類型之一進行回復。
在RESP中,數據類型取決于數據報的第一個字節:
- 單行字符串的第一個字節為+。
- 錯誤消息的第一個字節為-。
- 整型數字的第一個字節為:。
- 定長字符串的第一個字節為$。
- RESP數組的第一個字節為*。
另外,在RESP中可以使用定長字符串或者數組的特殊變體來表示Null值,后面會提及。在RESP中,協議的不同部分始終以rn(CRLF)終止。
目前RESP中5種數據類型的小結如下:
數據類型 本文翻譯名稱 基本特征 例子 Simple String 單行字符串 第一個字節是+,最后兩個字節是rn,其他字節是字符串內容 +OKrn Error 錯誤消息 第一個字節是-,最后兩個字節是rn,其他字節是異常消息的文本內容 -ERRrn Integer 整型數字 第一個字節是:,最后兩個字節是rn,其他字節是數字的文本內容 :100rn Bulk String 定長字符串 第一個字節是$,緊接著的字節是內容字符串長度rn,最后兩個字節是rn,其他字節是字符串內容 $4rndogern Array RESP數組 第一個字節是*,緊接著的字節是元素個數rn,最后兩個字節是rn,其他字節是各個元素的內容,每個元素可以是任意一種數據類型 *2rn:100rn$4rndogern 下面的小節是對每種數據類型的更細致的分析。
RESP簡單字符串-Simple String
簡單字符串的編碼方式如下:
- (1)第一個字節為+。
- (2)緊接著的是一個不能包含CR或者LF字符的字符串。
- (3)以CRLF終止。
簡單字符串能夠保證在最小開銷的前提下傳輸非二進制安全的字符串。例如很多Redis命令執行成功后服務端需要回復OK字符串,此時通過簡單字符串編碼為5字節的數據報如下:
+OKrn 復制代碼
如果需要發送二進制安全的字符串,那么需要使用定長字符串。
當Redis服務端用簡單字符串響應時,Redis客戶端庫應該向調用者返回一個字符串,該響應到調用者的字符串由+之后直到字符串內容末尾的字符組成(其實就是上面提到的第(2)部分的內容),不包括最后的CRLF字節。
RESP錯誤消息-Error
錯誤消息類型是RESP特定的數據類型。實際上,錯誤消息類型和簡單字符串類型基本一致,只是其第一個字節為-。錯誤消息類型跟簡單字符串類型的最大區別是:錯誤消息作為Redis服務端響應的時候,對于客戶端而言應該感知為異常,而錯誤消息中的字符串內容應該感知為Redis服務端返回的錯誤信息。錯誤消息的編碼方式如下:
- (1)第一個字節為-。
- (2)緊接著的是一個不能包含CR或者LF字符的字符串。
- (3)以CRLF終止。
一個簡單的例子如下:
-Error messagern 復制代碼
Redis服務端只有在真正發生錯誤或者感知錯誤的時候才會回復錯誤消息,例如嘗試對錯誤的數據類型執行操作或者命令不存在等等。Redis客戶端接收到錯誤消息的時候,應該觸發異常(一般情況就是直接拋出異常,可以根據錯誤消息的內容進行異常分類)。下面是錯誤消息響應的一些例子:
-ERR unknown command 'foobar' -WRONGTYPE Operation against a key holding the wrong kind of value 復制代碼
-之后的第一個單詞到第一個空格或換行符之間的內容,代表返回的錯誤類型。這只是Redis使用的約定,不是RESP錯誤消息格式的一部分。
例如,ERR是通用錯誤,WRONGTYPE則是更具體的錯誤,表示客戶端試圖針對錯誤的數據類型執行操作。這種定義方式稱為錯誤前綴,是一種使客戶端能夠理解服務器返回的錯誤類型的方法,而不必依賴于所給出的確切消息定義,該消息可能會隨時間而變化。
客戶端實現可以針對不同的錯誤類型返回不同種類的異常,或者可以通過將錯誤類型的名稱作為字符串直接提供給調用方來提供捕獲錯誤的通用方法。
但是,不應該將錯誤消息分類處理的功能視為至關重要的功能,因為它作用并不巨大,并且有些的客戶端實現可能會簡單地返回特定值去屏蔽錯誤消息作為通用的異常處理,例如直接返回false。
RESP整型數字-Integer
整型數字的編碼方式如下:
- (1)第一個字節為:。
- (2)緊接著的是一個不能包含CR或者LF字符的字符串,也就是數字要先轉換為字符序列,最終要輸出為字節。
- (3)以CRLF終止。
例如:
:0rn :1000rn 復制代碼
許多Redis命令返回整型數字,像INCR,LLEN和LASTSAVE命令等等。
返回的整型數字沒有特殊的含義,像INCR返回的是增量的總量,而LASTSAVE是UNIX時間戳。但是Redis服務端保證返回的整型數字在帶符號的64位整數范圍內。
有些情況下,返回的整型數字會指代true或者false。如EXISTS或者SISMEMBER命令執行返回1代表true,0代表false。
有些情況下,返回的整型數字會指代命令是否真正產生了效果。如SADD,SREM和SETNX命令執行返回1代表命令執行生效,0代表命令執行不生效(等價于命令沒有執行)。
下面的一組命令執行后都是返回整型數字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD。
RESP定長字符串-Bulk String
定長字符串用于表示一個最大長度為512MB的二進制安全的字符串(Bulk,本身有體積大的含義)。定長字符串的編碼方式如下:
- (1)第一個字節為$。
- (2)緊接著的是組成字符串的字節數長度(稱為prefixed length,也就是前綴長度),前綴長度分塊以CRLF終止。
- (3)然后是一個不能包含CR或者LF字符的字符串,也就是數字要先轉換為字符序列,最終要輸出為字節。
- (4)以CRLF終止。
舉個例子,doge使用定長字符串編碼如下:
第一個字節 前綴長度 CRLF 字符串內容 CRLF 定長字符串 $ 4 rn doge rn ===> $4rndogern foobar使用定長字符串編碼如下:
第一個字節 前綴長度 CRLF 字符串內容 CRLF 定長字符串 $ 6 rn foobar rn ===> $6rnfoobarrn 表示空字符串(Empty String,對應Java中的"") 的時候,使用定長字符串編碼如下:
第一個字節 前綴長度 CRLF CRLF 定長字符串 $ 0 rn rn ===> $0rnrn 定長字符串也可以使用特殊的格式來表示Null值,指代值不存在。在這種特殊格式中,前綴長度為-1,并且沒有數據,因此使用定長字符串對Null值進行編碼如下:
第一個字節 前綴長度 CRLF 定長字符串 $ -1 rn ===> $-1rn 當Redis服務端返回定長字符串編碼的Null值的時候,客戶端不應該返回空字符串,而應該返回對應編程語言中的Null對象。例如Ruby中對應nil,C語言中對應NULL,Java中對應null,以此類推。
RESP數組-Array
Redis客戶端使用RESP數組發送命令到Redis服務端。與此相似,某些Redis命令執行完畢后服務端需要使用RESP數組類型將元素集合返回給客戶端,如返回一個元素列表的LRANGE命令。RESP數組和我們認知中的數組并不完全一致,它的編碼格式如下:
- (1)第一個字節為*。
- (2)緊接著的是組成RESP數組的元素個數(十進制數,但是最終需要轉換為字節序列,如10需要轉換為1和0兩個相鄰的字節),元素個數分塊以CRLF終止。
- (3)RESP數組的每個元素內容,每個元素可以是任意的RESP數據類型。
一個空的RESP數組的編碼如下:
*0rn 復制代碼
一個包含2個定長字符串元素內容分別為foo和bar的RESP數組的編碼如下:
*2rn$3rnfoorn$3rnbarrn 復制代碼
通用格式就是:*<count>CRLF作為RESP數組的前綴部分,而組成RESP數組的其他數據類型的元素只是一個接一個地串聯在一起。例如一個包含3個整數類型元素的RESP數組的編碼如下:
*3rn:1rn:2rn:3rn 復制代碼
RESP數組的元素不一定是同一種數據類型,可以包含混合類型的元素。例如下面是一個包含4個整數類型元素和1個定長字符串類型元素(一共有5個元素)的RESP數組的編碼(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
# 元素個數 *5rn # 第1個整型類型的元素 :1rn # 第2個整型類型的元素 :2rn # 第3個整型類型的元素 :3rn # 第4個整型類型的元素 :4rn # 定長字符串類型的元素 $6rn foobarrn 復制代碼
Redis服務端響應報的首行*5rn定義了后面會緊跟著5個回復數據,然后每個回復數據分別作元素項,構成了用于傳輸的多元素定長回復(Multi Bulk Reply,感覺比較難翻譯,這里的大概意思就是每個回復行都是整個回復報中的一個項)。
這里可以類比為Java中的ArrayList(泛型擦除),有點類似于下面的偽代碼:
List encode = new ArrayList(); // 添加元素個數 encode.add(elementCount); encode.add(CRLF); // 添加第1個整型類型的元素 - 1 encode.add(':'); encode.add(1); encode.add(CRLF); // 添加第2個整型類型的元素 - 2 encode.add(':'); encode.add(2); encode.add(CRLF); // 添加第3個整型類型的元素 - 3 encode.add(':'); encode.add(3); encode.add(CRLF); // 添加第4個整型類型的元素 - 4 encode.add(':'); encode.add(4); encode.add(CRLF); // 添加定長字符串類型的元素 encode.add('$'); // 前綴長度 encode.add(6); // 字符串內容 encode.add("foobar"); encode.add(CRLF); 復制代碼
RESP數組中也存在Null值的概念,下面稱為RESP Null Array。處于歷史原因,RESP數組中采用了另一種特殊的編碼格式定義Null值,區別于定長字符串中的Null值字符串。例如,BLPOP命令執行超時的時候,就會返回一個RESP Null Array類型的響應。RESP Null Array的編碼如下:
*-1rn 復制代碼
當Redis服務端的回復是RESP Null Array類型的時候,客戶端應該返回一個Null對象,而不是一個空數組或者空列表。這一點比較重要,它是區分回復是空數組(也就是命令正確執行完畢,返回結果正常)或者其他原因(如BLPOP命令的超時等)的關鍵。
RESP數組的元素也可以是RESP數組,下面是一個包含2個RESP數組類型的元素的RESP數組,編碼如下(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
# 元素個數 *2rn # 第1個RESP數組元素 *3rn :1rn :2rn :3rn # 第2個RESP數組元素 *2rn +Foorn -Barrn 復制代碼
上面的RESP數組的包含2個RESP數組類型的元素,第1個RESP數組元素包含3個整型類型的元素,而第2個RESP數組元素包含1個簡單字符串類型的元素和1個錯誤消息類型的元素。
RESP數組中的Null元素
RESP數組中的單個元素也有Null值的概念,下面稱為Null元素。Redis服務端回復如果是RESP數組類型,并且RESP數組中存在Null元素,那么意味著元素丟失,絕對不能用空字符串替代。缺少指定鍵的前提下,當與GET模式選項一起使用時,SORT命令可能會發生這種情況。
下面是一個包含Null元素的RESP數組的例子(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
*3rn $3rn foorn $-1rn $3rn barrn 復制代碼
RESP數組中的第2個元素是Null元素,客戶端API最終返回的內容應該是:
# Ruby ["foo",nil,"bar"] # Java ["foo",null,"bar"] 復制代碼
RESP其他相關內容
主要包括:
- 將命令發送到Redis服務端的示例。
- 批量命令與管道。
- 內聯命令(Inline Commands)。
其實文檔中還有一節使用C語言編寫高性能RESP解析器,這里不做翻譯,因為掌握RESP的相關內容后,可以基于任何語言編寫解析器。
將命令發送到Redis服務端
如果已經相對熟悉RESP中的序列化格式,那么編寫Redis客戶端類庫就會變得很容易。我們可以進一步指定客戶端和服務器之間的交互方式:
- Redis客戶端向Redis服務端發送僅僅包含定長字符串類型元素的RESP數組。
- Redis服務端可以采用任意一種RESP數據類型向Redis客戶端進行回復,具體的數據類型一般取決于命令類型。
下面是典型的交互例子:Redis客戶端發送命令LLEN mylist以獲得KEY為mylist的長度,Redis服務端將以整數類型進行回復,如以下示例所示(C是客戶端,S服務器),偽代碼如下:
C: *2rn C: $4rn C: LLENrn C: $6rn C: mylistrn S: :48293rn 復制代碼
為了簡單起見,我們使用換行符來分隔協議的不同部分(這里指上面的代碼分行展示),但是實際交互的時候Redis客戶端在發送*2rn$4rnLLENrn$6rnmylistrn的時候是整體發送的。
批量命令與管道
Redis客戶端可以使用相同的連接發送批量命令。Redis支持管道特性,因此Redis客戶端可以通過一次寫操作發送多個命令,而無需在發送下一個命令之前讀取Redis服務端對上一個命令的回復。批量發送命令之后,所有的回復可以在最后得到(合并為一個回復)。更多相關信息可以查看Using pipelining to speedup Redis queries。
內聯命令
有些場景下,我們可能只有telnet命令可以使用,在這種條件下,我們需要發送命令到Redis服務端。盡管Redis協議易于實現,但在交互式會話中并不理想,并且redis-cli有些情況下不一定可用。處于這類原因,Redis設計了一種專為人類設計的命令格式,稱為內聯命令(Inline Command格式。
以下是服務器/客戶端使用內聯命令進行聊天的示例(S代表服務端,C代表客戶端):
C: PING S: +PONG 復制代碼
以下是使用內聯命令返回整數的另一個示例:
C: EXISTS somekey S: :0 復制代碼
基本上只需在telnet會話中編寫以空格分隔的參數。由于除了統一的請求協議之外沒有命令會以*開頭,Redis能夠檢測到這種情況并解析輸入的命令。
基于RESP編寫高性能解析器
因為JDK原生提供的字節緩沖區java.nio.ByteBuffer存在不能自動擴容、需要切換讀寫模式等等問題,這里直接引入Netty并且使用Netty提供的ByteBuf進行RESP數據類型解析。編寫本文的時候(2019-10-09)Netty的最新版本為4.1.42.Final。引入依賴:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> <version>4.1.42.Final</version> </dependency> 復制代碼
定義解碼器接口:
public interface RespDecoder<V>{ V decode(ByteBuf buffer); } 復制代碼
定義常量:
public class RespConstants { public static final Charset ASCII = StandardCharsets.US_ASCII; public static final Charset UTF_8 = StandardCharsets.UTF_8; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; public static final byte PLUS_BYTE = '+'; public static final byte MINUS_BYTE = '-'; public static final byte COLON_BYTE = ':'; public static final String EMPTY_STRING = ""; public static final Long ZERO = 0L; public static final Long NEGATIVE_ONE = -1L; public static final byte CR = (byte) 'r'; public static final byte LF = (byte) 'n'; public static final byte[] CRLF = "rn".getBytes(ASCII); public enum ReplyType { SIMPLE_STRING, ERROR, INTEGER, BULK_STRING, RESP_ARRAY } } 復制代碼
下面的章節中解析模塊的實現已經忽略第一個字節的解析,因為第一個字節是決定具體的數據類型。
解析簡單字符串
簡單字符串類型就是單行字符串,它的解析結果對應的就是Java中的String類型。解碼器實現如下:
// 解析單行字符串 public class LineStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { return CodecUtils.X.readLine(buffer); } } public enum CodecUtils { X; public int findLineEndIndex(ByteBuf buffer) { int index = buffer.forEachByte(ByteProcessor.FIND_LF); return (index > 0 && buffer.getByte(index - 1) == 'r') ? index : -1; } public String readLine(ByteBuf buffer) { int lineEndIndex = findLineEndIndex(buffer); if (lineEndIndex > -1) { int lineStartIndex = buffer.readerIndex(); // 計算字節長度 int size = lineEndIndex - lineStartIndex - 1; byte[] bytes = new byte[size]; buffer.readBytes(bytes); // 重置讀游標為rn之后的第一個字節 buffer.readerIndex(lineEndIndex + 1); buffer.markReaderIndex(); return new String(bytes, RespConstants.UTF_8); } return null; } } public class RespSimpleStringDecoder extends LineStringDecoder { } 復制代碼
這里抽取出一個類LineStringDecoder用于解析單行字符串,這樣在解析錯誤消息的時候可以做一次繼承即可。測試一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // +OKrn buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:OK 復制代碼
解析錯誤消息
錯誤消息的本質也是單行字符串,所以其解碼的實現可以和簡單字符串的解碼實現一致。錯誤消息數據類型的解碼器如下:
public class RespErrorDecoder extends LineStringDecoder { } 復制代碼
測試一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // -ERR unknown command 'foobar'rn buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:ERR unknown command 'foobar' 復制代碼
解析整型數字
整型數字類型,本質就是需要從字節序列中還原出帶符號的64bit的長整型,因為是帶符號的,類型標識位:后的第一個字節需要判斷是否負數字符-,因為是從左向右解析,然后每解析出一個新的位,當前的數字值要乘10。其解碼器的實現如下:
public class RespIntegerDecoder implements RespDecoder<Long> { @Override public Long decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); // 沒有行尾,異常 if (-1 == lineEndIndex) { return null; } long result = 0L; int lineStartIndex = buffer.readerIndex(); boolean negative = false; byte firstByte = buffer.getByte(lineStartIndex); // 負數 if (RespConstants.MINUS_BYTE == firstByte) { negative = true; } else { int digit = firstByte - '0'; result = result * 10 + digit; } for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) { byte value = buffer.getByte(i); int digit = value - '0'; result = result * 10 + digit; } if (negative) { result = -result; } // 重置讀游標為rn之后的第一個字節 buffer.readerIndex(lineEndIndex + 1); return result; } } 復制代碼
整型數字類型的解析相對復雜,一定要注意負數判斷。測試一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // :-1000rn buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); Long value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:-1000 復制代碼
解析定長字符串
定長字符串類型解析的關鍵是先讀取類型標識符$后的第一個字節序列分塊解析成64bit帶符號的整數,用來確定后面需要解析的字符串內容的字節長度,然后再按照該長度讀取后面的字節。其解碼器實現如下:
public class RespBulkStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 使用RespIntegerDecoder讀取長度 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Bulk Null String if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Bulk Empty String if (RespConstants.ZERO.equals(length)) { return RespConstants.EMPTY_STRING; } // 真實字節內容的長度 int readLength = (int) length.longValue(); if (buffer.readableBytes() > readLength) { byte[] bytes = new byte[readLength]; buffer.readBytes(bytes); // 重置讀游標為rn之后的第一個字節 buffer.readerIndex(buffer.readerIndex() + 2); return new String(bytes, RespConstants.UTF_8); } return null; } } 復制代碼
測試一下:
public static void main(String[] args) throws Exception{ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // $6rnthrowablern buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes("$9".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:throwable 復制代碼
解析RESP數組
RESP數組類型解析的關鍵:
- 先讀取類型標識符*后的第一個字節序列分塊解析成64bit帶符號的整數,確定數組中的元素個數。
- 遞歸解析每個元素。
參考過不少Redis協議解析框架,不少是用棧或者狀態機實現,這里先簡單點用遞歸實現,解碼器代碼如下:
public class RespArrayDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 解析元素個數 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Null Array if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Array Empty List if (RespConstants.ZERO.equals(length)) { return Lists.newArrayList(); } List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue()); // 遞歸 for (int i = 0; i < length; i++) { result.add(DefaultRespCodec.X.decode(buffer)); } return result; } } 復制代碼
測試一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //*2rn$3rnfoorn$3rnbarrn buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes("*2".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("$3".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("foo".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("$3".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("bar".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); List value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:[foo, bar] 復制代碼
小結
對RESP的內容和其編碼解碼的過程有相對深刻的認識后,就可以基于Netty編寫Redis服務的編碼解碼模塊,作為Netty入門的十分有意義的例子。本文的最后一節只演示了RESP的解碼部分,編碼模塊和更多細節會在另一篇用Netty實現Redis客戶端的文章中展示。
參考資料:
- Redis Protocol specification
鏈接
希望你能讀到這里,然后發現我:
- Github Page:www.throwable.club/2019/10/09/…
- Coding Page:throwable.coding.me/2019/10/09/…
附錄
本文涉及的所有代碼:
public class RespConstants { public static final Charset ASCII = StandardCharsets.US_ASCII; public static final Charset UTF_8 = StandardCharsets.UTF_8; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; public static final byte PLUS_BYTE = '+'; public static final byte MINUS_BYTE = '-'; public static final byte COLON_BYTE = ':'; public static final String EMPTY_STRING = ""; public static final Long ZERO = 0L; public static final Long NEGATIVE_ONE = -1L; public static final byte CR = (byte) 'r'; public static final byte LF = (byte) 'n'; public static final byte[] CRLF = "rn".getBytes(ASCII); public enum ReplyType { SIMPLE_STRING, ERROR, INTEGER, BULK_STRING, RESP_ARRAY } } public enum CodecUtils { X; public int findLineEndIndex(ByteBuf buffer) { int index = buffer.forEachByte(ByteProcessor.FIND_LF); return (index > 0 && buffer.getByte(index - 1) == 'r') ? index : -1; } public String readLine(ByteBuf buffer) { int lineEndIndex = findLineEndIndex(buffer); if (lineEndIndex > -1) { int lineStartIndex = buffer.readerIndex(); // 計算字節長度 int size = lineEndIndex - lineStartIndex - 1; byte[] bytes = new byte[size]; buffer.readBytes(bytes); // 重置讀游標為rn之后的第一個字節 buffer.readerIndex(lineEndIndex + 1); buffer.markReaderIndex(); return new String(bytes, RespConstants.UTF_8); } return null; } } public interface RespCodec { RespCodec X = DefaultRespCodec.X; <IN, OUT> OUT decode(ByteBuf buffer); <IN, OUT> ByteBuf encode(IN in); } public enum DefaultRespCodec implements RespCodec { X; static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap(); private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder(); static { DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder()); DECODERS.put(ReplyType.ERROR, new RespErrorDecoder()); DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder()); DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder()); DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder()); } @SuppressWarnings("unchecked") @Override public <IN, OUT> OUT decode(ByteBuf buffer) { return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer); } private ReplyType determineReplyType(ByteBuf buffer) { byte firstByte = buffer.readByte(); ReplyType replyType; switch (firstByte) { case RespConstants.PLUS_BYTE: replyType = ReplyType.SIMPLE_STRING; break; case RespConstants.MINUS_BYTE: replyType = ReplyType.ERROR; break; case RespConstants.COLON_BYTE: replyType = ReplyType.INTEGER; break; case RespConstants.DOLLAR_BYTE: replyType = ReplyType.BULK_STRING; break; case RespConstants.ASTERISK_BYTE: replyType = ReplyType.RESP_ARRAY; break; default: { throw new IllegalArgumentException("first byte:" + firstByte); } } return replyType; } @Override public <IN, OUT> ByteBuf encode(IN in) { // TODO throw new UnsupportedOperationException("encode"); } } public interface RespDecoder<V> { V decode(ByteBuf buffer); } public class DefaultRespDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { throw new IllegalStateException("decoder"); } } public class LineStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { return CodecUtils.X.readLine(buffer); } } public class RespSimpleStringDecoder extends LineStringDecoder { } public class RespErrorDecoder extends LineStringDecoder { } public class RespIntegerDecoder implements RespDecoder<Long> { @Override public Long decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); // 沒有行尾,異常 if (-1 == lineEndIndex) { return null; } long result = 0L; int lineStartIndex = buffer.readerIndex(); boolean negative = false; byte firstByte = buffer.getByte(lineStartIndex); // 負數 if (RespConstants.MINUS_BYTE == firstByte) { negative = true; } else { int digit = firstByte - '0'; result = result * 10 + digit; } for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) { byte value = buffer.getByte(i); int digit = value - '0'; result = result * 10 + digit; } if (negative) { result = -result; } // 重置讀游標為rn之后的第一個字節 buffer.readerIndex(lineEndIndex + 1); return result; } } public class RespBulkStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Bulk Null String if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Bulk Empty String if (RespConstants.ZERO.equals(length)) { return RespConstants.EMPTY_STRING; } // 真實字節內容的長度 int readLength = (int) length.longValue(); if (buffer.readableBytes() > readLength) { byte[] bytes = new byte[readLength]; buffer.readBytes(bytes); // 重置讀游標為rn之后的第一個字節 buffer.readerIndex(buffer.readerIndex() + 2); return new String(bytes, RespConstants.UTF_8); } return null; } } public class RespArrayDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 解析元素個數 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Null Array if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Array Empty List if (RespConstants.ZERO.equals(length)) { return Lists.newArrayList(); } List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue()); // 遞歸 for (int i = 0; i < length; i++) { result.add(DefaultRespCodec.X.decode(buffer)); } return result; } }
作者:Throwable
鏈接:https://juejin.im/post/5d9dec2b6fb9a04e0a37edd5