redis的 list 數據結構常用來作為 異步消息隊列 使用,使用 rpush/lpush 操作 入隊 ,使用 lpop/rpop 來操作 出隊
> rpush my-queue Apple banana pear (integer) 3 > llen my-queue (integer) 3 > lpop my-queue "apple" > llen my-queue (integer) 2 > lpop my-queue "banana" > llen my-queue (integer) 1 > lpop my-queue "pear" > llen my-queue (integer) 0 > lpop my-queue (nil)
空隊列
- 如果隊列為空,客戶端會陷入 pop的死循環 , 空輪詢 不僅拉高了 客戶端的CPU , Redis的QPS 也會被拉高
- 如果空輪詢的客戶端有幾十個, Redis的慢查詢 也會顯著增加,可以嘗試讓客戶端線程 sleep 1s
- 但睡眠會導致消息的 延遲增大 ,可以使用 blpop/brpop (blocking, 阻塞讀 )
- 阻塞讀在隊列沒有數據時,會立即進入 休眠 狀態,一旦有數據到來,會立即被 喚醒, 消息延遲幾乎為0
空閑連接
- 如果線程一直阻塞在那里,Redis的客戶端連接就成了 閑置連接
- 閑置過久, 服務器 一般會 主動斷開 連接, 減少閑置的資源占用 ,此時 blpop/brpop 會 拋出異常
鎖沖突處理
- 分布式鎖 加鎖失敗 的處理策略
- 直接拋出異常 ,通知用戶稍后重試
- sleep 后再重試
- 將請求轉移到 延時隊列 ,過一會重試
- 拋出異常
- 這種方式比較適合由 用戶直接發起 的請求
- sleep
- sleep會 阻塞 當前的消息處理線程,從而導致隊列的后續消息處理出現 延遲
- 如果 碰撞比較頻繁 ,sleep方案不合適
- 延時隊列
- 比較適合異步消息處理的場景,通過將當前沖突的請求轉移到另一個隊列 延后處理來 避免沖突
延時隊列
- 可以通過Redis的 zset 來實現延時隊列
- 將消息序列化成一個字符串作為zet的 value ,將該消息的 到期處理時間 作為 score
- 然后 多線程輪詢 zset獲取 到期的任務 進行處理
- 多線程是為了保障 可用性 ,但同時要考慮 并發安全 ,確保 任務不能被多次執行
public class RedisDelayingQueue<T> { @Data @AllArgsConstructor @NoArgsConstructor private static class TaskItem<T> { private String id; private T msg; } private Type taskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task)); } public void loop() { // 可以進一步優化,通過Lua腳本將zrangeByScore和zrem統一挪到Redis服務端進行原子化操作,減少搶奪失敗出現的資源浪費 while (!Thread.interrupted()) { // 只取一條 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // zrem是多線程多進程爭奪任務的關鍵 TaskItem<T> task = JSON.parseobject(s, taskType); this.handleMsg(task.msg); } } } private void handleMsg(T msg) { try { System.out.println(msg); } catch (Throwable ignored) { // 一定要捕獲異常,避免因為個別任務處理問題導致循環異常退出 } } public static void main(String[] args) { final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo"); Thread producer = new Thread() { @Override public void run() { for (int i = 0; i < 10; i++) { queue.delay("zhongmingmao" + i); } } }; Thread consumer = new Thread() { @Override public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException ignored) { } } }