一飛開源,介紹創意、新奇、有趣、實用的免費開源應用、系統、軟件、硬件及技術,一個探索、發現、分享、使用與互動交流的開源技術社區平臺。致力于打造活力開源社區,共建開源新生態!一、開源項目簡介本框架是什么
Gobrs-Async是一款功能強大、配置靈活、帶有全鏈路異常回調、內存優化、異常狀態管理于一身的高性能異步編排框架。為企業提供在復雜應用場景下動態任務編排的能力。 針對于復雜場景下,異步線程復雜性、任務依賴性、異常狀態難控制性;Gobrs-Async為此而生。
二、開源協議
使用Apache-2.0開源協議
三、界面展示為什么寫這個項目
在開發復雜中臺業務過程中,難免會遇到調用各種中臺業務數據, 而且會出現復雜的中臺數據依賴關系,在這種情況下。代碼的復雜程度就會增加。 如下圖所示:
在電商平臺業務中, 各中臺數據可能依賴 商品Product 數據,而且需要依賴特殊屬性中 Item的數據。(有朋友會問,為什么Product 數據不和 Item數據出自同一個中臺呢?中臺業務發展是多樣性的,不同業務中臺設計方式不同 , 難道我們就不對接了嗎?所以我們要針對于這種復雜多變的中臺業務數據提供技術支撐才是一個合格的開發者應該做的)而且Item數據是HTTP的服務,但Product 是rpc服務。 如果按照Future的 開發方式。我們可能會這樣開發
// 并行處理任務 Product 、 Item 的任務 @Resource List paraExectors; // 依賴于Product 和 Item的 任務 @Resource List serExectors; public void testFuture(HttpServletRequest httpServletRequest) { DataContext dataContext = new DataContext(); dataContext.setHttpServletRequest(httpServletRequest); List list = new ArrayList<>(); for (AsyncTask asyncTask : paraExectors) { Future submit = GobrsThreadPoolExecutor.submit(() -> { asyncTask.com.gobrs.async.com.gobrs.async.test.task(dataContext, null); }); list.add(submit); } for (Future future : list) { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } List ser = new ArrayList<>(); for (AsyncTask asyncTask : serExectors) { Future submit = gobrsThreadPoolExecutor.submit(() -> { asyncTask.com.gobrs.async.com.gobrs.async.test.task(dataContext, null); }); ser.add(submit); } for (Future future : ser) { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
存在的問題
以上示例中,Product數據是通過RPC 方式獲取, Item是通過HTTP服務獲取,大家都知道, RPC性能要高于HTTP性能。 但是通過Future 的方式, get會阻塞等待 Item數據返回后才會往下執行。 這樣的話, 圖書音像、裝修數據、限購數據等都要等待Item數據返回,但是這些中臺并不依賴Item返回的數據, 所以會產生等待時間影響系統整體QPS。
起源
- 作者通過對開源中間件的源碼詳細閱讀和二次開發的經驗和使用心得總結而來。
- 用戶的一些使用體驗 包括業務的需求
四、功能概述能解決什么問題
能解決 CompletableFuture 所不能解決的問題。 怎么理解呢?
傳統的Future、CompleteableFuture一定程度上可以完成任務編排,并可以把結果傳遞到下一個任務。如CompletableFuture有then方法,但是卻無法做到對每一個執行單元的回調。譬如A執行完畢成功了,后面是B,我希望A在執行完后就有個回調結果,方便我監控當前的執行狀況,或者打個日志什么的。失敗了,我也可以記錄個異常信息什么的。
此時,CompleteableFuture就無能為力了。
Gobrs-Async框架提供了這樣的回調功能。并且,如果執行成功、失敗、異常、超時等場景下都提供了管理線程任務的能力!
場景概述場景一
說明任務A 執行完了之后,繼續執行 B、C、D
場景二
說明任務A 執行完了之后執行B 然后再執行 C、D
場景三
說明任務A 執行完了之后執行B、E 然后按照順序 B的流程走C、D、G。 E的流程走F、G
還有更多場景,如果你想詳細理解任務編排的概念, 請仔細閱讀文檔,或者通過資源索引導航到官網了解全貌!業界對比
在開源平臺找了挺多任務異步編排框架,發現都不是很理想,唯一一款現階段比較好用的異步編排框架就數asyncTool比較好用。但是在使用時發現,API不是很好用。而且需要頻繁的創建 WorkerWrApper 對象 用起來有點不爽。對于業務比較復雜的場景,在開發時需要寫較多的 WorkerWrapper 代碼,而且框架不能對全局異常進行攔截。只能通過任務的 result 方法捕捉單任務的異常。 不能在任意任務出現異常后 停止全局的異步任務。同時無法在發生全局異常的時候進行異常攔截。如果需要實現在發生需停止全局任務流的時候,發送報警郵件的功能。 asyncTool就顯得力不從心了。
asyncTool 本身已經功能很強大了,本人與asyncTool 作者 都是在京東擔任研發工作。 涉及到的場景大同小異。 會有很多業務場景,很復雜的中臺接口調用關系。所以針對當前業務場景。需要探索更多的技術領域。本身技術是應該服務于業務,落地業務場景。
想給項目起一個簡單易記的名字,類似于 Eureka、Nacos、redis;經過再三考慮后,決定命名:Gobrs-Async
功能
asyncTool
Gobrs-Async
sirector
多任務處理
單任務異常回調
全局異常中斷
可配置任務流
自定義異常攔截器
內存優化
可選的任務執行
它解決了什么問題
在請求調用各大中臺數據時,難免會出現多個中臺數據互相依賴的情況,現實開發中會遇到如下場景。
并行常見的場景 1 客戶端請求服務端接口,該接口需要調用其他N個微服務的接口
譬如 請求我的購物車,那么就需要去調用用戶的rpc、商品詳情的rpc、庫存rpc、優惠券等等好多個服務。同時,這些服務還有相互依賴關系,譬如必須先拿到商品id后,才能去庫存rpc服務請求庫存信息。 最終全部獲取完畢后,或超時了,就匯總結果,返回給客戶端。
2 并行執行N個任務,后續根據這1-N個任務的執行結果來決定是否繼續執行下一個任務
如用戶可以通過郵箱、手機號、用戶名登錄,登錄接口只有一個,那么當用戶發起登錄請求后,我們需要并行根據郵箱、手機號、用戶名來同時查數據庫,只要有一個成功了,都算成功,就可以繼續執行下一步。而不是先試郵箱能否成功、再試手機號……
再如某接口限制了每個批次的傳參數量,每次最多查詢10個商品的信息,我有45個商品需要查詢,就可以分5堆并行去查詢,后續就是統計這5堆的查詢結果。就看你是否強制要求全部查成功,還是不管有幾堆查成功都給客戶做返回
再如某個接口,有5個前置任務需要處理。其中有3個是必須要執行完畢才能執行后續的,另外2個是非強制的,只要這3個執行完就可以進行下一步,到時另外2個如果成功了就有值,如果還沒執行完,就是默認值。
3 需要進行線程隔離的多批次任務。
如多組任務, 各組任務之間彼此不相關,每組都需要一個獨立的線程池,每組都是獨立的一套執行單元的組合。有點類似于hystrix的線程池隔離策略。
4 單機工作流任務編排。
5 其他有順序編排的需求。
它有什么特性
Gobrs-Async 在開發時考慮了眾多使用者的開發喜歡,對異常處理的使用場景。并被運用到電商生產環境中,在京東經歷這嚴酷的高并發考驗。同時框架中 極簡靈活的配置、全局自定義可中斷全流程異常、內存優化、靈活的接入方式、提供SpringBoot Start 接入方式。更加考慮使用者的開發習慣。僅需要注入GobrsTask的Spring Bean 即可實現全流程接入。
Gobrs-Async 項目目錄及其精簡
- gobrs-async-example:Gobrs-Async 接入實例,提供測試用例。
- gobrs-async-starter:Gobrs-Async 框架核心組件
Gobrs-Async 在設計時,就充分考慮了開發者的使用習慣, 沒有依賴任何中間件。 對并發框架做了良好的封裝。主要使用 CountDownLatch 、ReentrantLock 、volatile 等一系列并發技術開發設計。
五、技術選型整體架構
任務觸發器
任務流的啟動者, 負責啟動任務執行流
規則解析引擎
負責解析使用者配置的規則,同時于Spring結合,將配置的 Spring Bean 解析成 TaskBean,進而通過解析引擎加載成 任務裝飾器。進而組裝成任務樹
任務啟動器
負責通過使用解析引擎解析的任務樹。結合JUC并發框架調度實現對任務的統一管理,核心方法有
- trigger 觸發任務加載器,為加載任務準備環境
負責加載任務流程,開始調用任務執行器執行核心流程
- load 核心任務流程方法,在這里阻塞等待整個任務流程
- getBeginProcess 獲取子任務開始流程
- completed 任務完成
- errorInterrupted 任務失敗 中斷任務流程
- error 任務失敗
最終的任務執行,每一個任務對應一個TaskActuator 任務的 攔截、異常、執行、線程復用 等必要條件判斷都在這里處理
- prepare 任務前置處理
- preInterceptor 統一任務前置處理
- task 核心任務方法,業務執行內容
- postInterceptor 統一后置處理
- onSuccess 任務執行成功回調
- onFail 任務執行失敗回調
任務流程傳遞總線,包括 請求參數、任務加載器、 響應結果, 該對象暴露給使用者,拿到匹配業務的數據信息,例如: 返回結果、主動中斷任務流程等功能 需要任務總線(TaskSupport)支持
核心類圖
六、源碼地址
訪問一飛開源:https://code.exmay.com/