日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

問(wèn)題背景

當(dāng)使用消息隊(duì)列時(shí),客戶端重復(fù)消費(fèi)可能會(huì)成為一個(gè)嚴(yán)重的問(wèn)題。 這是因?yàn)橄㈥?duì)列具有持久性和可靠性的特性,確保消息能夠被成功傳遞給消費(fèi)者。然而,這也會(huì)導(dǎo)致客戶端在某些情況下重復(fù)消費(fèi)消息,例如網(wǎng)絡(luò)故障、客戶端崩潰、消息處理失敗等情況。

為了避免這種情況發(fā)生,需要在客戶端實(shí)現(xiàn)一些機(jī)制來(lái)確保消息不會(huì)被重復(fù)消費(fèi),例如記錄消費(fèi)者已經(jīng)處理的消息 ID、使用分布式鎖來(lái)控制消費(fèi)進(jìn)程的唯一性等。這些機(jī)制能夠保證消息被成功處理,同時(shí)也能夠提高系統(tǒng)的可靠性和穩(wěn)定性。

今天的文章我們將探討如何確保消息隊(duì)列中的消息不會(huì)被重復(fù)消費(fèi),下文將以 RocketMQ 為例說(shuō)明。

消息冪等性

消息中間件是分布式系統(tǒng)中常用的組件,它具有廣泛的應(yīng)用價(jià)值,例如實(shí)現(xiàn)異步化、解耦、削峰等功能。通常情況下,我們認(rèn)為消息中間件是一個(gè)可靠的組件。這里的可靠性指的是,只要消息被成功投遞到了消息中間件,它就不會(huì)丟失,至少能夠被消費(fèi)者成功消費(fèi)一次。這是消息中間件最基本的特性之一,也就是我們通常所說(shuō)的 “AT LEAST ONCE”,即消息至少會(huì)被成功消費(fèi)一遍。

舉個(gè)例子,假設(shè)一個(gè)消息M被發(fā)送到消息中間件并被消費(fèi)程序A接收到,A開(kāi)始消費(fèi)這個(gè)消息,但是在消費(fèi)過(guò)程中程序重啟了。由于這個(gè)消息沒(méi)有被標(biāo)記為已經(jīng)被消費(fèi)成功,消息中間件會(huì)持續(xù)地將這個(gè)消息投遞給消費(fèi)者,直到消息被成功消費(fèi)為止。

然而,這種可靠性特性也會(huì)導(dǎo)致消息被多次投遞的情況。舉個(gè)例子,仍然以之前的例子為例,如果消費(fèi)程序A接收并完成消息M的消費(fèi)邏輯后,正準(zhǔn)備通知消息中間件“我已經(jīng)消費(fèi)成功了”,但在此之前程序A又重啟了,那么對(duì)于消息中間件來(lái)說(shuō),這個(gè)消息M并沒(méi)有被成功消費(fèi)過(guò),因此消息中間件會(huì)繼續(xù)投遞這個(gè)消息。而對(duì)于消費(fèi)程序A來(lái)說(shuō),盡管它已經(jīng)成功消費(fèi)了這個(gè)消息,但由于程序重啟導(dǎo)致消息中間件繼續(xù)投遞,看起來(lái)就好像這個(gè)消息還沒(méi)有被成功消費(fèi)過(guò)一樣。

在 RockectMQ 的場(chǎng)景中,這意味著同一個(gè) messageId 的消息會(huì)被重復(fù)投遞。由于消息的可靠投遞是更重要的,所以避免消息重復(fù)投遞的任務(wù)轉(zhuǎn)移給了應(yīng)用程序自身來(lái)實(shí)現(xiàn)。這也是 RocketMQ 文檔強(qiáng)調(diào)消費(fèi)邏輯需要自行實(shí)現(xiàn)冪等性的原因。實(shí)際上,這背后的邏輯是:在分布式場(chǎng)景下,保證消息不丟和避免消息重復(fù)投遞是矛盾的,但是消息重復(fù)投遞是可以解決的,而消息丟失則非常麻煩。

冪等設(shè)計(jì)

讓我們先來(lái)了解一下郵件消息的發(fā)送流程,以便更好了解消息隊(duì)列冪等工作原理。

 

正如我們?cè)谥疤岬降模琑ocketMQ 遵循 "AT LEAST ONCE" 語(yǔ)義,這意味著消息可能會(huì)被重復(fù)消費(fèi)。在發(fā)送郵件消息的情況下,由于消息可能被重復(fù)消費(fèi),我們需要保證冪等性,以確保郵件不會(huì)被重復(fù)發(fā)送。

1. 消息發(fā)送邏輯

下面這塊代碼是 12306 支付結(jié)果回調(diào)訂單邏輯實(shí)現(xiàn),通過(guò) RocketMQMessageListener 監(jiān)聽(tīng)并消費(fèi) RocketMQ 消息。

JAVA復(fù)制代碼@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
        selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
        consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrApper<PayResultCallbackOrderEvent>> {

    private final OrderService orderService;

    
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
        PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
        OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
                .orderSn(payResultCallbackOrderEvent.getOrderSn())
                .orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
                .build();
        orderService.statusReversal(orderStatusReversalDTO);
        orderService.payCallbackOrder(payResultCallbackOrderEvent);
    }
}

2. 冪等處理邏輯

下述方案的優(yōu)點(diǎn)在于,使用 redis 消息去重表,不依賴事務(wù),針對(duì)消息表本身做了狀態(tài)的區(qū)分:消費(fèi)中、消費(fèi)完成。

如果消息已經(jīng)在消費(fèi)中,拋出異常,消息會(huì)觸發(fā)延遲消費(fèi),在 RocketMQ 的場(chǎng)景下即發(fā)送到 RETRY TOPIC。

 

通過(guò)該方案可以解決什么問(wèn)題?

  1. 消息已經(jīng)消費(fèi)成功了,第二條消息將被直接冪等處理掉(消費(fèi)成功)。
  2. 并發(fā)場(chǎng)景下的消息,依舊能滿足不會(huì)出現(xiàn)消息重復(fù),即穿透冪等擋板的問(wèn)題。
  3. 支持上游業(yè)務(wù)生產(chǎn)者重發(fā)的業(yè)務(wù)重復(fù)的消息冪等問(wèn)題。

為什么要給初始化的冪等標(biāo)識(shí)新增 10 分鐘過(guò)期時(shí)間?

在并發(fā)場(chǎng)景下,我們使用消息狀態(tài)來(lái)實(shí)現(xiàn)并發(fā)控制,以使第二條消息被不斷延遲消費(fèi)(即重試)。但如果在此期間第一條消息也因某些異常原因(例如機(jī)器重啟或外部異常)未成功消費(fèi),該怎么辦呢?因?yàn)槊看尾樵儠r(shí)都會(huì)顯示消費(fèi)中的狀態(tài),所以延遲消費(fèi)會(huì)一直進(jìn)行下去,直到最終被視為消費(fèi)失敗并被投遞到死信 Topic 中(RocketMQ 默認(rèn)最多可以重復(fù)消費(fèi) 16 次)。

針對(duì)這個(gè)問(wèn)題,我們采取了一種解決方案:在插入消息表時(shí),必須為每條消息設(shè)置一個(gè)最長(zhǎng)消費(fèi)過(guò)期時(shí)間,例如 10 分鐘。這意味著,如果某個(gè)消息在消費(fèi)過(guò)程中超過(guò)了 10 分鐘,就會(huì)被視為消費(fèi)失敗并從消息表中刪除。

抽象冪等通用組件

為了解決消息隊(duì)列中的重復(fù)消費(fèi)問(wèn)題,我們可以設(shè)計(jì)一套通用的消息隊(duì)列冪等組件。這個(gè)組件可以被各個(gè)應(yīng)用程序使用,以確保它們的消費(fèi)邏輯是冪等的。這種通用的冪等組件可以使應(yīng)用程序不必為了解決重復(fù)消費(fèi)問(wèn)題而浪費(fèi)精力和時(shí)間,從而更專(zhuān)注于業(yè)務(wù)邏輯的實(shí)現(xiàn)。

在企業(yè)項(xiàng)目中,使用 MySQL 作為冪等去重表的情況比較少見(jiàn),因此在代碼中只提供了 Redis 實(shí)現(xiàn)方案。

1. 定義冪等注解

我們提供了一種通用的冪等注解,該注解可用于 RestAPI 和消息隊(duì)列消息防重復(fù)場(chǎng)景。

java復(fù)制代碼@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
    
    /**
     * 冪等Key,只有在 {@link Idempotent#type()} 為 {@link IdempotentTypeEnum#SPEL} 時(shí)生效
     */
    String key() default "";
    
    /**
     * 觸發(fā)冪等失敗邏輯時(shí),返回的錯(cuò)誤提示信息
     */
    String message() default "您操作太快,請(qǐng)稍后再試";
    
    /**
     * 驗(yàn)證冪等類(lèi)型,支持多種冪等方式
     * RestAPI 建議使用 {@link IdempotentTypeEnum#TOKEN} 或 {@link IdempotentTypeEnum#PARAM}
     * 其它類(lèi)型冪等驗(yàn)證,使用 {@link IdempotentTypeEnum#SPEL}
     */
    IdempotentTypeEnum type() default IdempotentTypeEnum.PARAM;
    
    /**
     * 驗(yàn)證冪等場(chǎng)景,支持多種 {@link IdempotentSceneEnum}
     */
    IdempotentSceneEnum scene() default IdempotentSceneEnum.RESTAPI;
    
    /**
     * 設(shè)置防重令牌 Key 前綴,MQ 冪等去重可選設(shè)置
     * {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 時(shí)生效
     */
    String uniqueKeyPrefix() default "";
    
    /**
     * 設(shè)置防重令牌 Key 過(guò)期時(shí)間,單位秒,默認(rèn) 1 小時(shí),MQ 冪等去重可選設(shè)置
     * {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 時(shí)生效
     */
    long keyTimeout() default 3600L;
}

為了方便理解,整理成思維導(dǎo)圖方便記憶。

 

2. 定義 AOP 增強(qiáng)

我們使用 AOP 技術(shù)為方法增強(qiáng)提供了通用的冪等性保證,只需要在需要保證冪等性的方法上添加 @Idempotent 注解,Aspect 就會(huì)對(duì)該方法進(jìn)行增強(qiáng)。

這種技術(shù)不僅適用于 RestAPI 場(chǎng)景,還適用于消息隊(duì)列的防重復(fù)消費(fèi)場(chǎng)景。

java復(fù)制代碼package org.opengoofy.index12306.framework.starter.idempotent.core;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent;

import java.lang.reflect.Method;

/**
 * 冪等注解 AOP 攔截器
 *
 * @公眾號(hào):馬丁玩編程,回復(fù):加群,添加馬哥微信(備注:12306)獲取項(xiàng)目資料
 */
@Aspect
public final class IdempotentAspect {

    /**
     * 增強(qiáng)方法標(biāo)記 {@link Idempotent} 注解邏輯
     */
    @Around("@annotation(org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent)")
    public Object idempotentHandler(ProceedingJoinPoint joinPoint) throws Throwable {
        Idempotent idempotent = getIdempotent(joinPoint);
        IdempotentExecuteHandler instance = IdempotentExecuteHandlerFactory.getInstance(idempotent.scene(), idempotent.type());
        Object resultObj;
        try {
            instance.execute(joinPoint, idempotent);
            resultObj = joinPoint.proceed();
            instance.postProcessing();
        } catch (RepeatConsumptionException ex) {
            /**
             * 觸發(fā)冪等邏輯時(shí)可能有兩種情況:
             *    * 1. 消息還在處理,但是不確定是否執(zhí)行成功,那么需要返回錯(cuò)誤,方便 RocketMQ 再次通過(guò)重試隊(duì)列投遞
             *    * 2. 消息處理成功了,該消息直接返回成功即可
             */
            if (!ex.getError()) {
                return null;
            }
            throw ex;
        } catch (Throwable ex) {
            // 客戶端消費(fèi)存在異常,需要?jiǎng)h除冪等標(biāo)識(shí)方便下次 RocketMQ 再次通過(guò)重試隊(duì)列投遞
            instance.exceptionProcessing();
            throw ex;
        } finally {
            IdempotentContext.clean();
        }
        return resultObj;
    }

    public static Idempotent getIdempotent(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
        return targetMethod.getAnnotation(Idempotent.class);
    }
}

這個(gè)方法的執(zhí)行邏輯與設(shè)計(jì)部分相同,因此在此處不再貼出具體的代碼。大家可以跟著設(shè)計(jì)閱讀冪等源碼。

為了提高通用性和抽象性,該組件采用了模板方法和簡(jiǎn)單工廠等設(shè)計(jì)模式,這有助于隔離復(fù)雜性和提高可擴(kuò)展性。如果您在學(xué)習(xí)過(guò)程中遇到問(wèn)題,歡迎在知識(shí)星球 APP 上向我提問(wèn)。

3. 實(shí)際場(chǎng)景使用

以實(shí)現(xiàn)支付結(jié)果回調(diào)訂單為例,我們可以將通用組件引入到消息消費(fèi)的邏輯中,具體流程如下:

java復(fù)制代碼@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
        selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
        consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrapper<PayResultCallbackOrderEvent>> {

    private final OrderService orderService;

    @Idempotent(
            uniqueKeyPrefix = "index12306-order:pay_result_callback:",
            key = "#message.getKeys()+'_'+#message.hashCode()",
            type = IdempotentTypeEnum.SPEL,
            scene = IdempotentSceneEnum.MQ,
            keyTimeout = 7200L
    )
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
        PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
        OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
                .orderSn(payResultCallbackOrderEvent.getOrderSn())
                .orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
                .build();
        orderService.statusReversal(orderStatusReversalDTO);
        orderService.payCallbackOrder(payResultCallbackOrderEvent);
    }
}

支持通過(guò) SpEL 表達(dá)式來(lái)充當(dāng)冪等去重表唯一鍵,通過(guò)一個(gè)簡(jiǎn)單的注解,完美解決消息隊(duì)列重復(fù)消費(fèi)問(wèn)題。

更復(fù)雜的冪等場(chǎng)景

到這里,方案看起來(lái)非常完美,所有的消息都可以快速接入去重,而且與具體業(yè)務(wù)實(shí)現(xiàn)完全解耦。但是,是否這樣就可以完美地完成去重的所有任務(wù)呢? 很遺憾,實(shí)際上并非如此。因?yàn)樾枰_保消息至少成功消費(fèi)一次,因此消息在消費(fèi)過(guò)程中有可能失敗并觸發(fā)重試。

還是以上面的例子,假設(shè)消息消費(fèi)的流程包含:

  1. 檢查庫(kù)存(RPC)
  2. 鎖庫(kù)存(RPC)
  3. 開(kāi)啟事務(wù),插入訂單表(MySQL)
  4. 調(diào)用某些其他下游服務(wù)(RPC)
  5. 更新訂單狀態(tài)
  6. commit 事務(wù)(MySQL)

當(dāng)消息消費(fèi)到第三步的時(shí)候假設(shè) MySQL 異常導(dǎo)致失敗了,觸發(fā)消息重試。在重試前我們會(huì)刪除冪等表的記錄,所以消息重試的時(shí)候就會(huì)重新進(jìn)入消費(fèi)代碼,那么步驟 1 和步驟 2 就會(huì)重新再執(zhí)行一遍。

如果步驟 2 本身不是冪等的,那么這個(gè)業(yè)務(wù)消息消費(fèi)依舊沒(méi)有做好完整的冪等處理。

1. 通用方法實(shí)現(xiàn)價(jià)值

盡管這種方式并不能完全解決消息冪等問(wèn)題(事實(shí)上,軟件工程領(lǐng)域里很少有銀彈可以完全解決問(wèn)題),但它仍然具有很大的價(jià)值。通過(guò)這種簡(jiǎn)便的方式,我們能夠解決以下問(wèn)題:

  1. 各種由于Broker、負(fù)載均衡等原因?qū)е碌南⒅赝哆f的重復(fù)問(wèn)題。
  2. 各種上游生產(chǎn)者導(dǎo)致的業(yè)務(wù)級(jí)別消息重復(fù)問(wèn)題。
  3. 重復(fù)消息并發(fā)消費(fèi)的控制窗口問(wèn)題,就算重復(fù),重復(fù)也不可能同一時(shí)間進(jìn)入消費(fèi)邏輯。

2. 消息去重的建議

使用這種方法可以確保在正常的消費(fèi)邏輯場(chǎng)景下(無(wú)異常,無(wú)異常退出),消息的冪等性全部得到解決,無(wú)論是業(yè)務(wù)重復(fù)還是 RocketMQ 特性帶來(lái)的重復(fù)。雖然它不是解決消息冪等性的銀彈,但它以一種簡(jiǎn)單和便捷的方式提供了解決方案。

實(shí)際上,這種方法已經(jīng)可以解決 99% 的消息重復(fù)問(wèn)題了,因?yàn)楫惓G闆r通常是少數(shù)情況。但是,如果希望在異常情況下也能處理好冪等問(wèn)題,可以采取以下措施來(lái)降低問(wèn)題發(fā)生的概率:

  1. 消息消費(fèi)失敗時(shí),應(yīng)該及時(shí)回滾處理。如果消息消費(fèi)失敗本身具備回滾機(jī)制,則消息重試也就沒(méi)有副作用了。
  2. 為了盡可能避免程序異常退出導(dǎo)致的消息重試,需要在消費(fèi)者代碼中做好優(yōu)雅退出處理。
  3. 針對(duì)一些無(wú)法做到完全冪等的操作,至少要做到終止消息的消費(fèi)并進(jìn)行告警。比如鎖定庫(kù)存的操作,如果通過(guò)業(yè)務(wù)流水號(hào)已經(jīng)成功鎖定了庫(kù)存,再次觸發(fā)鎖庫(kù)存操作的話,如果無(wú)法做到冪等性處理,那么至少要在消息消費(fèi)過(guò)程中觸發(fā)異常(如因主鍵沖突導(dǎo)致消費(fèi)異常等),并終止消息的消費(fèi),以避免重復(fù)消費(fèi)產(chǎn)生的副作用。
  4. 在 #3 做好的前提下,做好消息的消費(fèi)監(jiān)控,發(fā)現(xiàn)消息重試不斷失敗的時(shí)候,手動(dòng)做好 #1 的回滾,使得下次重試消費(fèi)成功。

文末總結(jié)

當(dāng)我們?cè)谑褂?RocketMQ 進(jìn)行消息處理時(shí),消息的冪等性是一個(gè)非常重要的問(wèn)題。本文通過(guò)抽象出通用組件的方式,實(shí)現(xiàn)了 RestAPI 和 RocketMQ 的冪等處理。 同時(shí),我們也發(fā)現(xiàn),冪等性并不是一個(gè)銀彈,不同的業(yè)務(wù)場(chǎng)景需要不同的冪等處理策略。

但是,通過(guò)一些基本的處理策略,如優(yōu)雅退出、回滾處理、消費(fèi)監(jiān)控等,我們能夠大大減少消息重復(fù)的問(wèn)題,提高消息消費(fèi)的穩(wěn)定性和可靠性。 在實(shí)際開(kāi)發(fā)中,需要結(jié)合具體業(yè)務(wù)場(chǎng)景,選擇合適的冪等處理策略,并且在每次新的場(chǎng)景出現(xiàn)時(shí),都需要仔細(xì)考慮是否需要重新審視冪等性的處理方式。

上文中的代碼以及實(shí)現(xiàn)已在基礎(chǔ)架構(gòu)模塊中定義,詳情查看。

 


原文鏈接:
https://juejin.cn/post/7270139990696722484

分享到:
標(biāo)簽:隊(duì)列 消息
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定