項(xiàng)目產(chǎn)品中,大家都會(huì)有"定時(shí)任務(wù)"和"定時(shí)超時(shí)"的需求,初始階段,我們基本都是用少數(shù)的一些timer,即使是任務(wù)量越來(lái)越大的時(shí)候,我們就難免維護(hù)著大量的timer,或者進(jìn)行了大量低效的掃描。
定時(shí)任務(wù)使用場(chǎng)景:當(dāng)訂單一直處于未支付狀態(tài)時(shí),如何及時(shí)的關(guān)閉訂單(已經(jīng)使用)
如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功(后期重構(gòu)使用)
設(shè)計(jì)方案:
- 整個(gè)redis當(dāng)做消息池,以KV形式存儲(chǔ)消息
- 使用ZSET做優(yōu)先隊(duì)列,按照Score維持優(yōu)先級(jí)
- 使用LIST結(jié)構(gòu),以先進(jìn)先出的方式消費(fèi)
- ZSET和LIST存儲(chǔ)消息地址(對(duì)應(yīng)消息池的每個(gè)KEY)
- 使用定時(shí)器維護(hù)路由
- 根據(jù)TTL規(guī)則實(shí)現(xiàn)消息延遲
咱們公司現(xiàn)階段就是使用的這套方法:
1.新增一個(gè)job,會(huì)job_pool中插入一條數(shù)據(jù),記錄了業(yè)務(wù)方消費(fèi)方。也會(huì)在bucket插入一條記錄,記錄執(zhí)行的時(shí)間戳
2.搬運(yùn)線程會(huì)去bucket中查找哪些執(zhí)行時(shí)間戳的RunTimeMillis比現(xiàn)在的時(shí)間小,將這些記錄全部刪除;同時(shí)會(huì)解析出每個(gè)任務(wù)的Topic是什么,然后將這些任務(wù)PUSH到TOPIC對(duì)應(yīng)的列表queue中
3每個(gè)topic的list都會(huì)有一個(gè)監(jiān)聽(tīng)線程去批量獲取list中的待消費(fèi)數(shù)據(jù),獲取到的數(shù)據(jù)全部扔給這個(gè)topic的消費(fèi)線程池
4.消費(fèi)線程池執(zhí)行會(huì)去job_pool查找數(shù)據(jù)結(jié)構(gòu),返回給回調(diào)結(jié)構(gòu),執(zhí)行回調(diào)方法。
圖片
待優(yōu)化的內(nèi)容:
- 目前只有一個(gè)Queue隊(duì)列存放消息,當(dāng)需要消費(fèi)的消息大量堆積后,會(huì)影響消息通知的時(shí)效。改進(jìn)的辦法是,開(kāi)啟多個(gè)Queue,進(jìn)行消息路由,再開(kāi)啟多個(gè)消費(fèi)線程進(jìn)行消費(fèi),提供吞吐量
- 消息沒(méi)有進(jìn)行持久化,存在風(fēng)險(xiǎn),后續(xù)會(huì)將消息持久化到MongoDB中
一般來(lái)說(shuō)還有什么其他方法實(shí)現(xiàn)這類需求呢?
“輪詢掃描法”
1.用一個(gè)Map<uid, last_packet_time>來(lái)記錄每一個(gè)uid最近一次請(qǐng)求時(shí)間last_packet_time
2.當(dāng)某個(gè)用戶uid有請(qǐng)求包來(lái)到,實(shí)時(shí)更新這個(gè)Map
3.啟動(dòng)一個(gè)timer,當(dāng)Map中不為空時(shí),輪詢掃描這個(gè)Map,看每個(gè)uid的last_packet_time是否超過(guò)30s,如果超過(guò)則進(jìn)行超時(shí)處理
“多timer觸發(fā)法”
1.用一個(gè)Map<uid, last_packet_time>來(lái)記錄每一個(gè)uid最近一次請(qǐng)求時(shí)間last_packet_time
2.當(dāng)某個(gè)用戶uid有請(qǐng)求包來(lái)到,實(shí)時(shí)更新這個(gè)Map,并同時(shí)對(duì)這個(gè)uid請(qǐng)求包啟動(dòng)一個(gè)timer,30s之后觸發(fā)
3.每個(gè)uid請(qǐng)求包對(duì)應(yīng)的timer觸發(fā)后,看Map中,查看這個(gè)uid的last_packet_time是否超過(guò)30s,如果超過(guò)則進(jìn)行超時(shí)處理
方案一:只啟動(dòng)一個(gè)timer,但需要輪詢,效率較低
方案二:不需要輪詢,但每個(gè)請(qǐng)求包要啟動(dòng)一個(gè)timer,比較耗資源
ZSet(有序集合)數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)
- 創(chuàng)建ZSet:首先,你需要?jiǎng)?chuàng)建一個(gè)ZSet數(shù)據(jù)結(jié)構(gòu),其中每個(gè)訂單將作為一個(gè)成員,其分?jǐn)?shù)將表示訂單的創(chuàng)建時(shí)間戳。你可以使用Redis等支持ZSet的數(shù)據(jù)庫(kù)來(lái)實(shí)現(xiàn)。
- 添加訂單:每當(dāng)用戶創(chuàng)建新訂單時(shí),將訂單添加到ZSet中,其中成員是訂單ID,分?jǐn)?shù)是訂單的創(chuàng)建時(shí)間戳。
- 定時(shí)檢查訂單:定期(例如,每分鐘)執(zhí)行一個(gè)程序或定時(shí)任務(wù)來(lái)檢查ZSet中的訂單。你可以使用程序來(lái)查詢ZSet,找到創(chuàng)建時(shí)間超過(guò)一定時(shí)間閾值的訂單,表示它們長(zhǎng)時(shí)間未支付。
- 取消訂單:對(duì)于那些長(zhǎng)時(shí)間未支付的訂單,可以將其從ZSet中刪除,并執(zhí)行取消訂單的操作(例如,將訂單狀態(tài)設(shè)置為"已取消")。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import JAVA.util.Set;
public class OrderCancellationSystem {
public static void mAIn(String[] args) {
Jedis jedis = new Jedis("localhost"); // 連接到本地Redis服務(wù)器
// 模擬添加訂單
addOrder(jedis, "Order1");
addOrder(jedis, "Order2");
// 定時(shí)任務(wù),每分鐘檢查訂單并自動(dòng)取消
while (true) {
cancelLongUnpaidOrders(jedis);
try {
Thread.sleep(60000); // 等待一分鐘再次檢查
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void addOrder(Jedis jedis, String orderId) {
long currentTime = System.currentTimeMillis();
jedis.zadd("orders", currentTime, orderId);
}
public static void cancelOrder(String orderId) {
// 執(zhí)行取消訂單操作,例如更新訂單狀態(tài)
System.out.println("Cancelling order: " + orderId);
}
public static void cancelLongUnpaidOrders(Jedis jedis) {
long expirationTime = System.currentTimeMillis() - 3600 * 1000; // 60分鐘前的時(shí)間戳
Set<Tuple> longUnpaidOrders = jedis.zrangeByScoreWithScores("orders", "-inf", String.valueOf(expirationTime));
for (Tuple order : longUnpaidOrders) {
String orderId = order.getElement();
cancelOrder(orderId);
jedis.zrem("orders", orderId); // 從ZSet中刪除已取消的訂單
}
}
}