日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

隊列與Stream


 

redis stream結構如上圖所示

消息鏈表,每個消息都有一個唯一的 ID 和對應的內容。消息是持久化的,redis 重啟后,內容還在。

Stream唯一名稱,它就是 Redis 的 key,在我們首次使用xadd指令追加消息時自動創建。

消費組,一個stream支持多個 last_delivered_id,表示當前消費組已經消費到哪條消息了。

每個消費者組都有一個 Stream 內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從 Stream 的某個消息 ID 開始消費,這個 ID 用來初始化last_delivered_id變量。

同一個消費組 (Consumer Group) 可以掛接多個消費者 (Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者有一個組內唯一名稱。

pending_ids,它記錄了當前消費者已經被客戶端讀取,但是還沒有 ack的消息。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多,一旦某個消息被 ack,它就開始減少。這個 pending_ids 變量在 Redis 官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。

消息ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,并且是該毫秒內產生的第 5 條消息。

消息內容就是鍵值對,形如 hash 結構的鍵值對,這沒什么特別之處。

常用命令

版本:redis-6.2.8

生產端 xadd 追加消息 xdel 刪除消息,這里的刪除僅僅是設置了標志位,不會實際刪除消息。 XDEL streamtest 1672574363910-0 xrange 獲取消息列表,會自動過濾已經刪除的消息 xlen 消息長度 del 刪除 Stream


 

streamtest 表示當前這個隊列的名字,也就是我們一般意義上Redis中的key,

* 號表示服務器自動生成 ID,后面順序跟著,是我們存入當前streamtest 這個隊列的消息,采用的也是 key/value的存儲形式

返回值1672574363910-0 則是生成的消息 ID,由兩部分組成:時間戳-序號。時間戳時毫秒級單位,是生成消息的Redis服務器時間,它是個64位整型。序號是在這個毫秒時間點內的消息序號。它也是個64位整型。建議使用Redis的方案生成消息ID,因為這種時間戳+序號的單調遞增的ID方案,幾乎可以滿足全部的需求,但ID是支持自定義的。

為了保證消息是有序的,因此Redis生成的ID是單調遞增有序的。由于ID中包含時間戳部分,為了避免服務器時間錯誤而帶來的問題(例如服務器時間延后了),Redis的每個Stream類型數據都維護一個latest_generated_id屬性,用于記錄最后一個消息的ID。若發現當前時間戳退后(小于latest_generated_id所記錄的),則采用時間戳不變而序號遞增的方案來作為新消息ID(這也是序號為什么使用int64的原因,保證有足夠多的的序號),從而保證ID的單調遞增性質。

消費端 單消費者

Redis 設計了一個單獨的消費指令xread,可以將 Stream 當成普通的消息隊列 (list) 來使用。使用 xread 時,我們可以完全忽略消費組 (Consumer Group) 的存在,就好比 Stream 就是一個普通的列表 (list)。

XREAD count 1 streams streamtest 0-0 count 1 //讀取1條消息 streams 關鍵字 0-0 從頭開始 xread count 2 streams streamtest 1672574316404-0 //消費1672574316404-0(不包括)后面的兩條消息 XREAD count 1 streams streamtest $ //默認返回nil,從尾部讀取最新的一條消息 XREAD block 0 count 1 streams streamtest $ //block 阻塞讀取消息,直到有消息寫入


 

一般來說客戶端如果想要使用 xread 進行順序消費,一定要記住當前消費到哪里了,也就是返回的消息 ID。下次繼續調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續消費后續的消息。

消費者組

創建消費者組

XGROUP create streamtest cgroup1 0-0 //從頭開始消費 XGROUP create streamtest cgroup2 $ //從尾部開始消費,只接收新消息,其他消息忽略

XINFO stream streamtest 查看消息隊列信息


 

XINFO groups streamtest 查看消息者組情況


 

消費消息

有了消費組,自然還需要消費者,Stream 提供了 xreadgroup 指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息 ID。 它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的PEL(正在處理的消息) 結構里,客戶端處理完畢后使用 xack 指令通知服務器,本條消息已經處理完畢,該消息 ID 就會從 PEL 中移除。

XREADGROUP group cgroup1 c1 count 1 streams streamtest > //cgroup1 指定消費者組 //c1 指定消費者 //count 1 消費數量 // > 從當前消費者組的last_delivered_id(不包括)開始讀 //阻塞讀取,直到有消息寫入,并返回阻塞時間 XREADGROUP group cgroup1 c1 block 0 count 1 streams streamtest > XINFO groups streamtest //消費者組狀態 XINFO consumers streamtest cgroup1 //消費者組cgroup1內的消費者狀態 XACK streamtest cgroup1 1672624113938-0 //確認消息 XPENDING streamtest cgroup1 //返回cgroup1內消費者未處理完的消息

消費者組狀態


 

更多的Redis的Stream命令請大家參考Redis官方文檔:

Redis隊列幾種實現的總結

基于List的 LPUSH+BRPOP 的實現

足夠簡單,消費消息延遲幾乎為零,但是需要處理空閑連接的問題。

如果線程一直阻塞在那里,Redis客戶端的連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用,這個時候blpop和brpop或拋出異常,所以在編寫客戶端消費者的時候要小心,如果捕獲到異常需要重試。

其他缺點包括:

做消費者確認ACK麻煩,不能保證消費者消費消息后是否成功處理的問題(宕機或處理異常等),通常需要維護一個Pending列表,保證消息處理確認;不能做廣播模式,如pub/sub,消息發布/訂閱模型;不能重復消費,一旦消費就會被刪除;不支持分組消費。

基于Sorted-Set的實現

多用來實現延遲隊列,當然也可以實現有序的普通的消息隊列,但是消費者無法阻塞的獲取消息,只能輪詢,不允許重復消息。

PUB/SUB,訂閱/發布模式

優點:

典型的廣播模式,一個消息可以發布到多個消費者;多信道訂閱,消費者可以同時訂閱多個信道,從而接收多類消息;消息即時發送,消息不用等待消費者讀取,消費者會自動接收到信道發布的消息。

缺點:

消息一旦發布,不能接收。換句話就是發布時若客戶端不在線,則消息丟失,不能尋回;不能保證每個消費者接收的時間是一致的;若消費者客戶端出現消息積壓,到一定程度,會被強制斷開,導致消息意外丟失。通常發生在消息的生產遠大于消費速度時;可見,Pub/Sub 模式不適合做消息存儲,消息積壓類的業務,而是擅長處理廣播,即時通訊,即時反饋的業務。

基于Stream類型的實現

基本上已經有了一個消息中間件的雛形,可以考慮在生產過程中使用,當然真正要在生產中應用,要做的事情還很多,比如消息隊列的管理和監控就需要花大力氣去實現,而專業消息隊列都已經自帶或者存在著很好的第三方方案和插件。不保證消息不丟失。

消息隊列問題

從我們上面對Stream的使用表明,Stream已經具備了一個消息隊列的基本要素,生產者API、消費者API,消息Broker,消息的確認機制等等,所以在使用消息中間件中產生的問題,這里一樣也會遇到。

Stream 消息太多怎么辦?

要是消息積累太多,Stream 的鏈表豈不是很長,內容會不會爆掉?xdel 指令又不會刪除消息,它只是給消息做了個標志位。

Redis 自然考慮到了這一點,所以它提供了一個定長 Stream 功能。在 xadd 的指令提供一個定長長度 maxlen,就可以將老的消息干掉,確保最多不超過指定長度。

消息如果忘記 ACK 會怎樣?

Stream 在每個消費者結構中保存了正在處理中的消息 ID 列表 PEL,如果消費者收到了消息處理完了但是沒有回復 ack,就會導致 PEL 列表不斷增長,如果有很多消費組的話,那么這個 PEL 占用的內存就會放大。所以消息要盡可能的快速消費并確認。

PEL 如何避免消息丟失?

在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是 PEL 里已經保存了發出去的消息 ID。待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過此時 xreadgroup 的起始消息 ID 不能為參數>,而必須是任意有效的消息 ID,一般將參數設為 0-0,表示讀取所有的 PEL 消息以及自last_delivered_id之后的新消息。

死信問題

如果某個消息,不能被消費者處理(處理失敗),也就是不能被XACK,這是要長時間處于Pending列表中,即使被反復的轉移給各個消費者也是如此。此時該消息的delivery counter(通過XPENDING可以查詢到)就會累加,當累加到某個我們預設的臨界值時,我們就認為是壞消息(也叫死信,DeadLetter,無法投遞的消息),刪除即可。

刪除一個消息,使用XDEL語法,注意,這個命令并沒有刪除Pending中的消息,因此查看Pending,消息還會在,可以在執行執行XDEL之后,XACK這個消息標識其處理完畢。

Stream 的高可用

Stream 的高可用是建立主從復制基礎上的,它和其它數據結構的復制機制沒有區別,也就是說在 Sentinel 和 Cluster 集群環境下 Stream 是可以支持高可用的。不過鑒于 Redis 的指令復制是異步的,在 failover 發生時,Redis 可能會丟失極小部分數據,這點 Redis 的其它數據結構也是一樣的。

分區 Partition

Redis 的服務器沒有原生支持分區能力,如果想要使用分區,那就需要分配多個 Stream,然后在客戶端使用一定的策略來生產消息到不同的 Stream。

Stream小結

Stream 的消費模型借鑒了 Kafka 的消費分組的概念,它彌補了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的話,得在客戶端做,提供不同的 Stream 名稱,對消息進行 hash 取模來選擇往哪個 Stream 里塞。

總的來說,如果是中小項目和企業,在工作中已經使用了Redis,在業務量不是很大,而又需要消息中間件功能的情況下,可以考慮使用Redis的Stream功能。但是如果并發量很高,資源足夠支持下,還是以專業的消息中間件,比如RocketMQ、Kafka等來支持業務更好。

HyperLogLog

HyperLogLog并不是一種新的數據結構(實際類型為字符串類型),而是一種基數算法,通過HyperLogLog可以利用極小的內存空間完成獨立總數的統計,數據集可以是IP、Email、ID等。

如果你負責開發維護一個大型的網站,有一天產品經理要網站每個網頁每天的 UV 數據,然后讓你來開發這個統計模塊,你會如何實現?

如果統計 PV 那非常好辦,給每個網頁一個獨立的 Redis 計數器就可以了,這個計數器的 key 后綴加上當天的日期。這樣來一個請求,incrby 一次,最終就可以統計出所有的 PV 數據。

但是 UV 不一樣,它要去重,一個簡單的方案,那就是為每一個頁面一個獨立的 set 集合來存儲所有當天訪問過此頁面的用戶 ID。當一個請求過來時,我們使用 sadd 將用戶 ID 塞進去就可以了。通過 scard 可以取出這個集合的大小,這個數字就是這個頁面的 UV 數據。

但是,如果你的頁面訪問量非常大,比如一個爆款頁面幾千萬的 UV,你需要一個很大的 set 集合來統計,這就非常浪費空間。如果這樣的頁面很多,那所需要的存儲空間是驚人的。為這樣一個去重功能就耗費這樣多的存儲空間,值得么?其實需要的數據又不需要太精確,1050w 和 1060w 這兩個數字對于老板們來說并沒有多大區別,So,有沒有更好的解決方案呢?

Redis 提供了 HyperLogLog 數據結構就是用來解決這種統計問題的。HyperLogLog 提供不精確的去重計數方案,雖然不精確但是也不是非常不精確,Redis官方給出標準誤差是 0.81%,這樣的精確度已經可以滿足上面的 UV 統計需求了。

HyperLogLog提供了3個命令: pfadd、pfcount、pfmerge。

例如08-15的訪問用戶是u1、u2、u3、u4,08-16的訪問用戶是u-4、u-5、u-6、u-7 pfadd 2013-01-02:user:id u1 u2 u3 u4 u5 u6 //添加元素 PFCOUNT 2013-01-02:user:id //返回元素個數 6 PFADD 2013-01-03:user:id u1 u2 u3 u90 u91 PFMERGE 2013-01-02:user:id 2013-01-03:user:id //合并元素到2013-01-02:user:id PFCOUNT 2013-01-02:user:id //求并集,返回8

以使用集合類型和 HperLogLog統計百萬級用戶訪問次數的占用空間對比:

數據類型

1天

1月

1年

集合類型

80M

2.4G

28G

HyperLogLog

15k

450k

5M

可以看到,HyperLogLog內存占用量小得驚人,但是用如此小空間來估算如此巨大的數據,必然不是100%的正確,其中一定存在誤差率。前面說過,Redis官方給出的數字是0.81%的失誤率。

Redis事務

簡單地說,事務表示一組動作,要么全部執行,要么全部不執行。例如在社交網站上用戶A關注了用戶B,那么需要在用戶A的關注表中加入用戶B,并且在用戶B的粉絲表中添加用戶A,這兩個行為要么全部執行,要么全部不執行,否則會出現數據不一致的情況。

multi //開啟事務 exec //事務結束,開始執行 discard //停止執行,代替exec,它們之間的命令是原子順序執行的

可以看到sadd命令此時的返回結果是QUEUED,代表命令并沒有真正執行,而是暫時保存在Redis中的一個緩存隊列(所以discard也只是丟棄這個緩存隊列中的未執行命令,并不會回滾已經操作過的數據,這一點要和關系型數據庫的Rollback操作區分開)。如果此時另一個客戶端執行sismember u:a:follow ub返回結果應該為0。


 

事務中出現錯誤

  1. 命令錯誤,屬于語法錯誤,會造成整個事務無法執行
  2. 運行時錯誤,例如用戶B在添加粉絲列表時,誤把sadd命令(針對集合)寫成了zadd命令(針對有序集合),這種就是運行時命令,因為語法是正確的,那第一條執行成功,第二條執行失敗,

 

可以看到Redis并不支持回滾功能,開發人員需要自己修復這類問題。

watch

有些應用場景需要在事務之前,確保事務中的key沒有被其他客戶端修改過,才執行事務,否則不執行(類似樂觀鎖)。Redis 提供了watch命令來解決這類問題。


 

可以看到“客戶端-1”在執行multi之前執行了watch命令,“客戶端-2”在“客戶端-1”執行exec之前修改了key值,造成客戶端-1事務沒有執行(exec結果為nil)。

Pipeline和事務的區別

1、pipeline是客戶端的行為,對于服務器來說是透明的,可以認為服務器無法區分客戶端發送來的查詢命令是以普通命令的形式還是以pipeline的形式發送到服務器的;

2、而事務則是實現在服務器端的行為,用戶執行MULTI命令時,服務器會將對應這個用戶的客戶端對象設置為一個特殊的狀態,在這個狀態下后續用戶執行的查詢命令不會被真的執行,而是被服務器緩存起來,直到用戶執行EXEC命令為止,服務器會將這個用戶對應的客戶端對象中緩存的命令按照提交的順序依次執行。

3、應用pipeline可以提服務器的吞吐能力,并提高Redis處理查詢請求的能力。

存在問題,當通過pipeline提交的查詢命令數據較少時(可以被內核緩沖區所容納),Redis可以保證這些命令執行的原子性。然而一旦數據量過大,超過了內核緩沖區的接收大小,那么命令的執行將會被打斷,原子性也就無法得到保證。

因此pipeline只是一種提升服務器吞吐能力的機制,如果想要命令以事務的方式原子性的被執行,還是需要事務機制,或者使用更高級的腳本功能以及模塊功能。

4、可以將事務和pipeline結合起來使用,減少事務的命令在網絡上的傳輸時間,將多次網絡IO縮減為一次網絡IO。

Redis提供了簡單的事務,之所以說它簡單,主要是因為它不支持事務中的回滾特性,同時無法實現命令之間的邏輯關系計算,當然也體現了Redis 的“keep it simple”的特性。

 

作者:咖啡沖不沖 鏈接:https://juejin.cn/post/7184007074945171513 來源:稀土掘金

分享到:
標簽:Redis
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定