背景
最近項目有個需求需要動態(tài)更新規(guī)則,當時腦中想到的第一個方案是利用zk的監(jiān)聽機制,管理人員更新完規(guī)則將狀態(tài)寫入zk,集群中的機器監(jiān)聽zk的狀態(tài),當有狀態(tài)變更后,集群中的機器開始拉取最新的配置。但由于公司技術選型,沒有專門搭建zk集群,因此也不可能為這一個小需求去搭建zk集群。圖為使用zk監(jiān)聽狀態(tài)變化的流程。
最后只好退而求其次,想到了使用redis的隊列來做規(guī)則的更新
消息隊列
首先做簡單的引入。
- 隊列(來自百度百科):是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作,和棧一樣,隊列是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。
- 消息隊列(來自百度百科):是在消息的傳輸過程中保存消息的容器。
從隊列和消息隊列的定義看來,看不出什么相似之處。但我理解它們的作用是相似的,只是使用環(huán)境不同。隊列和消息隊列 本質(zhì)上都可以用于解決“生產(chǎn)者”和“消費者”問題,在二者這間建立橋梁,it中專業(yè)術語是對“生產(chǎn)者”和“消費者”進行解耦。可以動態(tài)的通過調(diào)整“生產(chǎn)者”和“消費者”線程數(shù)或服務器實例數(shù),在正常情況使消費和生產(chǎn)到達一個平衡;在高峰情況下(生產(chǎn)者大于消費者)可以保護消費者不被拖垮的同時,還可以對把積壓的數(shù)據(jù)保存下來,消費者可以延遲消費這些數(shù)據(jù)進行處理。
隊列 一般指的是單個服務實例內(nèi)部使用,比如,在JAVA中的一個jvm實例內(nèi)部可以使用Queue的子類(Deque:雙端隊列,是Queue的子接口),比如:單線程情況下使用LinkedList(無界)、PriorityQueue(優(yōu)先隊列);多線程情況下可以阻塞隊列ArrayBlockingQueue(有界)、LinkedBlockingQueue(無界)、DelayQueue(延遲隊列 無界)、PriorityBlockingQueue(優(yōu)先 無界)、SynchronousQueue(沒有容量的隊列)。可以看到java的api已經(jīng)很強大了,可以根據(jù)自己的業(yè)務需求選擇使用。使用方法:生產(chǎn)者從一端放入消息,消費者從另一端取出消息進行處理,消息放到隊列里(感覺是不是有點像“消息隊列”的定義)。
MQ主要是用來:
- 解耦應用、
- 異步化消息
- 流量削峰填谷
目前使用的較多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
另外上面提到的“有界”和“無界”,指的是隊列的容量大小。有界 指的是創(chuàng)建隊列時必須指定隊列的容量;無界 創(chuàng)建隊列時無需指定隊列的容量,容量大小取決于jvm實例分配的內(nèi)存空間大小。在海量業(yè)務場景里,我們期望隊列的容量是無限的,但單個jvm實例 即便是使用“無界”隊列 由于單個實例內(nèi)存是有限的,最終無法容納下海量的消息數(shù)據(jù)。聰明的程序員就想 能不能使用一個第三方的隊列來存儲這些數(shù)據(jù)呢?當然是可以的,這就產(chǎn)生了“消息隊列”。
消息隊列 一般是采用一個獨立的集群專門用于消息存儲,可以存儲在內(nèi)存里 也可以直接存儲在磁盤中。比如常見的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它們有不同的特性,以及采用了各種不同的實現(xiàn),適用于各種場景的消息任務分發(fā)。但他們本質(zhì)作用跟上面講的單實例環(huán)境中java“隊列”沒什么兩樣:在消息的傳輸過程中保存消息的容器。只是這里轉(zhuǎn)換到“分布式”環(huán)境中而已。
可以看到這里這里提到的“傳統(tǒng)”消息隊列,都是一個很重型的集群。如果這個分布式環(huán)境中的消息數(shù)量有限,我們可以不必引入這種重型的mq框架。比如:本次分享的主題 如何使用redis實現(xiàn)“消息隊列”。
redis 實現(xiàn)消息隊列
redis有一個數(shù)據(jù)類型叫l(wèi)ist(列表),它的每個子元素都是 string 類型的雙向鏈表。我們可以通過 push,pop 操作從鏈表的頭部或者尾部添加刪除元素。這使得 list 既可以用作棧,也可以用作隊列。
假如,我們有一個隊列系統(tǒng),把一個個任務放到隊列中,另一個進程就把隊列中的任務取出來執(zhí)行。
放到隊列我們使用LPUSH,也就是往雙向鏈表的尾部填充一個元素,這一端也叫生產(chǎn)者,是產(chǎn)生內(nèi)容的一端。
另一個進程使用RPOP往頭部取出元素來執(zhí)行,這一端也叫消費者。
如果僅僅是這種方式來實現(xiàn)隊列,它就是需要進程不斷地循環(huán)隊列,判斷隊列是不是有新元素,有的話就取出來執(zhí)行,沒有的話,就繼續(xù)循環(huán),但是這個總有一個時間間隔,你總得規(guī)定每隔一段時間去循環(huán),雖然這個時間很小,但總有延遲,這種方式叫作輪循。有沒有一種方式就是讓不斷執(zhí)行一個redis命令,而redis中的列隊有值就會通過命令通知程序呢?有的,那就是阻塞操作的RPOP,它叫作BRPOP。
我們來演示一下它是如何實現(xiàn)的。
$ redis-cli 127.0.0.1:6379> BRPOP list1 0
先執(zhí)行BRPOP,假如隊列l(wèi)ist1沒有值,它會返回nil,并且阻塞在那,在等另一個程序或進程往list1中填值。
我們開啟另一個redis端終。
$ redis-cli 127.0.0.1:6379> LPUSH list1 a (integer) 1
我們再來看之前的結果。
127.0.0.1:6379> BRPOP list1 0 1) "list1" 2) "a" (16.99s)
這樣就能把列表的值給取到了。
優(yōu)點
- 能夠?qū)崿F(xiàn)持久化
- 采用 Master-Slave 數(shù)據(jù)復制模式。隊列操作都是寫操作,Master任務繁重,能讓Slave分擔的持久化工作,就不要Master做。RDB和AOF兩種方法都用上,多重保險。
- 支持集群
- 接口使用簡單
不足
- Redis上消息只會被一個消費者消費,不會有多個訂閱者消費同一個消息,簡單一對一
- 生產(chǎn)者或者消費者崩潰后的處理機制,需要自己實現(xiàn)
- 生產(chǎn)者寫入太快,消費者消費太慢,導致Redis的內(nèi)存問題,處理機制需要自己實現(xiàn)
通過pub/sub來實現(xiàn)
實現(xiàn)機制
訂閱,取消訂閱和發(fā)布實現(xiàn)了發(fā)布/訂閱消息范式,發(fā)布者不是計劃發(fā)送消息給特定的訂閱者。而是發(fā)布的消息分到不同的頻道,不需要知道什么樣的訂閱者訂閱。訂閱者對一個或多個頻道感興趣,只需接收感興趣的消息,不需要知道什么樣的發(fā)布者發(fā)布的。
這是一種基于非持久化的消息機制,消息發(fā)布者和訂閱者必須同時在線,否則一旦消息訂閱者由于各種異常情況而被迫斷開連接,在其重新連接后,其離線期間的消息是無法被重新通知的(即發(fā)即棄)。
Redis中的消息可以提供兩種不同的功能。一類是基于Channel的消息,這一類消息和Redis中存儲的Keys沒有太多關聯(lián),也就是說即使不在Redis中存儲任何Keys信息,這類消息也可以獨立使用。另一類消息可以對(也可以不對)Redis中存儲的Keys信息的變化事件進行通知,可以用來向訂閱者通知Redis中符合訂閱條件的Keys的各種事件。
通過springboot 構建redis消息隊列
首先springboot配置文件配置如下:
spring.redis.host=localhost spring.redis.port=6379
消息生產(chǎn)者,注入redisTemplate,用convertAndSend發(fā)送消息
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @Service public class PublishService { @Autowired private StringRedisTemplate stringRedisTemplate; public void sendMsg(String channel, String msg) { stringRedisTemplate.convertAndSend(channel, msg); } }
消費者:創(chuàng)建一個接收消息的類,繼承MessageListener,也可以不繼承
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Slf4j @Service public class RedisReceiver { public void receiveMessage(String message) { log.info("receive message is {}",message); } }
消息訂閱者配置類:
import com.wuzy.queue.RedisReceiver; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; /** * redis 監(jiān)聽配置 */ @Configuration public class RedisSubListenerConfig { /** * 初始化監(jiān)聽器 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel_1")); // new PatternTopic("這里是監(jiān)聽的通道的名字") 通道要和發(fā)布者發(fā)布消息的通道一致 return container; } /** * 綁定消息監(jiān)聽者和接收監(jiān)聽的方法 * * @param redisReceiver * @return */ @Bean MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) { // redisReceiver 消息接收者 // receiveMessage 消息接收后的方法 MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(); messageListenerAdapter.setDefaultListenerMethod("receiveMessage"); messageListenerAdapter.setDelegate(redisReceiver); return messageListenerAdapter; } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
優(yōu)點
- 一個生產(chǎn)者能夠?qū)鄠€消費者
- 支持集群
- 接口使用簡單
不足
- Redis提供的訂閱/發(fā)布功能并不完美,更不能和ActiveMQ/RabbitMQ提供的訂閱/發(fā)布功能相提并論。
- 首先這些消息并沒有持久化機制,屬于即發(fā)即棄模式。也就是說它們不能像ActiveMQ中的消息那樣保證持久化消息訂閱者不會錯過任何消息,無論這些消息訂閱者是否隨時在線。
- 由于本來就是即發(fā)即棄的消息模式,所以Redis也不需要專門制定消息的備份和恢復機制。
- 也是由于即發(fā)即棄的消息模式,所以Redis也沒有必要專門對使用訂閱/發(fā)布功能的客戶端連接進行識別,用來明確該客戶端連接的ID是否在之前已經(jīng)連接過Redis服務了。ActiveMQ中保持持續(xù)通知的功能的前提,就是能夠識別客戶端連接ID的歷史連接情況,以便確定哪些訂閱消息這個客戶端還沒有處理。
- Redis也沒有為發(fā)布者和訂閱者準備保證消息性能的任何方案,例如在大量消息同時到達Redis服務是,如果消息訂閱者來不及完成消費,就可能導致消息堆積。而ActiveMQ中有專門針對這種情況的慢消息機制。