關于PowerJob
PowerJob(原OhMyScheduler)是全新一代分布式任務調度與計算框架,其主要功能特性如下:
- 使用簡單:提供前端Web界面,允許開發者可視化地完成調度任務的管理(增、刪、改、查)、任務運行狀態監控和運行日志查看等功能。
- 定時策略完善:支持 CRON 表達式、固定頻率、固定延遲和API四種定時調度策略。
- 執行模式豐富:支持單機、廣播、Map、MapReduce 四種執行模式,其中 Map/MapReduce 處理器能使開發者寥寥數行代碼便獲得集群分布式計算的能力。
- 工作流支持:支持在線配置任務依賴關系(DAG),以可視化的方式對任務進行編排,同時還支持上下游任務間的數據傳遞,以及多種節點類型(判斷節點 & 嵌套工作流節點)。
- 執行器支持廣泛:支持 Spring Bean、內置/外置 JAVA 類,另外可以通過引入官方提供的依賴包,一鍵集成 Shell、Python/ target=_blank class=infotextkey>Python、HTTP、SQL 等處理器,應用范圍廣。
- 運維便捷:支持在線日志功能,執行器產生的日志可以在前端控制臺頁面實時顯示,降低 debug 成本,極大地提高開發效率。
- 依賴精簡:最小僅依賴關系型數據庫(MySQL/PostgreSQL/Oracle/MS SQLServer...)
- 高可用 & 高性能:調度服務器經過精心設計,一改其他調度框架基于數據庫鎖的策略,實現了無鎖化調度。部署多個調度服務器可以同時實現高可用和性能的提升(支持無限的水平擴展)。
- 故障轉移與恢復:任務執行失敗后,可根據配置的重試策略完成重試,只要執行器集群有足夠的計算節點,任務就能順利完成。
適用場景
- 有定時執行需求的業務場景:如每天凌晨全量同步數據、生成業務報表、未支付訂單超時取消等。
- 有需要全部機器一同執行的業務場景:如使用廣播執行模式清理集群日志。
- 有需要分布式處理的業務場景:比如需要更新一大批數據,單機執行耗時非常長,可以使用Map/MapReduce 處理器完成任務的分發,調動整個集群加速計算。
- 有需要延遲執行某些任務的業務場景:比如訂單過期處理等。
同類產品對比
基本概念
分組概念
- AppName:應用名稱,建議與用戶實際接入 PowerJob 的應用名稱保持一致,用于業務分組與隔離。一個 appName 等于一個業務集群,也就是實際的一個 Java 項目。
核心概念
- 任務(Job):描述了需要被 PowerJob 調度的任務信息,包括任務名稱、調度時間、處理器信息等。
- 任務實例( JobInstance,簡稱 Instance):任務(Job)被調度執行后會生成任務實例(Instance),任務實例記錄了任務的運行時信息(任務與任務實例的關系類似于類與對象的關系)。
- 作業(Task):任務實例的執行單元,一個 JobInstance 存在至少一個 Task,具體規則如下:
- 單機任務(STANDALONE):一個 JobInstance 對應一個 Task
- 廣播任務(BROADCAST):一個 JobInstance 對應 N 個 Task,N為集群機器數量,即每一臺機器都會生成一個 Task
- Map/MapReduce任務:一個 JobInstance 對應若干個 Task,由開發者手動 map 產生
- 工作流(Workflow):由 DAG(有向無環圖)描述的一組任務(Job),用于任務編排。
- 工作流實例(WorkflowInstance):工作流被調度執行后會生成工作流實例,記錄了工作流的運行時信息。
擴展概念
- JVM 容器:以 Maven 工程項目的維度組織一堆 Java 文件(開發者開發的眾多 Java 處理器),可以通過前端網頁動態發布并被執行器加載,具有極強的擴展能力和靈活性。
- OpenAPI:允許開發者通過接口來完成手工的操作,讓系統整體變得更加靈活。開發者可以基于 API 便捷地擴展 PowerJob 原有的功能。
定時任務類型
- API:該任務只會由 powerjob-client 中提供的 OpenAPI 接口觸發,server 不會主動調度。
- CRON:該任務的調度時間由 CRON 表達式指定。
- 固定頻率:秒級任務,每隔多少毫秒運行一次,功能與 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
- 固定延遲:秒級任務,延遲多少毫秒運行一次,功能與 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
- 工作流:該任務只會由其所屬的工作流調度執行,server 不會主動調度該任務。如果該任務不屬于任何一個工作流,該任務就不會被調度。
備注:固定延遲和固定頻率任務統稱秒級任務,這兩種任務無法被停止,只有任務被關閉或刪除時才能真正停止任務。
搭建PowerJob環境
本地啟動
初始化項目
git clone https://Github.com/PowerJob/PowerJob.git
導入 IDE,源碼結構如下,我們需要啟動調度服務器(powerjob-server),同時在 samples 工程中編寫自己的處理器代碼
啟動調度服務器
- 創建數據庫(僅需要創建數據庫):找到你的 DB,運行 SQL CREATE DATABASE IF NOT EXISTS `powerjob-dAIly` DEFAULT CHARSET utf8mb4,搞定~
- 修改配置文件:配置文件的說明官方文檔寫的非常詳細,此處不再贅述。需要修改的地方為數據庫配置spring.datasource.core.jdbc-url、spring.datasource.core.username和spring.datasource.core.password,當然,有 mongoDB 的同學也可以修改spring.data.mongodb.uri以獲取完全版體驗。
powerjob-server 日常環境配置文件:application-daily.properties
oms.env=DAILY
logging.cnotallow=classpath:logback-dev.xml
####### 外部數據庫配置(需要用戶更改為自己的數據庫配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimeznotallow=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置,非核心依賴,通過配置 oms.mongodb.enable=false 來關閉 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 郵件配置(不需要郵件報警可以刪除以下配置來避免報錯) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 資源清理配置 #######
oms.instanceinfo.retentinotallow=1
oms.container.retention.local=1
oms.container.retention.remote=-1
####### 緩存配置 #######
oms.instance.metadata.cache.size=1024
- 啟動應用:完成配置文件的修改后,可以直接通過啟動類 tech.powerjob.server.PowerJobServerApplication 啟動調度服務器(注意:需要使用 daily 配置文件啟動,可自行百度搜索“SpringBoot 指定配置文件啟動”),觀察啟動日志,查看是否啟動成功~啟動成功后,訪問 http://127.0.0.1:7700/ ,如果能順利出現 Web 界面,則說明調度服務器啟動成功!
- 注冊應用:點擊主頁應用注冊按鈕,填入 powerjob-agent-test 和控制臺密碼(用于進入控制臺),注冊示例應用(當然你也可以注冊其他的 appName,只是別忘記在示例程序中同步修改~)
圖片
Docker-compose啟動
環境要求
本地需要安裝docker和docker-compose
下載代碼
git clone --depth=1 https://github.com/PowerJob/PowerJob.git
運行
cd PowerJob
docker-compose up
docker-compose up -d
剛開始啟動時,powerjob-worker-samples會啟動失敗,等powerjob-server啟動成功后,powerjob-worker-samples才會啟動成功。這大概需要幾分鐘。
運行成功后,瀏覽器訪問 http://127.0.0.1:7700/
應用名稱:powerjob-worker-samples
密碼:powerjob123
停止
docker-compose down
Stopping powerjob-worker-samples ... done
Stopping powerjob-server ... done
Stopping powerjob-mysql ... done
Removing powerjob-worker-samples ... done
Removing powerjob-server ... done
Removing powerjob-mysql ... done
cd PowerJob
rm -rf powerjob-data
SpringBoot集成PowerJob
添加相關maven依賴
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>${latest.powerjob.version}</version>
</dependency>
配置文件配置
powerjob:
worker:
# akka 工作端口,可選,默認 27777
akka-port: 27777
# 接入應用名稱,用于分組隔離,推薦填寫 本 Java 項目名稱
app-name: ${spring.application.name}
# 調度服務器地址,IP:Port 或 域名,多值逗號分隔
server-address: 81.70.117.188:7700
# 持久化方式,可選,默認 disk
store-strategy: disk
# 任務返回結果信息的最大長度,超過這個長度的信息會被截斷,默認值 8192
max-result-length: 8192
# 單個任務追加的工作流上下文最大長度,超過這個長度的會被直接丟棄,默認值 8192
max-appended-wf-context-length: 8192
處理器(Processor)開發
處理器概述
基本概念
PowerJob 支持 Python、Shell、HTTP、SQL 等眾多通用任務的處理,開發者只需要引入依賴,在控制臺配置好相關參數即可,關于這部分詳見 官方處理器 ,此處不再贅述。本章將重點闡述 Java 處理器開發方法與使用技巧。
- Java 處理器可根據代碼所處位置劃分為內置 Java 處理器和外置 Java 處理器,前者直接集成在宿主應用(也就是接入本系統的業務應用)中,一般用來處理業務需求;后者可以在一個獨立的輕量級的 Java 工程中開發,通過 JVM 容器技術(詳見容器章節)被 worker 集群熱加載,提供 Java 的“腳本能力”,一般用于處理靈活多變的需求。
- Java 處理器可根據對象創建者劃分為 SpringBean 處理器和普通 Java 對象處理器,前者由 Spring IOC 容器完成處理器的創建和初始化,后者則由 PowerJob 維護其生命周期。如果宿主應用支持 Spring,強烈建議使用 SpringBean 處理器,開發者僅需要將 Processor 注冊進 Spring IOC 容器(一個 @Component 注解或一句 bean 配置)即可享受 Spring 帶來的便捷之處。
- Java處理器可根據功能劃分為單機處理器、廣播處理器、Map 處理器和 MapReduce 處理器。
單機處理器(BasicProcessor)對應了單機任務,即某個任務的某次運行只會有某一臺機器的某一個線程參與運算。
廣播處理器(BroadcastProcessor)對應了廣播任務,即某個任務的某次運行會調動集群內所有機器參與運算。
Map處理器(MapProcessor)對應了Map任務,即某個任務在運行過程中,允許產生子任務并分發到其他機器進行運算。
MapReduce 處理器(MapReduceProcessor)對應了 MapReduce 任務,在 Map 任務的基礎上,增加了所有任務結束后的匯總統計。
入參 TaskContext
TaskContext 包含了本次任務的上下文信息,具體信息如下
屬性列表(紅色標注的為常用屬性) |
|
屬性名稱 |
意義/用法 |
jobId |
任務 ID,開發者一般無需關心此參數 |
instanceId |
任務實例 ID,全局唯一,開發者一般無需關心此參數 |
subInstanceId |
子任務實例 ID,秒級任務使用,開發者一般無需關心此參數 |
taskId |
采用鏈式命名法的 ID,在某個任務實例內唯一,開發者一般無需關心此參數 |
taskName |
task 名稱,Map/MapReduce 任務的子任務的值為開發者指定,否則為系統默認值,開發者一般無需關心此參數 |
jobParams |
任務參數 對于非工作流中的任務其值等同于控制臺錄入的任務參數;如果該任務為工作流中的任務且有配置節點參數信息,那么接收到的是節點配置的參數信息 |
instanceParams |
任務實例參數 對于非工作流中的任務 其值 等同于 OpenAPI 傳遞的實例參數,非 OpenAPI 觸發的任務則一定為空。如果該任務為工作流中的任務那么這里實際接收到的是工作流上下文信息,建議使用 getWorkflowContext 方法獲取上下文信息
|
maxRetryTimes |
Task 的最大重試次數 |
currentRetryTimes |
Task 的當前重試次數,和 maxRetryTimes 聯合起來可以判斷當前是否為該 Task 的最后一次運行機會 |
subTask |
子 Task,Map/MapReduce 處理器專屬,開發者調用map方法時傳遞的子任務列表中的某一個 |
omsLogger |
在線日志,用法同 Slf4J,記錄的日志可以直接通過控制臺查看,非常便捷和強大!不過使用過程中需要注意頻率,濫用在線日志會對 Server 造成巨大的壓力 |
userContext |
用戶在 PowerJobWorkerConfig 中設置的自定義上下文 |
workflowContext |
工作流上下文,更多信息見下方說明 |
工作流上下文( WorkflowContext )
該屬性是 v4.0.0 版本的重大變更之一,移除了原來的參數傳遞機制,提供了 API 讓開發者可以更加靈活便捷地在工作流中實現信息的傳遞。
屬性列表 |
|
屬性名稱 |
意義/用法 |
wfInstanceId |
工作流實例 ID |
data |
工作流上下文數據,鍵值對 |
appendedContextData |
當前任務向工作流上下文中追加的數據。在任務執行完成后 ProcessorTracker 會將其上報給 TaskTracker,TaskTracker 在當前任務執行完成后會將這個信息上報給 server ,追加到當前的工作流上下文中,供下游任務消費 |
上游任務通過 WorkflowContext#appendData2WfContext(String key,Object value) 方法向工作流上下文中追加數據,下游任務便可以通過 WorkflowContext#fetchWorkflowContext() 方法獲取到相應的數據進行消費。注意,當追加的上下文信息的 key 已經存在于當前的上下文中時,新的 value 會覆蓋之前的值。另外,每次任務實例追加的上下文數據大小也會受到 worker 的配置項 powerjob.worker.max-appended-wf-context-length 的限制,超過這個長度的會被直接丟棄。
返回值 ProcessResult
方法的返回值為 ProcessResult,代表了本次 Task 執行的結果,包含 success 和 msg 兩個屬性,分別用于傳遞 Task 是否執行成功和 Task 需要返回的信息。
處理器開發示例
單機處理器:BasicProcessor
單機執行的策略下,server 會在所有可用 worker 中選取健康度最佳的機器進行執行。單機執行任務需要實現接口 BasicProcessor,代碼示例如下:
/**
* @Author iron.guo
* @Date 2023/1/7
* @Description
*/
@Component
@Slf4j
public class StandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("處理器啟動成功,context 是 {}.", context);
log.info("單機處理器正在處理");
log.info(context.getJobParams());
omsLogger.info("處理器執行結束");
boolean success = true;
return new ProcessResult(success, context + ": " + success);
}
}
執行結果
廣播處理器:BroadcastProcessor
廣播執行的策略下,所有機器都會被調度執行該任務。為了便于資源的準備和釋放,廣播處理器在BasicProcessor 的基礎上額外增加了 preProcess 和 postProcess 方法,分別在整個集群開始之前/結束之后選一臺機器執行相關方法。代碼示例如下:
@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("預執行,會在所有 worker 執行 process 方法前調用");
log.info("預執行,會在所有 worker 執行 process 方法前調用");
// 預執行,會在所有 worker 執行 process 方法前調用
return new ProcessResult(true, "init success");
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
// 撰寫整個worker集群都會執行的代碼邏輯
omsLogger.info("撰寫整個worker集群都會執行的代碼邏輯");
log.info("撰寫整個worker集群都會執行的代碼邏輯");
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
// taskResults 存儲了所有worker執行的結果(包括preProcess)
// 收尾,會在所有 worker 執行完畢 process 方法后調用,該結果將作為最終的執行結果
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("收尾,會在所有 worker 執行完畢 process 方法后調用,該結果將作為最終的執行結果");
log.info("收尾,會在所有 worker 執行完畢 process 方法后調用,該結果將作為最終的執行結果");
return new ProcessResult(true, "process success");
}
}
執行結果
并行處理器:MapReduceProcessor
MapReduce 是最復雜也是最強大的一種執行器,它允許開發者完成任務的拆分,將子任務派發到集群中其他Worker 執行,是執行大批量處理任務的不二之選!實現 MapReduce 處理器需要繼承 MapReduceProcessor類,具體用法如下示例代碼所示:
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
// 判斷是否為根任務
if (isRootTask()) {
// 構造子任務
List<SubTask> subTaskList = Lists.newLinkedList();
SubTask subTask=new SubTask();
subTask.setSiteId(1L);
subTask.setName("iron.guo");
subTaskList.add(subTask);
/*
* 子任務的構造由開發者自己定義
* eg. 現在需要從文件中讀取100W個ID,并處理數據庫中這些ID對應的數據,那么步驟如下:
* 1. 根任務(RootTask)讀取文件,流式拉取100W個ID,并按1000個一批的大小組裝成子任務進行派發
* 2. 非根任務獲取子任務,完成業務邏輯的處理
*/
// 調用 map 方法,派發子任務(map 可能會失敗并拋出異常,做好業務操作)
map(subTaskList, "DATA_PROCESS_TASK");
omsLogger.info("執行根任務-派發子任務");
return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");
}
// 非子任務,可根據 subTask 的類型 或 TaskName 來判斷分支
if (context.getSubTask() instanceof SubTask) {
omsLogger.info("執行子任務開始");
omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());
// 執行子任務,注:子任務人可以 map 產生新的子任務,可以構建任意級的 MapReduce 處理器
return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");
}
return new ProcessResult(false, "UNKNOWN_BUG");
}
@Override
public ProcessResult reduce(TaskContext taskContext, List<TaskResult> taskResults) {
// 所有 Task 執行結束后,reduce 將會被執行
// taskResults 保存了所有子任務的執行結果
// 用法舉例,統計執行結果
AtomicLong successCnt = new AtomicLong(0);
taskResults.forEach(tr -> {
if (tr.isSuccess()) {
successCnt.incrementAndGet();
}
});
// 該結果將作為任務最終的執行結果
return new ProcessResult(true, "success task num:" + successCnt.get());
}
// 自定義的子任務
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Long siteId;
private String name;
}
}
執行結果
注:Map 處理器相當于 MapReduce 處理器的閹割版本(閹割了 reduce 方法),此處不再單獨舉例。
工作流
點擊右上角按鈕 新建工作流,即可錄入新的工作流,具體界面和說明如下所示。
- 工作流名稱:名稱,無實際業務用途,請盡量精簡字段
- 工作流描述:描述,無實際業務用途,請盡量精簡字段
- 定時信息:該工作流的觸發方式的觸發方式,包含時間表達式類型選擇框和時間表達式輸入框
CRON -> 填寫 CRON 表達式(在線生成網站)
API -> 不需要填寫任何參數,表明該任務由 OpenAPI 觸發
- 生命周期:定時策略生效的時間段
- 最大實例:該工作流同時執行的數量
- 任務依賴關系:提供編輯界面可視化操作,繪制 DAG(有向無環圖),配置工作流內各個任務的依賴關系
DAG 操作指南
編輯依賴關系
v4.0.0 以后支持節點的自由拖拉拽,不用再點點點了,哈哈哈 ~
- 添加節點:點擊 DAG 編輯框左上方的 “導入任務”,導入當前存在的任務(需要提前在 任務管理界面 錄入任務),生成 DAG 的節點
- 連接節點:點擊起始節點的任意一個錨點摁住不放,拖動鼠標連接到另一個節點的任意一個錨點即可
- 刪除節點:選中需要刪除的節點,按退格鍵( 注意:windows 下使用退格鍵 [Backspace],macOS 下使用刪除鍵 [delete] )
- 刪除邊:選中需要刪除的邊,按退格鍵( 注意:windows 下使用退格鍵 [Backspace],macOS 下使用刪除鍵 [delete] )
導入任務節點
任務為之前創建的任務,可用工作流形式串聯起來執行。
編輯節點信息
點擊需要編輯的節點,在右側會彈出一個編輯框,如下圖所示
- 任務名稱:當前節點引用的任務名稱,點擊可編輯(支持輸入名稱進行模糊搜索)
- 節點名稱:節點的名稱,無實際業務用途,在能明確表示節點背后的業務邏輯的情況下請盡量精簡字段
- 節點參數:節點的參數配置,當這個信息不為空的時候使用這個參數覆蓋當前節點所引用的任務所配置的參數信息
- 是否啟用:未啟用的節點將會直接跳過
- 失敗跳過:當這個節點執行失敗的時候不會打斷整個工作流的執行
特殊節點說明
判斷節點
判斷節點 不允許失敗跳過以及禁用,節點參數中存儲的是 Groovy 代碼(執行 Groovy 代碼時會將當前工作流上下文作為 context 變量注入到代碼執行的上下文中),其執行結果僅能返回 "true" 或者 "false",同時判斷節點僅有且必須有兩條“輸出”路徑。會根據該代碼的執行結果決定下游需要執行的節點。這里處理的原則是, 僅 cancel 那些只能通過被 disable 掉的邊可達的節點
舉個兩個栗子,灰色代表相應的邊 或者 節點被 disable 或 cancel,菱形代表判斷節點,假定執行結果為 true
case 3 以及 case 4 中的節點 3 都會被 cancel ,因為它只能通過節點 1 -> 節點 3 的邊可達(該邊的屬性為 false),但對于節點 5 而言,在 case 4 中因為判斷節點 2 的執行結果為 true ,那么其可以通過節點 2 -> 節點 5 的邊可達,所以不會被 disable 。
備注:如果需要根據上游節點的執行結果決定下游節點,可以將上游節點的執行結果注入上下文中,再在判斷節點中做相應的判斷。
工作流嵌套節點
該節點代表對某個工作流的引用,節點的 jobId 屬性存儲的是工作流 id,其他屬性和普通的任務節點一致。不允許出現循環引用以及多級嵌套的情況,即嵌套節點中指向的工作流一定是一個不含嵌套節點的工作流。
執行到該節點時,如果該節點處于啟用狀態,那么將啟動該節點所引用工作流的一個新實例,待該實例執行完成后再同步更新該節點的狀態。
注意,創建子工作流時,會透傳當前的上下文作為工作流的實例參數,在子工作流執行完成時會合并子工作流的上下文至父工作流的上下文中。
重試子工作流不會聯動重試父工作流,但失敗的子工作流會隨著父工作流的重試而原地重試(不會生成新的實例)