你好,我是碼哥,一個擁抱硬核技術和對象,面向人民幣編程的男人,設置星標不迷路。
我在【redis 使用 List 實現消息隊列的利與弊】說過使用 List 實現消息隊列有很多局限性。
- 沒有 ACK 機制。
- 沒有類似 Kafka 的 ConsumerGroup 消費組概念。
- 消息堆積。
- List 是線性結構,查詢指定數據需要遍歷整個列表。
1、是什么
Stream 是 Redis 5.0 版本專門為消息隊列設計的數據類型,借鑒了 Kafka 的 Consume Group 設計思路,提供了消費組概念。
同時提供了消息的持久化和主從復制機制,客戶端可以訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,從而保證消息不丟失。
以下幾個是 Stream 類型的主要特性。
- 使用 Radix Tree 和 listpack 結構來存儲消息。
- 消息 ID 序列化生成。
- 借鑒 Kafka Consume Group 的概念,多個消費者劃分到不同的 Consume Group 中,消費同一個 Streams,同一個 Consume Group 的多個消費者可以一起并行但不重復消費,提升消費能力。
- 支持多播(多對多),阻塞和非阻塞讀取。
- ACK 確認機制,保證了消息至少被消費一次。
- 可設置消息保存上限閾值,我會把歷史消息丟棄,防止內存占用過大。
需要注意的是,Redis Stream 是一種超輕量級的 MQ,并沒有完全實現消息隊列的所有設計要點,所以它的使用場景需要考慮業務的數據量和對性能、可靠性的需求。
適合系統消息量不大,容忍數據丟失,使用 Redis Stream 作為消息隊列就能享受高性能快速讀寫消息的優勢。
2、修煉心法
每個 Stream 都有一個唯一的名稱,作為 Stream 在 Redis 的 key,在首次使用 xadd
指令添加消息的時候會自動創建。
可以看到 Stream 在一個 Redix Tree 樹上,樹上存儲的是消息 ID,每個消息 ID 對應的消息通過一個指針指向 listpack。
Stream 流就像是一個僅追加內容的消息鏈表,把消息一個個串起來,每個消息都有一個唯一的 ID 和消息內容,消息內容則由多個 field/value 鍵值對組成。底層使用 Radix Tree 和 listpack 數據結構存儲數據。
為了便于理解,我畫了一張圖,并對 Radix Tree 的存儲數據做了下變形,使用列表來體現 Stream 中消息的邏輯有序性。
這張圖涉及很多概念,但是你不要慌。我一步步拆開說,最后你再回頭看就懂了。
先帶你屢下全局思路。
- Consumer Group:消費組,每個消費組可以有一個或者多個消費者,消費者之間是競爭關系。不同消費組的消費者之間無任何關系。
- *pel,全稱是 Pending Entries List,記錄了當前被客戶端讀取但是還沒有 ack(Acknowledge character 確認字符)的消息。如果客戶端沒有 ack,這個變量的消息 ID 會越來越多。這是一個核心數據結構,用來確保客戶端至少消費消息一次。
Stream 結構
Streams 結構的源碼定義在 stream.h 源碼中的 stream 結構體中。
typedef struct stream {
rax *rax;
uint64_t length;
streamID last_id;
streamID first_id;
streamID max_deleted_entry_id;
uint64_t entries_added;
rax *cgroups;
} stream;
typedef struct streamID {
uint64_t ms;
uint64_t seq;
} streamID;
- *rax,是一個 rax 的指針,指向一個 Radix Tree,key 存儲消息 ID,value 實際上指向一個 listpack 數據結構,存儲了多條消息,每條消息的 ID 都大于等于 這個 key 的消息 ID。
- length,該 Stream 的消息條數。
- streamID結構體,消息 ID 抽象,一共占 128 位,內部維護了毫秒時間戳(字段 ms);一個毫秒內的自增序號(字段 seq),用于區分同一毫秒內插入多條消息。
- last_id,當前 Stream 最后一條消息的 ID。
- first_id,當前 Stream 第一條消息的 ID。
- max_deleted_entry_id,當前 Stream 被刪除的最大的消息 ID。
- entries_added,總共有多少條消息添加到 Stream 中,entries_added = 已刪除消息條數 + 未刪除消息條數。
- *cgroups,rax 指針,也指向一個 Radix Tree ,記錄當前 Stream 的所有 Consume Group,每個 Consume Group 的名稱都是唯一標識,作為 Radix Tree 的 key,Consumer Group 實例作為 value。
Consumer Group
Consumer Group 由 streamCG 結構體定義,每個 Stream 可以有多個 Consumer Group,一個消費組可以有多個消費者同時對組內消息進行消費。
/* Consumer group. */
typedef struct streamCG {
streamID last_id;
long long entries_read;
rax *pel;
rax *consumers;
} streamCG;
- last_id,表示該消費組的消費者已經讀取但還未 ACK 的最后一條消息 ID。
- *pel,是 pending entries list 簡寫,指向一個 Radix Tree 的指針,保存著 Consumer group 中所有消費者讀取但還未 ACK 確認的消息,就是這玩意實現了 ACK 機制。該樹的 key 是消息 ID,value 關聯一個 streamNACK 實例。
- *consumers, Radix Tree 指針,表示消費組中的所有消費者,key 是消費者名稱,value 指向一個 streamConsumer 實例。
streamNACK
streamCG -> *pel 對應的 value 是一個 streamNACK 實例,用于抽象消費者已經讀取,但是未 ACK 的消息 ID 相關信息。
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
mstime_t delivery_time;
uint64_t delivery_count;
streamConsumer *consumer;
} streamNACK;
- delivery_time,該消息最后一次推送給 Consumer 的時間戳。
- delivery_count,消息被推送次數。
- *consumer,消息推送的 Consumer 客戶端。
streamConsumer
Consumer Group 中對 Consumer 的抽象。
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time;
sds name;
rax *pel;
} streamConsumer;
- seen_time,消費者最近一次被激活的時間戳。
- name,消費者名稱。
- *pel, Radix Tree 指針,對于同一個消息而言,``streamCG -> pel與streamConsumer -> pel的streamNACK` 實例是同一個。
最后來一張圖,便于你理解。
肖材積:“Redis 你好,Stream 如何結合 Radix Tree 和 listpack 結構來存儲消息?為什么不使用散列表來存儲,消息 ID 作為散列表的 key,散列表的 value 存儲消息鍵值對內容。’”
在回答之前,先插入幾條消息到 Stream,讓你對 Stream 消息的存儲格式有個大體認知。
該命令的語法如下。
XADD key id field value [field value ...]
Stream 中的每個消息可以包含不同數量的多個鍵值對,寫入消息成功后,我會把消息的 ID 返回給客戶端。
執行如下指令把用戶購買書籍的下單消息存放到 hotlist:books隊列,消息內容主要由 payerID、amount 和 orderID。
> XADD hotlist:books * payerID 1 amount 69.00 orderID 9
1679218539571-0
> XADD hotlist:books * payerID 1 amount 36.00 orderID 15
1679218572182-0
> XADD hotlist:books * payerID 2 amount 99.00 orderID 88
1679218588426-0
> XADD hotlist:books * payerID 3 amount 68.00 orderID 80
1679218604492-0
hotlist:books 是 Stream 的名稱,后面的 “*” 表示讓 Redis 為插入的消息自動生成一個唯一 ID,你也可以自定義。
消息 ID 由兩部分組成。
- 當前毫秒內的時間戳。
- 順序編號。從 0 為起始值,用于區分同一時間內產生的多個命令。
肖材積:“如何理解 Stream 是一種只執行追加操作(Append only)的數據結構?”
通過將元素 ID 與時間進行關聯,并強制要求新元素的 ID 必須大于舊元素的 ID, Redis 從邏輯上將 Stream 變成了一種只執行追加操作(append only)的數據結構。
用戶可以確信,新的消息和事件只會出現在已有消息和事件之后,就像現實世界里新事件總是發生在已有事件之后一樣,一切都是有序進行的。
?肖材積:“插入的消息 ID 大部分相同,比如這四條消息的 ID 都是 1679218 前綴。另外,每條消息鍵值對的鍵通常都是一樣的,比如這四條消息的鍵都是 payerID、amount 和 orderID。使用散列表存儲的話會很多冗余數據,你這么摳門,所以不使用散列表對不對?”
沒毛病,小老弟很聰明。為了節省內存,我使用了 Radix Tree 和 listpack。Radix Tree 的 key 存儲消息 ID,value 使用 listpack 數據結構存儲多個消息, listapck 中的消息 ID 都大于等于 key 存儲的消息 ID。
我在前面已經講過 listpack,這是一個緊湊型列表,非常節省內存。而 Radix Tree 數據結構的最大特點是適合保存具有相同前綴的數據,從而達到節省內存。
到底 Radix Tree 是怎樣的數據結構,繼續往下看。
Radix Tree
Radix Tree,也被稱為 Radix Trie,或者 Compact Prefix Tree),用于高效地存儲和查找字符串集合。它將字符串按照前綴拆分成一個個字符,并將每個字符作為一個節點存儲在樹中。
當插入一個鍵值對時,Redis 會將鍵按照字符拆分成一個個字符,并根據字符在 Radix tree 中的位置找到合適的節點,如果該節點不存在,則創建新節點并添加到 Radix tree 中。
當所有字符都添加完畢后,將值對象指針保存到最后一個節點中。當查詢一個鍵時,Redis 按照字符順序遍歷 Radix tree,如果發現某個字符不存在于樹中,則鍵不存在;否則,如果最后一個節點表示一個完整的鍵,則返回對應的值對象。
如下圖展示一個簡單的前綴樹,將根節點到葉子節點的路徑對應字符拼接起來,就得到了兩個 key(“他說碉堡了”、“他說碉炸了”)。
你應該發現了,這兩個 key 擁有公共前綴(他說碉),前綴樹實現了共享使用,這樣就可以避免相同字符串重復存儲。如果采用散列表的保存方式,那個 key 的相同前綴就會被多次存儲,導致內存浪費。
Radix Tree 改進
每個節點只保存一個字符,一是會浪費內存空間,二是在進行查詢時,還需要逐一匹配每個節點表示的字符,對查詢性能也會造成影響。
所以,Redis 并沒有直接使用標準前綴樹,而是做了一次變種——Compact Prefix Tree(壓縮前綴樹)。通俗來說,當多個 key 具有相同的前綴時,那就將相同前綴的字符串合并在一個共享節點中,從而減少存儲空間。
如下幾個 key(test、toaster、toasting、slow、slowly)在 Radix Tree 上的布局。
由于 Compact Prefix Tree 可以共享相同前綴的節點,所以在存儲一組具有相同前綴的鍵時,Redis 的 Radix tree 比其他數據結構(如哈希表)具有更低的空間消耗和更快的查詢速度。
Radix Tree 節點的數據結構由 rax.h文件中的 raxNode 定義。
typedef struct raxNode {
uint32_t iskey:1;
uint32_t isnull:1;
uint32_t iscompr:1;
uint32_t size:29;
unsigned char data[];
} raxNode;
- iskey:從 Radix Tree 根節點到當前節點組成的字符串是否是一個完整的 key。是的話 iskey 的值為 1。
- isnull:當前節點是否為空節點,如果當前節點是空節點的話,就不需要為該節點分配指向 value 的指針內存。
- iscompr,是否為壓縮節點。
- size,當前節點的大小,具體指會根據節點類型而改變。如果是壓縮節點,該值表示壓縮數據的長度;如果是非壓縮節點,該值表示節點的子節點個數。
- data[],實際存儲的數據,根據節點類型不同而有所不同。
- 壓縮節點,data 數據包括子節點對應的字符、指向子節點的指針,節點為最終 key 對應的 value 指針。
- 壓縮節點,data 數據包含子節點對應的合并字符串、指向子節點的指針,以及節點為最終 key 的 value 指針。
- value 指針指向一個 listpack 實例,里面保存了消息實際內容。
Radix Tree 最大的特點就是適合保存具有相同前綴的數據,實現節省內存的目標,以及支持范圍查找。而這個就是 Stream 采用 Radix Tree 作為底層數據結構的原因。