日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

當一個接口從簡單的內部調用升級為遠程方法調用(RPC)會面臨很多問題,比如:

  1. 本地事務失效。在內部調用時,多個方法通常在同一事務中執行,可以使用本地數據庫事務來確保數據的一致性。但是,在遠程方法調用中,由于涉及到網絡通信,事務的邊界會擴展到多個系統之間,因此無法直接使用本地事務。如果遠程方法調用出現異常,可能會導致事務提交失敗,從而產生數據不一致;
  2. 第三狀態影響。網絡不確定性可能導致遠程調用無法成功獲得結果,例如網絡連接中斷、網絡超時等。在這種情況下,客戶端無法獲得期望的結果,調用會以網絡錯誤或超時的方式結束;
  3. 服務版本兼容性問題。如果服務接口發生變化,客戶端和服務端的版本不匹配可能導致調用失??;
  4. 性能問題、可用性問題、安全問題等;

由于涉及的問題比較多,這里重點分析和解決 RPC 調用時的第三狀態問題。

1. 什么是第三狀態

當一個客戶端發起一個RPC請求時,服務端可能會返回不同的狀態,包括:

  1. 成功:服務端成功完成了客戶端發送的請求,并返回對應的響應結果;
  2. 失?。悍斩藷o法成功處理客戶端發送的請求,并返回錯誤信息或異常;
  3. 超時:調用方無法在規定時間收到服務端的處理結果,所以無法知道請求的最終處理結果;

如下圖所示:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

一般情況下,調用方在規定時間收到被調用方的返回結果,能夠非常明確的知道處理結果是成功還是失敗。

當網絡或被調用方出問題,就會觸發超時,比如下圖所示:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

如果被調用方異?;蛘呔W絡發生阻塞,調用方發送的 Request 請求沒有被正常處理,那調用方只能在等待若干時間后拋出異常進行流程中斷。

又或者如下圖所示:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

被調用方處理時間過長或者網絡發生阻塞,調用方無法在規定時間獲得最終結果,也只能觸發超時中斷。

可見,如果發生 超時 情況,對于處理結果就處于未知狀態,這就是所謂的“第三狀態”:

  1. 被調用方成功接收到請求并完成了處理;
  2. 被調用方完全沒有接收到請求;

在出現第三狀態時,在不做任何處理前,根本就無法獲取最終的處理結果。在該場景下,最通用的解決方案便是 客戶端重試 + 服務端冪等。

  1. 客戶端重試。指的是在RPC調用出現超時的情況下,客戶端自動重新發送相同的請求。然而,在客戶端重試時需要注意避免重復執行有副作用的操作,比如避免重復插入數據;
  2. 服務端冪等。指的是對于相同的輸入請求,服務端能夠產生相同的結果,而且不會對系統狀態造成影響。冪等性是為了應對由于重試等原因導致重復執行的副作用;

通過客戶端重試和服務端冪等的方式,可以增加RPC調用的可靠性和數據一致性。客戶端重試可以處理網絡不穩定、服務端故障等導致的失敗情況,而服務端冪等性能保證相同的請求不會重復執行或引起數據不一致的問題。這兩個技術結合使用,可以提高分布式系統中RPC調用的健壯性和可靠性。

2. 客戶端重試

客戶端重試指的是在RPC調用失敗的情況下,客戶端自動重新發送相同的請求。客戶端可以設置重試的次數和時間間隔,直到得到預期的成功響應或達到最大重試次數??蛻舳酥卦嚈C制可以彌補網絡不穩定性或服務端異常導致的調用失敗。然而,在客戶端重試時需要注意避免重復執行有副作用的操作,比如避免重復插入數據。此外,還需要合理設置重試策略,避免因頻繁的重試導致網絡負荷增加或服務端壓力過大。

如下圖所示:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

  1. 第一次獲取商品信息時,由于網絡異常導致請求超時;
  2. 網絡異常觸發 retry 機制,重新發起新的請求;
  3. 新請求成功發送至商品服務并獲取信息,從而使得流程從異常中恢復,最終正常執行完成;

由此可見,重試的工作原理非常簡單。

Spring Retry是Spring Framework提供的一個模塊,用于簡化和增強應用程序中的重試操作。它提供了一種聲明式的方式來處理方法調用的重試,以應對在分布式系統或有限資源環境下可能出現的失敗情況。

Spring Retry模塊的主要特性包括:

  1. 聲明式注解:通過使用注解(例如@Retryable、@Recover等),可以將重試行為與方法關聯起來。通過在方法上添加@Retryable注解,可以指定需要進行重試的異常類型,最大重試次數,重試間隔等信息;
  2. 容錯策略:Spring Retry提供了多種容錯策略,包括簡單重試、指數退避重試、固定時間間隔重試等,開發者可以根據具體需求選擇合適的策略;
  3. 回退機制:如果重試次數超過限定值仍然失敗,Spring Retry可以通過@Recover注解來指定一個回退方法,用于執行備選邏輯或返回默認值;

要使用Spring Retry,需要在項目中引入相應的依賴:

<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>

2.1. Retryable

重試行為主要是由 @Retryable 注解完成,通過為目標方法添加注解,將其納入重試管理,Spring Retry將負責在出現失敗情況時自動進行重試。

以下是 @Retryable 注解代碼示例:

@Service
public class MyService {

    @Retryable(value = {SomeException.class}, maxAttempts = 3)
    public void doSomething() {
        // 需要進行重試的業務邏輯
    }
}

上述示例中的 doSomething() 方法標記為需要進行重試,在捕獲到 SomeException 異常時觸發重試,最多重試3次。

下表列出了 @Retryable 注解的所有屬性及其說明:

屬性名

說明

value

定義重試的異常類型。默認情況下,將捕獲所有Throwable類型的異常進行重試??梢酝ㄟ^指定一個或多個異常類來限定重試的異常類型,例如@Retryable(value = {SQLException.class, TimeoutException.class})。

maxAttempts

定義最大重試次數,默認為3次。當達到最大重試次數后,如果仍然失敗,則不再進行重試。

backoff

定義重試間隔策略??梢允褂?code>@Backoff注解來配置重試間隔的退避策略,包括delay(初始延遲時間,默認為0)和maxDelay(最大延遲時間,默認為無限制)。例如,@Backoff(delay = 1000, maxDelay = 5000)表示初始延遲為1秒,最大延遲為5秒。

delayExpression

定義重試間隔的SpEL表達式。可以使用#lastException表示上一次重試時捕獲的異常對象。

multiplier

定義指數退避策略的倍數,默認為1。在使用指數退避策略時,每次重試的間隔時間將是delay * (multiplier ^ (重試次數-1))。

random

定義是否啟用隨機化延遲。默認為false,即不啟用隨機化延遲。當設置為true時,重試間隔將隨機波動一定的范圍。

exceptionExpression

定義一個SpEL表達式,用于決定是否進行重試。該表達式可以使用#result表示目標方法的返回值,#lastException表示上一次重試時捕獲的異常對象。如果表達式的結果為true,則進行重試;否則,不進行重試。

include

定義需要包含在重試行為中的異常類型,默認為空數組,表示包括所有異常類型??梢灾付ㄒ粋€或多個異常類來明確指定需要處理的異常類型。例如,@Retryable(include = {IOException.class, TimeoutException.class})表示僅包括IOExceptionTimeoutException異常。

exclude

定義需要排除在重試行為之外的異常類型,默認為空數組,表示排除所有異常類型??梢灾付ㄒ粋€或多個異常類來明確排除某些異常類型。例如,@Retryable(exclude = {NullPointerException.class})表示排除NullPointerException異常。

這些屬性可以根據具體的需求進行配置,以實現靈活的重試策略。

2.2. Recover

Spring Retry 的 fallback 機制是在重試失敗后,執行備選邏輯的一種處理方式。通過使用 @Recover 注解,在重試失敗后,可以調用備選邏輯方法來完成錯誤處理、數據清理等操作。

具體來說,當 @Retryable 注解標記的方法達到最大重試次數或者拋出了無法重試的異常時,Spring Retry將會嘗試查找與該方法參數類型相同的方法,并在找到時調用它。

@Retryable(maxAttempts = 3)
public void someMethod(String arg1, int arg2) {
    // 重試業務邏輯
}

@Recover
public void recover(String arg1, int arg2) {
    // 備選邏輯
}

當 someMethod() 方法達到最大重試次數或拋出無法重試的異常時,Spring Retry 將會查找并調用 recover() 方法,并傳遞相同的方法參數。在 recover() 方法中,應該針對重試失敗的情況編寫備選邏輯,例如記錄日志、發送通知等操作。

需要注意的是,@Recover 注解必須放置在其對應的重試方法所屬的類中,并且方法參數類型必須與重試方法一致。如果有多個重試方法,每個重試方法都可以對應一個@Recover方法,以滿足不同的備選邏輯需求。

在使用fallback機制時,應該仔細考慮備選邏輯的實現方式,確保其能夠正確處理重試失敗的情況,并對數據的一致性和完整性產生最小的影響。

2.3. 不同場景下的 Retry 和 Fallback

@Retryable 和 @Recover 都是添加在類方法上的注解,不管是什么場景下的請求只會走固定流程。這樣的設計在復雜場景下是否夠用?

之前,我們看到通過 Retry 恢復網絡抖動的場景;接下來讓我們看另一個場景,如下圖所示:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

image

商品服務流量激增,導致 DB CPU 飆升,出現大量的慢 SQL,這時觸發了系統的 Retry 會是怎樣的結果?

  1. 在獲取商品失敗后,系統自動觸發 Retry 機制;
  2. 由于是商品服務本身出了問題,第二次請求仍舊失敗;
  3. 服務又觸發了第三次請求,仍未獲取結果;
  4. 達到最大重試次數,仍舊無法獲取商品,只能通過異常中斷用戶請求;

通過 Retry 機制未能將流程從異常中恢復過來,反而給下游的 商品服務 造成了巨大傷害。

  1. 商品服務壓力大,響應時間長;
  2. 上游系統由于超時觸發自動重試;
  3. 自動重試增大了對商品服務的調用;
  4. 商品服務請求量更大,更難以從故障中恢復;

這就是常說的“讀放大”,假設用戶驗證是否能夠購買請求的請求量為 n,那極端情況下 商品服務的請求量為 3n (其中 2n 是由 Retry 機制造成)

此時,最優解不是進行 Retry 而是直接走 Fallback,給下游服務一定的恢復機會。

同樣是對商品服務接口(同一個接口)的調用,在不同的場景需要使用不同的策略用以適配不同的業務流程,通常情況下:

  1. Command 場景優先使用 Retry 策略

這種流量即為重要,最好能保障流程的完整性

通常寫流量比較小,小范圍 Retry 不會對下游系統造成巨大影響

  1. Query 場景優選使用 Fallback 策略
  • 大多數展示場景,哪怕部分信息沒有獲取到對整體的影響也比較小

  • 通常讀場景流量較高,Retry 對下游系統的傷害不容忽視

面對不同的業務場景,你會怎么做呢?準備兩組不同的方法根據業務場景分別調用?

2.4. SmartFAIlure

SmartFailure 不是為不同的場景使用不同的方法,而是根據請求上下文信息,動態的走 Retry 或 Fallback,從而更好的適應復雜的業務場景。

整體設計如下:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

整體流程如下:

  1. 讀取配置信息,將請求類型(ActionType)綁定到線程上下文;
  2. 然后執行正常業務邏輯
  3. 當調用 @SmartFault 注解的方法時,會被 SmartFaultMethodInterceptor 攔截器攔截

攔截器通過 ActionTypeProvider 獲取當前的 ActionType;

根據 ActionType 對請求進行路由;

如果是 COMMAND 操作,將使用 RetryTemplate 執行請求,在發生異常時,通過重試配置進行請求重發,從而最大限度的獲得遠程結果;

如果是 QUERY 操作,將使用 FallbackTemplate(重試次數為0的 RetryTemplate)執行請求,當發生異常時,調用 fallback 方法,執行配置的 recover 方法,直接使用返回結果;

  1. 獲取遠程結果后,執行后續的業務邏輯;
  2. 最后,ActionAspect 將 ActionType 從線程上下文中移除;

使用前需添加 lego 依賴,具體如下:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>最新版本</version>
</dependency>
2.4.1. ActionTypeProvider

首先,需要準備一個 ActionTypeProvider 用以提供上下文信息。ActionTypeProvider 接口定義如下:

public interface ActionTypeProvider {
    ActionType get();
}

public enum ActionType {
    COMMAND, QUERY
}

通常情況下,我們使用 ThreadLocal 組件將 ActionType 存儲于線程上下文,在使用時從上下中獲取相關信息。

public class ActionContext {
    private static final ThreadLocal<ActionType> ACTION_TYPE_THREAD_LOCAL = new ThreadLocal<>();

    public static void set(ActionType actionType){
        ACTION_TYPE_THREAD_LOCAL.set(actionType);
    }

    public static ActionType get(){
        return ACTION_TYPE_THREAD_LOCAL.get();
    }

    public static void clear(){
        ACTION_TYPE_THREAD_LOCAL.remove();
    }
}

有了上下文之后,ActionBasedActionTypeProvider 直接從 Context 中獲取 ActionType 具體如下:

@Component
public class ActionBasedActionTypeProvider implements ActionTypeProvider {
    @Override
    public ActionType get() {
        return ActionContext.get();
    }
}

如何對請求進行標記?如何對 ActionType 進行管理(包括信息綁定和信息清理)?最常用的方式便是:

  1. 提供一個注解,在方法上添加注解用于對 ActionType 的配置;
  2. 提供一個攔截器,對方法調用進行攔截:

方法調用前,從注解中獲取配置信息并綁定到上下文;

執行具體的業務方法;

方法調用后,主動清理上下文信息;

核心實現為:

@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface Action {
    ActionType type();
}

@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class ActionAspect {
    @Pointcut("@annotation(com.geekhalo.lego.faultrecovery.smart.Action)")
    public void pointcut() {
    }

    @Around(value = "pointcut()")
    public Object action(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Action annotation = methodSignature.getMethod().getAnnotation(Action.class);
        ActionContext.set(annotation.type());
        try {
            return joinPoint.proceed();
        }finally {
            ActionContext.clear();
        }
    }
}

在這些組件的幫助下,我們只需在方法上基于 @Action 注解進行標記,便能夠將 ActionType 綁定到上下文。

2.4.2. @SmartFault

在完成 ActionType 綁定到上下文之后,接下來要做的便是對 遠程接口 進行配置。遠程接口的配置工作主要由 @SmartFault 來完成。其核心配置項包括:

配置項

含義

默認配置

recover

fallback 方法名稱

 

maxRetry

最大重試次數

3

include

觸發重試的異常類型

 

exclude

不需要重新的異常類型

 

測試 Demo 如下:

@Service
@Slf4j
@Getter
public class RetryService3 {
    private int count = 0;

    private int retryCount = 0;
    private int fallbackCount = 0;
    private int recoverCount = 0;

    public void clean(){
        this.retryCount = 0;
        this.fallbackCount = 0;
        this.recoverCount = 0;
    }

    /**
    * Command 請求,啟動重試機制
    */
    @Action(type = ActionType.COMMAND)
    @SmartFault(recover = "recover")
    public Long retry(Long input) throws Throwable{
        this.retryCount ++;
        return doSomething(input);
    }

    /**
    * Query 請求,啟動Fallback機制
    */
    @Action(type = ActionType.QUERY)
    @SmartFault(recover = "recover")
    public Long fallback(Long input) throws Throwable{
        this.fallbackCount ++;
        return doSomething(input);
    }

    @Recover
    public Long recover(Throwable e, Long input){
        this.recoverCount ++;
        log.info("recover-{}", input);
        return input;
    }

    private Long doSomething(Long input) {
        // 偶數拋出異常
        if (count ++ % 2 == 0){
            log.info("Error-{}", input);
            throw new RuntimeException();
        }
        log.info("Success-{}", input);
        return input;
    }
}
2.4.3. 測試

最后,對代碼進行簡單測試:

@SpringBootTest(classes = DemoApplication.class)
public class RetryService3Test {
    @Autowired
    private RetryService3 retryService;

    @BeforeEach
    public void setup(){
        retryService.clean();
    }

    @Test
    public void retry() throws Throwable{
        for (int i = 0; i < 100; i++){
            retryService.retry(i + 0L);
        }

        Assertions.assertTrue(retryService.getRetryCount() > 0);
        Assertions.assertTrue(retryService.getRecoverCount() == 0);
        Assertions.assertTrue(retryService.getFallbackCount() == 0);
    }

    @Test
    public void fallback() throws Throwable{
        for (int i = 0; i < 100; i++){
            retryService.fallback(i + 0L);
        }

        Assertions.assertTrue(retryService.getRetryCount() == 0);
        Assertions.assertTrue(retryService.getRecoverCount() > 0);
        Assertions.assertTrue(retryService.getFallbackCount() > 0);
    }
}

運行 retry 測試,日志如下:

[main] c.g.l.c.f.smart.SmartFaultExecutor       : action type is COMMAND
[main] c.g.l.faultrecovery.smart.RetryService3  : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor       : Retry method public JAVA.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.retry(java.lang.Long) throws java.lang.Throwable use [0]
[main] c.g.l.faultrecovery.smart.RetryService3  : Success-0

可見,當 action type 為 COMMAND 時:

  • 第一次調用時,觸發異常,打?。篍rror-0
  • 此時 SmartFaultExecutor 主動進行重試,打?。篟etry method xxxx
  • 方法重試成功,RetryService3 打印:Success-0

方法主動進行重試,流程從異常中恢復,處理過程和效果符合預期。

運行 fallback 測試,日志如下:

[main] c.g.l.c.f.smart.SmartFaultExecutor       : action type is QUERY
[main] c.g.l.faultrecovery.smart.RetryService3  : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor       : recover From ERROR for method ReflectiveMethodInvocation: public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.fallback(java.lang.Long) throws java.lang.Throwable; target is of class [com.geekhalo.lego.faultrecovery.smart.RetryService3]
[main] c.g.l.faultrecovery.smart.RetryService3  : recover-0

可見,當 action type 為 QUERY 時:

  • 第一次調用時,觸發異常,打印:Error-0
  • SmartFaultExecutor 執行 Fallback 策略,打?。簉ecover From ERROR for method xxxx
  • 調用RetryService3的 recover 方法,獲取最終返回值。RetryService3 打?。簉ecover-0

異常后自動執行 fallback,將流程從異常中恢復過來,處理過程和效果符合預期。

3. 服務端冪等

服務端冪等指的是對于相同的輸入請求,服務端能夠產生相同的結果,而且不會對系統狀態造成影響。冪等性是為了防止由于重試等原因導致的重復執行帶來的副作用。在RPC中,服務端需要設計和實現冪等性的方法,確保多次接收到相同的請求時能正確處理,而不會重復執行對系統狀態有改變的操作。

3.1. 冪等與冪等接口

冪等是指對同一個操作的多次執行所產生的影響與一次執行的影響相同,即不會因為多次執行而產生額外的副作用。在分布式系統中,由于網絡等各種因素的存在,可能會導致對同一個操作進行多次執行,此時如果該操作是冪等的,那么就可以避免數據沖突和重復處理問題。

冪等接口是指對外提供的接口,它們所提供的業務操作具有冪等性。也就是說,在客戶端多次調用該接口時,每次調用都會產生相同的結果,并且不會產生額外的副作用。

例如,銀行轉賬操作就應該是一個冪等接口。假設客戶端已經成功地向銀行發起了一次轉賬請求,但由于網絡不穩定等原因,導致該請求的響應沒有及時返回給客戶端。此時客戶端可能會誤以為轉賬請求失敗,進而再次發送同樣的轉賬請求。如果銀行的轉賬接口是冪等的,那么無論客戶端發送多少次轉賬請求,最終的結果都應該是相同的——只有一次轉賬操作會被執行,其他的請求都會被忽略。

3.2. 天然冪接口

有些接口天然就具備冪等性。這些接口通常是對資源的查詢操作,不會對資源進行修改。以下是一些天然具備冪等性的場景:

  • 查詢接口:對于只用于查詢數據的接口,例如獲取用戶信息、獲取訂單詳情等,多次調用不會對數據產生影響;
  • 獲取接口:對于獲取資源的接口,例如獲取圖片、獲取文件等,無論調用多少次,得到的都是相同的資源副本;
  • 計算接口:對于純計算的接口,例如加法、乘法等數學運算接口,多次調用得到的結果是相同的;
  • 驗證接口:對于驗證某個條件的接口,例如檢查用戶名是否已存在、驗證手機號是否有效等,多次調用得到的驗證結果都是一致的;

這些接口在設計上更容易滿足冪等性的要求,因為它們的操作本身并沒有產生副作用,不會對數據進行修改。在分布式系統中,可以安全地多次調用這些接口,而不會引發數據沖突。

3.3. 非冪等接口

非冪等接口是指對資源進行修改、狀態進行變更而產生副作用的接口,多次調用可能會導致不同的結果。以下是一些常見的非冪等接口:

  • 創建資源接口:例如創建用戶、創建訂單等操作,多次調用會生成多個相同的資源實例,每次調用都會產生不同的結果;
  • 修改資源接口:例如更新用戶信息、修改文章內容等操作,多次調用會對同一個資源進行多次修改,每次修改都會產生不同的結果;
  • 刪除資源接口:例如刪除文件、刪除訂單等操作,多次調用會多次刪除同一個資源,每次調用都會產生不同的結果;

這些非冪等接口在設計上需要特別注意,并采取合適的措施來確保數據的一致性和操作的正確性。

3.4. 重復請求的處理方式

當系統檢測出當前請求是重復請求時,通常會有兩種處理策略:

  • 直接拋出冪等異常,用以說明該請求為重復請求;
  • 直接返回上次請求一致的返回結果;

這兩種方案在實現上差異不大,但在客戶端使用中差異巨大。

  • 如果是異常方案,客戶端在調用冪等接口時需要對異常進行捕獲,然后通過其他 API 獲取上次請求的處理結果,根據結果不同來決定接下來的處理流程;
  • 如果是直接返回上次請求的處理結果,則客戶端不需要做額外的處理,直接使用重試機制對請求進行重新發送即可,獲取結果后自然進入到下面的流程;

所以,在冪等設計中,優先使用 “直接返回上次請求結果” 方案。

綜上分析,冪等可以做為一種通用能力與業務處理邏輯進行充分解構,這就是組件封裝的前提。

3.5. 冪等組件

我們可以將冪等封裝成一種能力,然后在需要冪等保護的業務方法上進行配置,從而實現冪等能力與業務方法的徹底解耦。

整體設計如下圖所示:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

整體設計比較簡單,運行流程如下:

  • IdempotentInterceptor 會對 @Idempotent 注解標記的方法進行攔截;
  • 當方法第一次被調用時,會讀取 @Idempotent 注解上的配置信息,使用 IdempotentExecutorFactoris 為每個方法創建一個 IdempotentExecutor 實例;
  • 在方法調用時,將請求直接路由到 IdempotentExecutor 實例,由 IdempotentExecutor 完成核心流程;
  • 其中,IdempotentExecutorFactories 擁有多個 IdempotentExecutorFactory 實例,并根據 @Idempotent 上配置的 executorFactory 屬性使用對應的實例完成創建工作;

從設計上看,系統中可以同時配置多個 IdempotentExecutorFactory,然后根據不同的業務場景設置不同的 executorFactory。

冪等處理的核心流程如下:

DDD與微服務集成的第一戰役:客戶端重試&服務端冪等圖片

IdempotentExecutor 處理核心流程如下:

  • 通過 SpringEL 表達式從入參中提取 unique key 信息;
  • 根據 group 和 unique key 從 ExecutionRecordRepository 中讀取執行記錄 ExecutionRecord;
  • 如果 ExecutionRecord 為已完成狀態,則根據配置直接返回 ExecutionRecord 的執行結果 或者 直接拋出 RepeatedSubmitException 異常;
  • 如果 ExecutionRecord 為執行中,則出現并發問題,直接拋出 ConcurrentRequestException 異常;
  • 如果 ExecutionRecord 為未執行,先執行方法獲取返回值,然后使用 ExecutionRecordRepository 對 ExecutionRecord 進行更新,然后返回執行結果;

使用前需要引入 lego starter,在 maven pom 中添加如下信息:

<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>最新版本</version>
3.5.1. 配置 dbExecutorFactory

以 JpaRepository 為例實現對 IdempotentExecutorFactory 的配置,具體如下:

@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {

    @Bean("dbExecutorFactory")
    public IdempotentExecutorFactory dbExecutorFactory(JpaBasedExecutionRecordRepository recordRepository){
        return createExecutorFactory(recordRepository);
    }
}

其中,IdempotentConfigurationSupport 已經提供 idempotent 所需的很多 Bean,同時提供 createExecutorFactory(repository) 方法,用以完成 IdempotentExecutorFactory 的創建。

使用 Jpa 需要調整 EnableJpaRepositories 相關配置,具體如下:

@Configuration
@EnableJpaRepositories(basePackages = {
        "com.geekhalo.lego.core.idempotent.support.repository"
}, repositoryFactoryBeanClass = JpaBasedQueryObjectRepositoryFactoryBean.class)
public class SpringDataJpaConfiguration {
}

其中,com.geekhalo.lego.core.idempotent.support.repository 為固定包名,指向 Jpa 默認實現 JpaBasedExecutionRecordRepository,Spring Data Jpa 會自動生成實現的代理對象。

最后,在數據庫中增加 冪等所需表,sql 如下:

CREATE TABLE `idempotent_execution_record` (
   `id` bigint(20) NOT NULL AUTO_INCREMENT,
   `type` int(11) NOT NULL,
   `unique_key` varchar(64) NOT NULL,
   `status` int(11) NOT NULL,
   `result` varchar(1024) DEFAULT NULL,
   `create_date` datetime DEFAULT NULL,
   `update_date` datetime DEFAULT NULL,
   PRIMARY KEY (`id`),
   UNIQUE KEY `unq_type_key` (`type`,`unique_key`)
) ENGINE=InnoDB;

至此,便完成了基本配置。

【注】關于 Spring data jpa 配置,可以自行到網上進行檢索。

3.5.2. 冪等保護示例

在方法上增加 @Idempotent 注解便可以使其具備冪等保護,示例如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
        handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putForResult(String key, Long data){
    return put(key, data);
}

其中 @Idempotent 為核心配置,詳細信息如下:

  • executorFactory 為 IdempotentExecutorFactory,及在 IdempotentConfiguration 中配置的bean,默認為 DEFAULT_EXECUTOR_FACTORY
  • group 為組信息,用于區分不同的業務場景,同一業務場景使用相同的配置;
  • keyEl 為提取冪等鍵所用的 SpringEl 表達式,#key 說明入參的 key 將作為冪等鍵,group + key 為一個完整的冪等鍵,唯一識別一次請求;
  • handleType 是處理類型,及重復提交時如何處理請求

RESULT,直接返回上次的執行結果

ERROR,直接拋出 RepeatedSubmitException 異常

編寫簡單的測試用例如下:

@Test
void putForResult() {
    BaseIdempotentService idempotentService = getIdempotentService();
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    {   // 第一次操作,返回值和最終結果符合預期
        Long result = idempotentService.putForResult(key, value);
        Assertions.assertEquals(value, result);
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }

    {   // 第二次操作,返回值和最終結果 與第一次一致(直接獲取返回值,沒有執行業務邏輯)
        Long valueNew = RandomUtils.nextLong();
        Long result = idempotentService.putForResult(key, valueNew);

        Assertions.assertEquals(value, result);
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }
}

運行測試用例,測試通過,可得出如下結論:

  • 第一次操作,與正常方法一致,成功返回結果值;
  • 第二次操作,邏輯方法未執行,直接返回第一次的運行結果;

這是最常見的一種工作模式,除直接返回上次執行結果外,當發生重復提交時也可以拋出異常中斷流程,只需將 handleType 設置為 ERROR 即可,具體如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
    handleType = IdempotentHandleType.ERROR)
@Transactional
public Long putForError(String key, Long data){
    return put(key, data);
}

編寫測試用例,具體如下:

@Test
void putForError() {
    BaseIdempotentService idempotentService = getIdempotentService();
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    { // 第一次操作,返回值和最終結果符合預期
        Long result = idempotentService.putForError(key, value);
        Assertions.assertEquals(value, result);
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }

    { // 第二次操作,直接拋出異常,結果與第一次一致
        Assertions.assertThrows(RepeatedSubmitException.class, () ->{
            Long valueNew = RandomUtils.nextLong();
            idempotentService.putForError(key, valueNew);
        });
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }
}

運行測試用例,測試通過,可以得出:

  • 第一次操作,與正常方法一致,成功返回結果值;
  • 第二次操作,直接拋出 RepeatedSubmitException 異常,同時方法未執行,結果與第一次調用一致;
3.5.3. 冪等與異常

異常是一種特殊的返回值?。?!

如果將異??醋鍪且环N特殊的返回值,那冪等接口在第二次請求時同樣需要拋出異常,示例代碼如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
        handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putExceptionForResult(String key, Long data) {
    return putException(key, data);
}
protected Long putException(String key, Long data){
    this.data.put(key, data);
    throw new IdempotentTestException();
}

@Idempotent 注解沒有變化,只是在 putException 方法執行后拋出 IdempotentTestException 異常。

編寫簡單測試用例如下:

@Test
void putExceptionForResult(){
    BaseIdempotentService idempotentService = getIdempotentService();
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    {   // 第一次操作,拋出異常
        Assertions.assertThrows(IdempotentTestException.class,
                ()->idempotentService.putExceptionForResult(key, value));
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }

    {   // 第二次操作,返回值和最終結果 與第一一致(直接獲取返回值,沒有執行業務邏輯)
        Long valueNew = RandomUtils.nextLong();
        Assertions.assertThrows(IdempotentTestException.class,
                ()->idempotentService.putExceptionForResult(key, valueNew));
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }
}

運行測試用例,用例通過,可知:

  • 第一次操作,與方法邏輯一致,更新數據并拋出 IdempotentTestException 異常;
  • 第二次操作,直接拋出 IdempotentTestException 異常,同時方法未執行,結果與第一次一致;
3.5.4. 并發保護

如果上一個請求執行尚未結束,新的請求已經開啟,那會如何?

這就是最常見的并發場景,idempotent 對其也進行了支持,當出現并發請求時會直接拋出 ConcurrentRequestException,用于中斷處理。

首先,使用 sleep 模擬一個耗時的方法,具體如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
            handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putWaitForResult(String key, Long data) {
    return putForWait(key, data);
}
protected Long putForWait(String key, Long data){
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return put(key, data);
}

putWaitForResult 方法調用時會主動 sleep 3 秒,然后才執行真正的邏輯。

編寫測試代碼如下:

@Test
void putWaitForResult(){
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    // 主線程拋出 ConcurrentRequestException 
    Assertions.assertThrows(ConcurrentRequestException.class, () ->
        testForConcurrent(baseIdempotentService ->
            baseIdempotentService.putWaitForResult(key, value))
    );
}
private void testForConcurrent(Consumer<BaseIdempotentService> consumer) throws InterruptedException {
    // 啟動一個線程執行任務,模擬并發場景
    Thread thread = new Thread(() -> consumer.accept(getIdempotentService()));
    thread.start();

    // 主線程 sleep 1 秒,與異步線程并行執行任務
    TimeUnit.SECONDS.sleep(1);
    consumer.accept(getIdempotentService());
}

運行單元測試,測試通過,核心測試邏輯如下:

  • 創建一個線程,執行耗時方法;
  • 等待 1 秒后,主線程也執行耗時方法;
  • 此時,兩個線程并發執行耗時方法,后進入的主線程直接拋出 ConcurrentRequestException;
3.5.5. redis 支持

DB 具有非常好的一致性,但性能存在一定的問題。在一致性要求不高,性能要求高的場景,可以使用 Redis 作為 ExecutionRecord 的存儲引擎。

引入 redis 非常簡單,大致分兩步:

  • 在 IdempotentConfiguration 中注冊 redisExecutorFactory bean;
  • @Idempotent 注解中使用 redisExecutorFactory 即可;

添加 redisExecutorFactory Bean,具體如下:

@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {

    @Bean("redisExecutorFactory")
    public IdempotentExecutorFactory redisExecutorFactory(ExecutionRecordRepository executionRecordRepository){
        return createExecutorFactory(executionRecordRepository);
    }

    @Bean
    public ExecutionRecordRepository executionRecordRepository(RedisTemplate<String, ExecutionRecord> recordRedisTemplate){
        return new RedisBasedExecutionRecordRepository("ide-%s-%s", Duration.ofDays(7), recordRedisTemplate);
    }

    @Bean
    public RedisTemplate<String, ExecutionRecord> recordRedisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<String, ExecutionRecord> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
        Jackson2JsonRedisSerializer<ExecutionRecord> executionRecordJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(ExecutionRecord.class);
        executionRecordJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(executionRecordJackson2JsonRedisSerializer);
        return redisTemplate;
    }
}

@Idempotent 注解調整如下:

@Idempotent(executorFactory = "redisExecutorFactory", group = 1, keyEl = "#key",
        handleType = IdempotentHandleType.RESULT)
@Override
public Long putForResult(String key, Long data){
    return put(key, data);
}

這樣,所有的冪等信息都會存儲在 redis 中。

【注】一般 redis 不會對數據進行持久存儲,只能保障在一段時間內的冪等性,超出時間后,由于 key 被自動清理,冪等將不再生效。對于業務場景不太嚴格但性能要求較高的場景才可使用,比如為過濾系統中由于 retry 機制造成的重復請求。

4. 小結

當系統開啟微服務化后,服務調用的第三狀態就成為了不可回避的話題。通常情況下,會綜合使用 客戶端重試 和 服務端冪等 兩個方案來解決:

  • 客戶端重試。需要根據不同的場景選擇不同的 Retry 和 Fallback 機制:

寫請求,建設使用 Retry 機制,保障最重要的流量不被浪費

讀請求,建議使用 Fallback 機制,避免重試流量對下游服務造成巨大壓力

  • 服務端冪等。冪等作為一種通用能力,建議與業務邏輯進行分離,在需要的時候直接在業務方法上進行配置即可,但仍舊有一些最佳實踐:
  • 使用直接返回上次的處理結果替代異常中斷,使調用方更加簡潔;
  • 業務異常是一種特殊的返回值,也需要進行冪等保護;
  • 有冪等保護后,仍舊需要對并發請求進行處理,此時直接通過異常對重復流程進行中斷即可;

以上兩種場景,lego 對其都進行了封裝,可以方便的應用于業務系統,從而降低微服務的傷害。

分享到:
標簽:DDD
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定