日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務,提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

背景

最近項目有個需求需要動態(tài)更新規(guī)則,當時腦中想到的第一個方案是利用zk的監(jiān)聽機制,管理人員更新完規(guī)則將狀態(tài)寫入zk,集群中的機器監(jiān)聽zk的狀態(tài),當有狀態(tài)變更后,集群中的機器開始拉取最新的配置。但由于公司技術選型,沒有專門搭建zk集群,因此也不可能為這一個小需求去搭建zk集群。圖為使用zk監(jiān)聽狀態(tài)變化的流程。

 

使用redis作為消息隊列的用法

 

 

最后只好退而求其次,想到了使用redis的隊列來做規(guī)則的更新

消息隊列

首先做簡單的引入。

  1. 隊列(來自百度百科):是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作,和棧一樣,隊列是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。
  2. 消息隊列(來自百度百科):是在消息的傳輸過程中保存消息的容器。

從隊列和消息隊列的定義看來,看不出什么相似之處。但我理解它們的作用是相似的,只是使用環(huán)境不同。隊列和消息隊列 本質(zhì)上都可以用于解決“生產(chǎn)者”和“消費者”問題,在二者這間建立橋梁,it中專業(yè)術語是對“生產(chǎn)者”和“消費者”進行解耦。可以動態(tài)的通過調(diào)整“生產(chǎn)者”和“消費者”線程數(shù)或服務器實例數(shù),在正常情況使消費和生產(chǎn)到達一個平衡;在高峰情況下(生產(chǎn)者大于消費者)可以保護消費者不被拖垮的同時,還可以對把積壓的數(shù)據(jù)保存下來,消費者可以延遲消費這些數(shù)據(jù)進行處理。

隊列 一般指的是單個服務實例內(nèi)部使用,比如,在JAVA中的一個jvm實例內(nèi)部可以使用Queue的子類(Deque:雙端隊列,是Queue的子接口),比如:單線程情況下使用LinkedList(無界)、PriorityQueue(優(yōu)先隊列);多線程情況下可以阻塞隊列ArrayBlockingQueue(有界)、LinkedBlockingQueue(無界)、DelayQueue(延遲隊列 無界)、PriorityBlockingQueue(優(yōu)先 無界)、SynchronousQueue(沒有容量的隊列)。可以看到java的api已經(jīng)很強大了,可以根據(jù)自己的業(yè)務需求選擇使用。使用方法:生產(chǎn)者從一端放入消息,消費者從另一端取出消息進行處理,消息放到隊列里(感覺是不是有點像“消息隊列”的定義)。

MQ主要是用來:

  1. 解耦應用、
  2. 異步化消息
  3. 流量削峰填谷

目前使用的較多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

使用redis作為消息隊列的用法

 

另外上面提到的“有界”和“無界”,指的是隊列的容量大小。有界 指的是創(chuàng)建隊列時必須指定隊列的容量;無界 創(chuàng)建隊列時無需指定隊列的容量,容量大小取決于jvm實例分配的內(nèi)存空間大小。在海量業(yè)務場景里,我們期望隊列的容量是無限的,但單個jvm實例 即便是使用“無界”隊列 由于單個實例內(nèi)存是有限的,最終無法容納下海量的消息數(shù)據(jù)。聰明的程序員就想 能不能使用一個第三方的隊列來存儲這些數(shù)據(jù)呢?當然是可以的,這就產(chǎn)生了“消息隊列”。

消息隊列 一般是采用一個獨立的集群專門用于消息存儲,可以存儲在內(nèi)存里 也可以直接存儲在磁盤中。比如常見的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它們有不同的特性,以及采用了各種不同的實現(xiàn),適用于各種場景的消息任務分發(fā)。但他們本質(zhì)作用跟上面講的單實例環(huán)境中java“隊列”沒什么兩樣:在消息的傳輸過程中保存消息的容器。只是這里轉(zhuǎn)換到“分布式”環(huán)境中而已。

使用redis作為消息隊列的用法

 

可以看到這里這里提到的“傳統(tǒng)”消息隊列,都是一個很重型的集群。如果這個分布式環(huán)境中的消息數(shù)量有限,我們可以不必引入這種重型的mq框架。比如:本次分享的主題 如何使用redis實現(xiàn)“消息隊列”。

redis 實現(xiàn)消息隊列

redis有一個數(shù)據(jù)類型叫l(wèi)ist(列表),它的每個子元素都是 string 類型的雙向鏈表。我們可以通過 push,pop 操作從鏈表的頭部或者尾部添加刪除元素。這使得 list 既可以用作棧,也可以用作隊列。

假如,我們有一個隊列系統(tǒng),把一個個任務放到隊列中,另一個進程就把隊列中的任務取出來執(zhí)行。

放到隊列我們使用LPUSH,也就是往雙向鏈表的尾部填充一個元素,這一端也叫生產(chǎn)者,是產(chǎn)生內(nèi)容的一端。

另一個進程使用RPOP往頭部取出元素來執(zhí)行,這一端也叫消費者。

如果僅僅是這種方式來實現(xiàn)隊列,它就是需要進程不斷地循環(huán)隊列,判斷隊列是不是有新元素,有的話就取出來執(zhí)行,沒有的話,就繼續(xù)循環(huán),但是這個總有一個時間間隔,你總得規(guī)定每隔一段時間去循環(huán),雖然這個時間很小,但總有延遲,這種方式叫作輪循。有沒有一種方式就是讓不斷執(zhí)行一個redis命令,而redis中的列隊有值就會通過命令通知程序呢?有的,那就是阻塞操作的RPOP,它叫作BRPOP。

我們來演示一下它是如何實現(xiàn)的。

$ redis-cli
127.0.0.1:6379> BRPOP list1 0

先執(zhí)行BRPOP,假如隊列l(wèi)ist1沒有值,它會返回nil,并且阻塞在那,在等另一個程序或進程往list1中填值。

我們開啟另一個redis端終。

$ redis-cli
127.0.0.1:6379> LPUSH list1 a
(integer) 1

我們再來看之前的結果。

127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "a"
(16.99s)

這樣就能把列表的值給取到了。

優(yōu)點

  1. 能夠?qū)崿F(xiàn)持久化
  2. 采用 Master-Slave 數(shù)據(jù)復制模式。隊列操作都是寫操作,Master任務繁重,能讓Slave分擔的持久化工作,就不要Master做。RDB和AOF兩種方法都用上,多重保險。
  3. 支持集群
  4. 接口使用簡單

不足

  1. Redis上消息只會被一個消費者消費,不會有多個訂閱者消費同一個消息,簡單一對一
  2. 生產(chǎn)者或者消費者崩潰后的處理機制,需要自己實現(xiàn)
  3. 生產(chǎn)者寫入太快,消費者消費太慢,導致Redis的內(nèi)存問題,處理機制需要自己實現(xiàn)

通過pub/sub來實現(xiàn)

實現(xiàn)機制

訂閱,取消訂閱和發(fā)布實現(xiàn)了發(fā)布/訂閱消息范式,發(fā)布者不是計劃發(fā)送消息給特定的訂閱者。而是發(fā)布的消息分到不同的頻道,不需要知道什么樣的訂閱者訂閱。訂閱者對一個或多個頻道感興趣,只需接收感興趣的消息,不需要知道什么樣的發(fā)布者發(fā)布的。

這是一種基于非持久化的消息機制,消息發(fā)布者和訂閱者必須同時在線,否則一旦消息訂閱者由于各種異常情況而被迫斷開連接,在其重新連接后,其離線期間的消息是無法被重新通知的(即發(fā)即棄)。

Redis中的消息可以提供兩種不同的功能。一類是基于Channel的消息,這一類消息和Redis中存儲的Keys沒有太多關聯(lián),也就是說即使不在Redis中存儲任何Keys信息,這類消息也可以獨立使用。另一類消息可以對(也可以不對)Redis中存儲的Keys信息的變化事件進行通知,可以用來向訂閱者通知Redis中符合訂閱條件的Keys的各種事件。

通過springboot 構建redis消息隊列

首先springboot配置文件配置如下:

spring.redis.host=localhost
spring.redis.port=6379

消息生產(chǎn)者,注入redisTemplate,用convertAndSend發(fā)送消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class PublishService {
 @Autowired
 private StringRedisTemplate stringRedisTemplate;
 public void sendMsg(String channel, String msg) {
 stringRedisTemplate.convertAndSend(channel, msg);
 }
}

消費者:創(chuàng)建一個接收消息的類,繼承MessageListener,也可以不繼承

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class RedisReceiver {
 public void receiveMessage(String message) {
 log.info("receive message is {}",message);
 }
}

消息訂閱者配置類:

import com.wuzy.queue.RedisReceiver;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * redis 監(jiān)聽配置
 */
@Configuration
public class RedisSubListenerConfig {
 /**
 * 初始化監(jiān)聽器
 *
 * @param connectionFactory
 * @param listenerAdapter
 * @return
 */
 @Bean
 RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
 MessageListenerAdapter listenerAdapter) {
 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.addMessageListener(listenerAdapter, new PatternTopic("channel_1")); // new PatternTopic("這里是監(jiān)聽的通道的名字") 通道要和發(fā)布者發(fā)布消息的通道一致
 return container;
 }
 /**
 * 綁定消息監(jiān)聽者和接收監(jiān)聽的方法
 *
 * @param redisReceiver
 * @return
 */
 @Bean
 MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
 // redisReceiver 消息接收者
 // receiveMessage 消息接收后的方法
 MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
 messageListenerAdapter.setDefaultListenerMethod("receiveMessage");
 messageListenerAdapter.setDelegate(redisReceiver);
 return messageListenerAdapter;
 }
 @Bean
 StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
 return new StringRedisTemplate(connectionFactory);
 }
}

優(yōu)點

  1. 一個生產(chǎn)者能夠?qū)鄠€消費者
  2. 支持集群
  3. 接口使用簡單

不足

  1. Redis提供的訂閱/發(fā)布功能并不完美,更不能和ActiveMQ/RabbitMQ提供的訂閱/發(fā)布功能相提并論。
  2. 首先這些消息并沒有持久化機制,屬于即發(fā)即棄模式。也就是說它們不能像ActiveMQ中的消息那樣保證持久化消息訂閱者不會錯過任何消息,無論這些消息訂閱者是否隨時在線。
  3. 由于本來就是即發(fā)即棄的消息模式,所以Redis也不需要專門制定消息的備份和恢復機制。
  4. 也是由于即發(fā)即棄的消息模式,所以Redis也沒有必要專門對使用訂閱/發(fā)布功能的客戶端連接進行識別,用來明確該客戶端連接的ID是否在之前已經(jīng)連接過Redis服務了。ActiveMQ中保持持續(xù)通知的功能的前提,就是能夠識別客戶端連接ID的歷史連接情況,以便確定哪些訂閱消息這個客戶端還沒有處理。
  5. Redis也沒有為發(fā)布者和訂閱者準備保證消息性能的任何方案,例如在大量消息同時到達Redis服務是,如果消息訂閱者來不及完成消費,就可能導致消息堆積。而ActiveMQ中有專門針對這種情況的慢消息機制。

分享到:
標簽:redis
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數(shù)有氧達人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定