問(wèn)題背景
當(dāng)使用消息隊(duì)列時(shí),客戶端重復(fù)消費(fèi)可能會(huì)成為一個(gè)嚴(yán)重的問(wèn)題。 這是因?yàn)橄㈥?duì)列具有持久性和可靠性的特性,確保消息能夠被成功傳遞給消費(fèi)者。然而,這也會(huì)導(dǎo)致客戶端在某些情況下重復(fù)消費(fèi)消息,例如網(wǎng)絡(luò)故障、客戶端崩潰、消息處理失敗等情況。
為了避免這種情況發(fā)生,需要在客戶端實(shí)現(xiàn)一些機(jī)制來(lái)確保消息不會(huì)被重復(fù)消費(fèi),例如記錄消費(fèi)者已經(jīng)處理的消息 ID、使用分布式鎖來(lái)控制消費(fèi)進(jìn)程的唯一性等。這些機(jī)制能夠保證消息被成功處理,同時(shí)也能夠提高系統(tǒng)的可靠性和穩(wěn)定性。
今天的文章我們將探討如何確保消息隊(duì)列中的消息不會(huì)被重復(fù)消費(fèi),下文將以 RocketMQ 為例說(shuō)明。
消息冪等性
消息中間件是分布式系統(tǒng)中常用的組件,它具有廣泛的應(yīng)用價(jià)值,例如實(shí)現(xiàn)異步化、解耦、削峰等功能。通常情況下,我們認(rèn)為消息中間件是一個(gè)可靠的組件。這里的可靠性指的是,只要消息被成功投遞到了消息中間件,它就不會(huì)丟失,至少能夠被消費(fèi)者成功消費(fèi)一次。這是消息中間件最基本的特性之一,也就是我們通常所說(shuō)的 “AT LEAST ONCE”,即消息至少會(huì)被成功消費(fèi)一遍。
舉個(gè)例子,假設(shè)一個(gè)消息M被發(fā)送到消息中間件并被消費(fèi)程序A接收到,A開(kāi)始消費(fèi)這個(gè)消息,但是在消費(fèi)過(guò)程中程序重啟了。由于這個(gè)消息沒(méi)有被標(biāo)記為已經(jīng)被消費(fèi)成功,消息中間件會(huì)持續(xù)地將這個(gè)消息投遞給消費(fèi)者,直到消息被成功消費(fèi)為止。
然而,這種可靠性特性也會(huì)導(dǎo)致消息被多次投遞的情況。舉個(gè)例子,仍然以之前的例子為例,如果消費(fèi)程序A接收并完成消息M的消費(fèi)邏輯后,正準(zhǔn)備通知消息中間件“我已經(jīng)消費(fèi)成功了”,但在此之前程序A又重啟了,那么對(duì)于消息中間件來(lái)說(shuō),這個(gè)消息M并沒(méi)有被成功消費(fèi)過(guò),因此消息中間件會(huì)繼續(xù)投遞這個(gè)消息。而對(duì)于消費(fèi)程序A來(lái)說(shuō),盡管它已經(jīng)成功消費(fèi)了這個(gè)消息,但由于程序重啟導(dǎo)致消息中間件繼續(xù)投遞,看起來(lái)就好像這個(gè)消息還沒(méi)有被成功消費(fèi)過(guò)一樣。
在 RockectMQ 的場(chǎng)景中,這意味著同一個(gè) messageId 的消息會(huì)被重復(fù)投遞。由于消息的可靠投遞是更重要的,所以避免消息重復(fù)投遞的任務(wù)轉(zhuǎn)移給了應(yīng)用程序自身來(lái)實(shí)現(xiàn)。這也是 RocketMQ 文檔強(qiáng)調(diào)消費(fèi)邏輯需要自行實(shí)現(xiàn)冪等性的原因。實(shí)際上,這背后的邏輯是:在分布式場(chǎng)景下,保證消息不丟和避免消息重復(fù)投遞是矛盾的,但是消息重復(fù)投遞是可以解決的,而消息丟失則非常麻煩。
冪等設(shè)計(jì)
讓我們先來(lái)了解一下郵件消息的發(fā)送流程,以便更好了解消息隊(duì)列冪等工作原理。

正如我們?cè)谥疤岬降模琑ocketMQ 遵循 "AT LEAST ONCE" 語(yǔ)義,這意味著消息可能會(huì)被重復(fù)消費(fèi)。在發(fā)送郵件消息的情況下,由于消息可能被重復(fù)消費(fèi),我們需要保證冪等性,以確保郵件不會(huì)被重復(fù)發(fā)送。
1. 消息發(fā)送邏輯
下面這塊代碼是 12306 支付結(jié)果回調(diào)訂單邏輯實(shí)現(xiàn),通過(guò) RocketMQMessageListener 監(jiān)聽(tīng)并消費(fèi) RocketMQ 消息。
JAVA復(fù)制代碼@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrApper<PayResultCallbackOrderEvent>> {
private final OrderService orderService;
@Transactional(rollbackFor = Exception.class)
@Override
public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
.orderSn(payResultCallbackOrderEvent.getOrderSn())
.orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
.build();
orderService.statusReversal(orderStatusReversalDTO);
orderService.payCallbackOrder(payResultCallbackOrderEvent);
}
}
2. 冪等處理邏輯
下述方案的優(yōu)點(diǎn)在于,使用 redis 消息去重表,不依賴事務(wù),針對(duì)消息表本身做了狀態(tài)的區(qū)分:消費(fèi)中、消費(fèi)完成。
如果消息已經(jīng)在消費(fèi)中,拋出異常,消息會(huì)觸發(fā)延遲消費(fèi),在 RocketMQ 的場(chǎng)景下即發(fā)送到 RETRY TOPIC。

通過(guò)該方案可以解決什么問(wèn)題?
- 消息已經(jīng)消費(fèi)成功了,第二條消息將被直接冪等處理掉(消費(fèi)成功)。
- 并發(fā)場(chǎng)景下的消息,依舊能滿足不會(huì)出現(xiàn)消息重復(fù),即穿透冪等擋板的問(wèn)題。
- 支持上游業(yè)務(wù)生產(chǎn)者重發(fā)的業(yè)務(wù)重復(fù)的消息冪等問(wèn)題。
為什么要給初始化的冪等標(biāo)識(shí)新增 10 分鐘過(guò)期時(shí)間?
在并發(fā)場(chǎng)景下,我們使用消息狀態(tài)來(lái)實(shí)現(xiàn)并發(fā)控制,以使第二條消息被不斷延遲消費(fèi)(即重試)。但如果在此期間第一條消息也因某些異常原因(例如機(jī)器重啟或外部異常)未成功消費(fèi),該怎么辦呢?因?yàn)槊看尾樵儠r(shí)都會(huì)顯示消費(fèi)中的狀態(tài),所以延遲消費(fèi)會(huì)一直進(jìn)行下去,直到最終被視為消費(fèi)失敗并被投遞到死信 Topic 中(RocketMQ 默認(rèn)最多可以重復(fù)消費(fèi) 16 次)。
針對(duì)這個(gè)問(wèn)題,我們采取了一種解決方案:在插入消息表時(shí),必須為每條消息設(shè)置一個(gè)最長(zhǎng)消費(fèi)過(guò)期時(shí)間,例如 10 分鐘。這意味著,如果某個(gè)消息在消費(fèi)過(guò)程中超過(guò)了 10 分鐘,就會(huì)被視為消費(fèi)失敗并從消息表中刪除。
抽象冪等通用組件
為了解決消息隊(duì)列中的重復(fù)消費(fèi)問(wèn)題,我們可以設(shè)計(jì)一套通用的消息隊(duì)列冪等組件。這個(gè)組件可以被各個(gè)應(yīng)用程序使用,以確保它們的消費(fèi)邏輯是冪等的。這種通用的冪等組件可以使應(yīng)用程序不必為了解決重復(fù)消費(fèi)問(wèn)題而浪費(fèi)精力和時(shí)間,從而更專(zhuān)注于業(yè)務(wù)邏輯的實(shí)現(xiàn)。
在企業(yè)項(xiàng)目中,使用 MySQL 作為冪等去重表的情況比較少見(jiàn),因此在代碼中只提供了 Redis 實(shí)現(xiàn)方案。
1. 定義冪等注解
我們提供了一種通用的冪等注解,該注解可用于 RestAPI 和消息隊(duì)列消息防重復(fù)場(chǎng)景。
java復(fù)制代碼@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
/**
* 冪等Key,只有在 {@link Idempotent#type()} 為 {@link IdempotentTypeEnum#SPEL} 時(shí)生效
*/
String key() default "";
/**
* 觸發(fā)冪等失敗邏輯時(shí),返回的錯(cuò)誤提示信息
*/
String message() default "您操作太快,請(qǐng)稍后再試";
/**
* 驗(yàn)證冪等類(lèi)型,支持多種冪等方式
* RestAPI 建議使用 {@link IdempotentTypeEnum#TOKEN} 或 {@link IdempotentTypeEnum#PARAM}
* 其它類(lèi)型冪等驗(yàn)證,使用 {@link IdempotentTypeEnum#SPEL}
*/
IdempotentTypeEnum type() default IdempotentTypeEnum.PARAM;
/**
* 驗(yàn)證冪等場(chǎng)景,支持多種 {@link IdempotentSceneEnum}
*/
IdempotentSceneEnum scene() default IdempotentSceneEnum.RESTAPI;
/**
* 設(shè)置防重令牌 Key 前綴,MQ 冪等去重可選設(shè)置
* {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 時(shí)生效
*/
String uniqueKeyPrefix() default "";
/**
* 設(shè)置防重令牌 Key 過(guò)期時(shí)間,單位秒,默認(rèn) 1 小時(shí),MQ 冪等去重可選設(shè)置
* {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 時(shí)生效
*/
long keyTimeout() default 3600L;
}
為了方便理解,整理成思維導(dǎo)圖方便記憶。

2. 定義 AOP 增強(qiáng)
我們使用 AOP 技術(shù)為方法增強(qiáng)提供了通用的冪等性保證,只需要在需要保證冪等性的方法上添加 @Idempotent 注解,Aspect 就會(huì)對(duì)該方法進(jìn)行增強(qiáng)。
這種技術(shù)不僅適用于 RestAPI 場(chǎng)景,還適用于消息隊(duì)列的防重復(fù)消費(fèi)場(chǎng)景。
java復(fù)制代碼package org.opengoofy.index12306.framework.starter.idempotent.core;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent;
import java.lang.reflect.Method;
/**
* 冪等注解 AOP 攔截器
*
* @公眾號(hào):馬丁玩編程,回復(fù):加群,添加馬哥微信(備注:12306)獲取項(xiàng)目資料
*/
@Aspect
public final class IdempotentAspect {
/**
* 增強(qiáng)方法標(biāo)記 {@link Idempotent} 注解邏輯
*/
@Around("@annotation(org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent)")
public Object idempotentHandler(ProceedingJoinPoint joinPoint) throws Throwable {
Idempotent idempotent = getIdempotent(joinPoint);
IdempotentExecuteHandler instance = IdempotentExecuteHandlerFactory.getInstance(idempotent.scene(), idempotent.type());
Object resultObj;
try {
instance.execute(joinPoint, idempotent);
resultObj = joinPoint.proceed();
instance.postProcessing();
} catch (RepeatConsumptionException ex) {
/**
* 觸發(fā)冪等邏輯時(shí)可能有兩種情況:
* * 1. 消息還在處理,但是不確定是否執(zhí)行成功,那么需要返回錯(cuò)誤,方便 RocketMQ 再次通過(guò)重試隊(duì)列投遞
* * 2. 消息處理成功了,該消息直接返回成功即可
*/
if (!ex.getError()) {
return null;
}
throw ex;
} catch (Throwable ex) {
// 客戶端消費(fèi)存在異常,需要?jiǎng)h除冪等標(biāo)識(shí)方便下次 RocketMQ 再次通過(guò)重試隊(duì)列投遞
instance.exceptionProcessing();
throw ex;
} finally {
IdempotentContext.clean();
}
return resultObj;
}
public static Idempotent getIdempotent(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
return targetMethod.getAnnotation(Idempotent.class);
}
}
這個(gè)方法的執(zhí)行邏輯與設(shè)計(jì)部分相同,因此在此處不再貼出具體的代碼。大家可以跟著設(shè)計(jì)閱讀冪等源碼。
為了提高通用性和抽象性,該組件采用了模板方法和簡(jiǎn)單工廠等設(shè)計(jì)模式,這有助于隔離復(fù)雜性和提高可擴(kuò)展性。如果您在學(xué)習(xí)過(guò)程中遇到問(wèn)題,歡迎在知識(shí)星球 APP 上向我提問(wèn)。
3. 實(shí)際場(chǎng)景使用
以實(shí)現(xiàn)支付結(jié)果回調(diào)訂單為例,我們可以將通用組件引入到消息消費(fèi)的邏輯中,具體流程如下:
java復(fù)制代碼@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrapper<PayResultCallbackOrderEvent>> {
private final OrderService orderService;
@Idempotent(
uniqueKeyPrefix = "index12306-order:pay_result_callback:",
key = "#message.getKeys()+'_'+#message.hashCode()",
type = IdempotentTypeEnum.SPEL,
scene = IdempotentSceneEnum.MQ,
keyTimeout = 7200L
)
@Transactional(rollbackFor = Exception.class)
@Override
public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
.orderSn(payResultCallbackOrderEvent.getOrderSn())
.orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
.build();
orderService.statusReversal(orderStatusReversalDTO);
orderService.payCallbackOrder(payResultCallbackOrderEvent);
}
}
支持通過(guò) SpEL 表達(dá)式來(lái)充當(dāng)冪等去重表唯一鍵,通過(guò)一個(gè)簡(jiǎn)單的注解,完美解決消息隊(duì)列重復(fù)消費(fèi)問(wèn)題。
更復(fù)雜的冪等場(chǎng)景
到這里,方案看起來(lái)非常完美,所有的消息都可以快速接入去重,而且與具體業(yè)務(wù)實(shí)現(xiàn)完全解耦。但是,是否這樣就可以完美地完成去重的所有任務(wù)呢? 很遺憾,實(shí)際上并非如此。因?yàn)樾枰_保消息至少成功消費(fèi)一次,因此消息在消費(fèi)過(guò)程中有可能失敗并觸發(fā)重試。
還是以上面的例子,假設(shè)消息消費(fèi)的流程包含:
- 檢查庫(kù)存(RPC)
- 鎖庫(kù)存(RPC)
- 開(kāi)啟事務(wù),插入訂單表(MySQL)
- 調(diào)用某些其他下游服務(wù)(RPC)
- 更新訂單狀態(tài)
- commit 事務(wù)(MySQL)
當(dāng)消息消費(fèi)到第三步的時(shí)候假設(shè) MySQL 異常導(dǎo)致失敗了,觸發(fā)消息重試。在重試前我們會(huì)刪除冪等表的記錄,所以消息重試的時(shí)候就會(huì)重新進(jìn)入消費(fèi)代碼,那么步驟 1 和步驟 2 就會(huì)重新再執(zhí)行一遍。
如果步驟 2 本身不是冪等的,那么這個(gè)業(yè)務(wù)消息消費(fèi)依舊沒(méi)有做好完整的冪等處理。
1. 通用方法實(shí)現(xiàn)價(jià)值
盡管這種方式并不能完全解決消息冪等問(wèn)題(事實(shí)上,軟件工程領(lǐng)域里很少有銀彈可以完全解決問(wèn)題),但它仍然具有很大的價(jià)值。通過(guò)這種簡(jiǎn)便的方式,我們能夠解決以下問(wèn)題:
- 各種由于Broker、負(fù)載均衡等原因?qū)е碌南⒅赝哆f的重復(fù)問(wèn)題。
- 各種上游生產(chǎn)者導(dǎo)致的業(yè)務(wù)級(jí)別消息重復(fù)問(wèn)題。
- 重復(fù)消息并發(fā)消費(fèi)的控制窗口問(wèn)題,就算重復(fù),重復(fù)也不可能同一時(shí)間進(jìn)入消費(fèi)邏輯。
2. 消息去重的建議
使用這種方法可以確保在正常的消費(fèi)邏輯場(chǎng)景下(無(wú)異常,無(wú)異常退出),消息的冪等性全部得到解決,無(wú)論是業(yè)務(wù)重復(fù)還是 RocketMQ 特性帶來(lái)的重復(fù)。雖然它不是解決消息冪等性的銀彈,但它以一種簡(jiǎn)單和便捷的方式提供了解決方案。
實(shí)際上,這種方法已經(jīng)可以解決 99% 的消息重復(fù)問(wèn)題了,因?yàn)楫惓G闆r通常是少數(shù)情況。但是,如果希望在異常情況下也能處理好冪等問(wèn)題,可以采取以下措施來(lái)降低問(wèn)題發(fā)生的概率:
- 消息消費(fèi)失敗時(shí),應(yīng)該及時(shí)回滾處理。如果消息消費(fèi)失敗本身具備回滾機(jī)制,則消息重試也就沒(méi)有副作用了。
- 為了盡可能避免程序異常退出導(dǎo)致的消息重試,需要在消費(fèi)者代碼中做好優(yōu)雅退出處理。
- 針對(duì)一些無(wú)法做到完全冪等的操作,至少要做到終止消息的消費(fèi)并進(jìn)行告警。比如鎖定庫(kù)存的操作,如果通過(guò)業(yè)務(wù)流水號(hào)已經(jīng)成功鎖定了庫(kù)存,再次觸發(fā)鎖庫(kù)存操作的話,如果無(wú)法做到冪等性處理,那么至少要在消息消費(fèi)過(guò)程中觸發(fā)異常(如因主鍵沖突導(dǎo)致消費(fèi)異常等),并終止消息的消費(fèi),以避免重復(fù)消費(fèi)產(chǎn)生的副作用。
- 在 #3 做好的前提下,做好消息的消費(fèi)監(jiān)控,發(fā)現(xiàn)消息重試不斷失敗的時(shí)候,手動(dòng)做好 #1 的回滾,使得下次重試消費(fèi)成功。
文末總結(jié)
當(dāng)我們?cè)谑褂?RocketMQ 進(jìn)行消息處理時(shí),消息的冪等性是一個(gè)非常重要的問(wèn)題。本文通過(guò)抽象出通用組件的方式,實(shí)現(xiàn)了 RestAPI 和 RocketMQ 的冪等處理。 同時(shí),我們也發(fā)現(xiàn),冪等性并不是一個(gè)銀彈,不同的業(yè)務(wù)場(chǎng)景需要不同的冪等處理策略。
但是,通過(guò)一些基本的處理策略,如優(yōu)雅退出、回滾處理、消費(fèi)監(jiān)控等,我們能夠大大減少消息重復(fù)的問(wèn)題,提高消息消費(fèi)的穩(wěn)定性和可靠性。 在實(shí)際開(kāi)發(fā)中,需要結(jié)合具體業(yè)務(wù)場(chǎng)景,選擇合適的冪等處理策略,并且在每次新的場(chǎng)景出現(xiàn)時(shí),都需要仔細(xì)考慮是否需要重新審視冪等性的處理方式。
上文中的代碼以及實(shí)現(xiàn)已在基礎(chǔ)架構(gòu)模塊中定義,詳情查看。

原文鏈接:
https://juejin.cn/post/7270139990696722484