redis的 list 數(shù)據(jù)結(jié)構(gòu)常用來(lái)作為 異步消息隊(duì)列 使用,使用 rpush/lpush 操作 入隊(duì) ,使用 lpop/rpop 來(lái)操作 出隊(duì)
> rpush my-queue Apple banana pear (integer) 3 > llen my-queue (integer) 3 > lpop my-queue "apple" > llen my-queue (integer) 2 > lpop my-queue "banana" > llen my-queue (integer) 1 > lpop my-queue "pear" > llen my-queue (integer) 0 > lpop my-queue (nil)
空隊(duì)列
- 如果隊(duì)列為空,客戶(hù)端會(huì)陷入 pop的死循環(huán) , 空輪詢(xún) 不僅拉高了 客戶(hù)端的CPU , Redis的QPS 也會(huì)被拉高
- 如果空輪詢(xún)的客戶(hù)端有幾十個(gè), Redis的慢查詢(xún) 也會(huì)顯著增加,可以嘗試讓客戶(hù)端線(xiàn)程 sleep 1s
- 但睡眠會(huì)導(dǎo)致消息的 延遲增大 ,可以使用 blpop/brpop (blocking, 阻塞讀 )
- 阻塞讀在隊(duì)列沒(méi)有數(shù)據(jù)時(shí),會(huì)立即進(jìn)入 休眠 狀態(tài),一旦有數(shù)據(jù)到來(lái),會(huì)立即被 喚醒 , 消息延遲幾乎為0
空閑連接
- 如果線(xiàn)程一直阻塞在那里,Redis的客戶(hù)端連接就成了 閑置連接
- 閑置過(guò)久, 服務(wù)器 一般會(huì) 主動(dòng)斷開(kāi) 連接, 減少閑置的資源占用 ,此時(shí) blpop/brpop 會(huì) 拋出異常
鎖沖突處理
- 分布式鎖 加鎖失敗 的處理策略
- 直接拋出異常 ,通知用戶(hù)稍后重試
- sleep 后再重試
- 將請(qǐng)求轉(zhuǎn)移到 延時(shí)隊(duì)列 ,過(guò)一會(huì)重試
- 拋出異常
- 這種方式比較適合由 用戶(hù)直接發(fā)起 的請(qǐng)求
- sleep
- sleep會(huì) 阻塞 當(dāng)前的消息處理線(xiàn)程,從而導(dǎo)致隊(duì)列的后續(xù)消息處理出現(xiàn) 延遲
- 如果 碰撞比較頻繁 ,sleep方案不合適
- 延時(shí)隊(duì)列
- 比較適合異步消息處理的場(chǎng)景,通過(guò)將當(dāng)前沖突的請(qǐng)求轉(zhuǎn)移到另一個(gè)隊(duì)列 延后處理 來(lái) 避免沖突
延時(shí)隊(duì)列
- 可以通過(guò)Redis的 zset 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列
- 將消息序列化成一個(gè)字符串作為zet的 value ,將該消息的 到期處理時(shí)間 作為 score
- 然后 多線(xiàn)程輪詢(xún) zset獲取 到期的任務(wù) 進(jìn)行處理
- 多線(xiàn)程是為了保障 可用性 ,但同時(shí)要考慮 并發(fā)安全 ,確保 任務(wù)不能被多次執(zhí)行
public class RedisDelayingQueue<T> { @Data @AllArgsConstructor @NoArgsConstructor private static class TaskItem<T> { private String id; private T msg; } private Type taskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task)); } public void loop() { // 可以進(jìn)一步優(yōu)化,通過(guò)Lua腳本將zrangeByScore和zrem統(tǒng)一挪到Redis服務(wù)端進(jìn)行原子化操作,減少搶奪失敗出現(xiàn)的資源浪費(fèi) while (!Thread.interrupted()) { // 只取一條 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // zrem是多線(xiàn)程多進(jìn)程爭(zhēng)奪任務(wù)的關(guān)鍵 TaskItem<T> task = JSON.parseobject(s, taskType); this.handleMsg(task.msg); } } } private void handleMsg(T msg) { try { System.out.println(msg); } catch (Throwable ignored) { // 一定要捕獲異常,避免因?yàn)閭€(gè)別任務(wù)處理問(wèn)題導(dǎo)致循環(huán)異常退出 } } public static void main(String[] args) { final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo"); Thread producer = new Thread() { @Override public void run() { for (int i = 0; i < 10; i++) { queue.delay("zhongmingmao" + i); } } }; Thread consumer = new Thread() { @Override public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException ignored) { } } }