定時(shí)任務(wù)是每個(gè)業(yè)務(wù)常見的需求,比如每分鐘掃描超時(shí)支付的訂單,每小時(shí)清理一次數(shù)據(jù)庫歷史數(shù)據(jù),每天統(tǒng)計(jì)前一天的數(shù)據(jù)并生成報(bào)表等等。
01
JAVA 中自帶的解決方案
Cloud Native
1
使用 Timer
創(chuàng)建 java.util.TimerTask 任務(wù),在 run 方法中實(shí)現(xiàn)業(yè)務(wù)邏輯。通過 java.util.Timer 進(jìn)行調(diào)度,支持按照固定頻率執(zhí)行。所有的 TimerTask 是在同一個(gè)線程中串行執(zhí)行,相互影響。也就是說,對(duì)于同一個(gè) Timer 里的多個(gè) TimerTask 任務(wù),如果一個(gè) TimerTask 任務(wù)在執(zhí)行中,其它 TimerTask 即使到達(dá)執(zhí)行的時(shí)間,也只能排隊(duì)等待。如果有異常產(chǎn)生,線程將退出,整個(gè)定時(shí)任務(wù)就失敗。
import java.util.Timer;
import java.util.TimerTask;
public class TestTimerTask {
public static void main(String[] args) {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("hell world");
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 10, 3000);
}
}
復(fù)制代碼
2
使用 ScheduledExecutorService
基于線程池設(shè)計(jì)的定時(shí)任務(wù)解決方案,每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程去執(zhí)行,解決 Timer 定時(shí)器無法并發(fā)執(zhí)行的問題,支持 fixedRate 和 fixedDelay。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestTimerTask {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
//按照固定頻率執(zhí)行,每隔5秒跑一次
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("hello fixedRate");
}
}, 0, 5, TimeUnit.SECONDS);
//按照固定延時(shí)執(zhí)行,上次執(zhí)行完后隔3秒再跑
ses.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("hello fixedDelay");
}
}, 0, 3, TimeUnit.SECONDS);
}
}
復(fù)制代碼
02
Spring 中自帶的解決方案
Cloud Native
Springboot 中提供了一套輕量級(jí)的定時(shí)任務(wù)工具 Spring Task,通過注解可以很方便的配置,支持 cron 表達(dá)式、fixedRate、fixedDelay。
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class MyTask {
/**
* 每分鐘的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void task1() throws InterruptedException {
System.out.println("hello cron");
}
/**
* 每隔5秒跑一次
*/
@Scheduled(fixedRate = 5000)
public void task2() throws InterruptedException {
System.out.println("hello fixedRate");
}
/**
* 上次跑完隔3秒再跑
*/
@Scheduled(fixedDelay = 3000)
public void task3() throws InterruptedException {
System.out.println("hello fixedDelay");
}
}
復(fù)制代碼
Spring Task 相對(duì)于上面提到的兩種解決方案,最大的優(yōu)勢(shì)就是支持 cron 表達(dá)式,可以處理按照標(biāo)準(zhǔn)時(shí)間固定周期執(zhí)行的業(yè)務(wù),比如每天幾點(diǎn)幾分執(zhí)行。
03
業(yè)務(wù)冪等解決方案
Cloud Native
現(xiàn)在的應(yīng)用基本都是分布式部署,所有機(jī)器的代碼都是一樣的,前面介紹的 Java 和 Spring 自帶的解決方案,都是進(jìn)程級(jí)別的,每臺(tái)機(jī)器在同一時(shí)間點(diǎn)都會(huì)執(zhí)行定時(shí)任務(wù)。這樣會(huì)導(dǎo)致需要業(yè)務(wù)冪等的定時(shí)任務(wù)業(yè)務(wù)有問題,比如每月定時(shí)給用戶推送消息,就會(huì)推送多次。
于是,很多應(yīng)用很自然的就想到了使用分布式鎖的解決方案。即每次定時(shí)任務(wù)執(zhí)行之前,先去搶鎖,搶到鎖的執(zhí)行任務(wù),搶不到鎖的不執(zhí)行。怎么搶鎖,又是五花八門,比如使用 DB、zookeeper、redis。
1
使用 DB 或者 Zookeeper 搶鎖
使用 DB 或者 Zookeeper 搶鎖的架構(gòu)差不多,原理如下:
- 定時(shí)時(shí)間到了,在回調(diào)方法里,先去搶鎖。
- 搶到鎖,則繼續(xù)執(zhí)行方法,沒搶到鎖直接返回。
- 執(zhí)行完方法后,釋放鎖。
示例代碼如下:
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class MyTask {
/**
* 每分鐘的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void task1() throws Exception {
String lockName = "task1";
if (tryLock(lockName)) {
System.out.println("hello cron");
releaseLock(lockName);
} else {
return;
}
}
private boolean tryLock(String lockName) {
//TODO
return true;
}
private void releaseLock(String lockName) {
//TODO
}
}
復(fù)制代碼
當(dāng)前的這個(gè)設(shè)計(jì),仔細(xì)一點(diǎn)的同學(xué)可以發(fā)現(xiàn),其實(shí)還是有可能導(dǎo)致任務(wù)重復(fù)執(zhí)行的。比如任務(wù)執(zhí)行的非常快,A 這臺(tái)機(jī)器搶到鎖,執(zhí)行完任務(wù)后很快就釋放鎖了。B 這臺(tái)機(jī)器后搶鎖,還是會(huì)搶到鎖,再執(zhí)行一遍任務(wù)。
2
使用 redis 搶鎖
使用 redis 搶鎖,其實(shí)架構(gòu)上和 DB/zookeeper 差不多,不過 redis 搶鎖支持過期時(shí)間,不用主動(dòng)去釋放鎖,并且可以充分利用這個(gè)過期時(shí)間,解決任務(wù)執(zhí)行過快釋放鎖導(dǎo)致任務(wù)重復(fù)執(zhí)行的問題,架構(gòu)如下:
示例代碼如下:
@Component
@EnableScheduling
public class MyTask {
/**
* 每分鐘的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void task1() throws InterruptedException {
String lockName = "task1";
if (tryLock(lockName, 30)) {
System.out.println("hello cron");
releaseLock(lockName);
} else {
return;
}
}
private boolean tryLock(String lockName, long expiredTime) {
//TODO
return true;
}
private void releaseLock(String lockName) {
//TODO
}
}
復(fù)制代碼
看到這里,可能又會(huì)有同學(xué)有問題,加一個(gè)過期時(shí)間是不是還是不夠嚴(yán)謹(jǐn),還是有可能任務(wù)重復(fù)執(zhí)行?
——的確是的,如果有一臺(tái)機(jī)器突然長(zhǎng)時(shí)間的 fullgc,或者之前的任務(wù)還沒處理完(Spring Task 和 ScheduledExecutorService 本質(zhì)還是通過線程池處理任務(wù)),還是有可能隔了 30 秒再去調(diào)度任務(wù)的。
3
使用 Quartz
Quartz [ 1] 是一套輕量級(jí)的任務(wù)調(diào)度框架,只需要定義了 Job(任務(wù)),Trigger(觸發(fā)器)和 Scheduler(調(diào)度器),即可實(shí)現(xiàn)一個(gè)定時(shí)調(diào)度能力。支持基于數(shù)據(jù)庫的集群模式,可以做到任務(wù)冪等執(zhí)行。
Quartz 支持任務(wù)冪等執(zhí)行,其實(shí)理論上還是搶 DB 鎖,我們看下 quartz 的表結(jié)構(gòu):
其中,QRTZ_LOCKS 就是 Quartz 集群實(shí)現(xiàn)同步機(jī)制的行鎖表,其表結(jié)構(gòu)如下:
--QRTZ_LOCKS表結(jié)構(gòu)
CREATE TABLE `QRTZ_LOCKS` (
`LOCK_NAME` varchar(40) NOT NULL,
PRIMARY KEY (`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--QRTZ_LOCKS記錄
+-----------------+
| LOCK_NAME |
+-----------------+
| CALENDAR_ACCESS |
| JOB_ACCESS |
| MISFIRE_ACCESS |
| STATE_ACCESS |
| TRIGGER_ACCESS |
+-----------------+
復(fù)制代碼
可以看出 QRTZ_LOCKS 中有 5 條記錄,代表 5 把鎖,分別用于實(shí)現(xiàn)多個(gè) Quartz Node 對(duì) Job、Trigger、Calendar 訪問的同步控制。
04
開源任務(wù)調(diào)度中間件
Cloud Native
上面提到的解決方案,在架構(gòu)上都有一個(gè)問題,那就是每次調(diào)度都需要搶鎖,特別是使用 DB 和 Zookeeper 搶鎖,性能會(huì)比較差,一旦任務(wù)量增加到一定的量,就會(huì)有比較明顯的調(diào)度延時(shí)。還有一個(gè)痛點(diǎn),就是業(yè)務(wù)想要修改調(diào)度配置,或者增加一個(gè)任務(wù),得修改代碼重新發(fā)布應(yīng)用。
于是開源社區(qū)涌現(xiàn)了一堆任務(wù)調(diào)度中間件,通過任務(wù)調(diào)度系統(tǒng)進(jìn)行任務(wù)的創(chuàng)建、修改和調(diào)度,這其中國內(nèi)最火的就是 XXL-JOB 和 ElasticJob。
1
ElasticJob
ElasticJob [ 2] 是一款基于 Quartz 開發(fā),依賴 Zookeeper 作為注冊(cè)中心、輕量級(jí)、無中心化的分布式任務(wù)調(diào)度框架,目前已經(jīng)通過 Apache 開源。
ElasticJob 相對(duì)于 Quartz 來說,從功能上最大的區(qū)別就是支持分片,可以將一個(gè)任務(wù)分片參數(shù)分發(fā)給不同的機(jī)器執(zhí)行。架構(gòu)上最大的區(qū)別就是使用 Zookeeper 作為注冊(cè)中心,不同的任務(wù)分配給不同的節(jié)點(diǎn)調(diào)度,不需要搶鎖觸發(fā),性能上比 Quartz 上強(qiáng)大很多,架構(gòu)圖如下:
開發(fā)上也比較簡(jiǎn)單,和 springboot 結(jié)合比較好,可以在配置文件定義任務(wù)如下:
elasticjob:
regCenter:
serverLists: localhost:2181
namespace: elasticjob-lite-springboot
jobs:
simpleJob:
elasticJobClass: org.apache.shardingsphere.elasticjob.lite.example.job.SpringBootSimpleJob
cron: 0/5 * * * * ?
timeZone: GMT+08:00
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
scriptJob:
elasticJobType: SCRIPT
cron: 0/10 * * * * ?
shardingTotalCount: 3
props:
script.command.line: "echo SCRIPT Job: "
manualScriptJob:
elasticJobType: SCRIPT
jobBootstrapBeanName: manualScriptJobBean
shardingTotalCount: 9
props:
script.command.line: "echo Manual SCRIPT Job: "
復(fù)制代碼
實(shí)現(xiàn)任務(wù)接口如下:
@Component
public class SpringBootShardingJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println("分片總數(shù)="+context.getShardingTotalCount() + ", 分片號(hào)="+context.getShardingItem()
+ ", 分片參數(shù)="+context.getShardingParameter());
}
}
復(fù)制代碼
運(yùn)行結(jié)果如下:
分片總數(shù)=3, 分片號(hào)=0, 分片參數(shù)=Beijing
分片總數(shù)=3, 分片號(hào)=1, 分片參數(shù)=Shanghai
分片總數(shù)=3, 分片號(hào)=2, 分片參數(shù)=Guangzhou
復(fù)制代碼
同時(shí),ElasticJob 還提供了一個(gè)簡(jiǎn)單的 UI,可以查看任務(wù)的列表,同時(shí)支持修改、觸發(fā)、停止、生效、失效操作。
遺憾的是,ElasticJob 暫不支持動(dòng)態(tài)創(chuàng)建任務(wù)。
2
XXL-JOB
XXL-JOB [ 3] 是一個(gè)開箱即用的輕量級(jí)分布式任務(wù)調(diào)度系統(tǒng),其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展,在開源社區(qū)廣泛流行。
XXL-JOB 是 Master-Slave 架構(gòu),Master 負(fù)責(zé)任務(wù)的調(diào)度,Slave 負(fù)責(zé)任務(wù)的執(zhí)行,架構(gòu)圖如下:
XXL-JOB 接入也很方便,不同于 ElasticJob 定義任務(wù)實(shí)現(xiàn)類,是通過@XxlJob 注解定義 JobHandler。
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
/**
* 1、簡(jiǎn)單任務(wù)示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
/**
* 2、分片廣播任務(wù)
*/
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片參數(shù)
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號(hào) = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業(yè)務(wù)邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return ReturnT.SUCCESS;
}
}
復(fù)制代碼
XXL-JOB 相較于 ElasticJob,最大的特點(diǎn)就是功能比較豐富,可運(yùn)維能力比較強(qiáng),不但支持控制臺(tái)動(dòng)態(tài)創(chuàng)建任務(wù),還有調(diào)度日志、運(yùn)行報(bào)表等功能。
XXL-JOB 的歷史記錄、運(yùn)行報(bào)表和調(diào)度日志,都是基于數(shù)據(jù)庫實(shí)現(xiàn)的:
由此可以看出,XXL-JOB 所有功能都依賴數(shù)據(jù)庫,且調(diào)度中心不支持分布式架構(gòu),在任務(wù)量和調(diào)度量比較大的情況下,會(huì)有性能瓶頸。不過如果對(duì)任務(wù)量級(jí)、高可用、監(jiān)控報(bào)警、可視化等沒有過高要求的話,XXL-JOB 基本可以滿足定時(shí)任務(wù)的需求。
05
企業(yè)級(jí)解決方案
Cloud Native
開源軟件只能提供基礎(chǔ)的調(diào)度能力,在監(jiān)管控上的能力一般都比較弱。比如日志服務(wù),業(yè)界往往使用 ELK 解決方案;短信報(bào)警,需要有短信平臺(tái);監(jiān)控大盤,現(xiàn)在主流的解決方案是 Prometheus;等等。企業(yè)想要有這些能力,不但需要額外的開發(fā)成本,還需要昂貴的資源成本。
另外使用開源軟件也伴隨著穩(wěn)定性的風(fēng)險(xiǎn),就是出了問題沒人能處理,想要反饋到社區(qū)等社區(qū)處理,這個(gè)鏈路太長(zhǎng)了,早就產(chǎn)生故障了。
阿里云任務(wù)調(diào)度 SchedulerX [ 4] 是阿里巴巴自研的基于 Akka 架構(gòu)的一站式任務(wù)調(diào)度平臺(tái),兼容開源 XXL-JOB、ElasticJob、Quartz(規(guī)劃中),支持 Cron 定時(shí)、一次性任務(wù)、任務(wù)編排、分布式跑批,具有高可用、可視化、可運(yùn)維、低延時(shí)等能力,自帶企業(yè)級(jí)監(jiān)控大盤、日志服務(wù)、短信報(bào)警等服務(wù)。
1
優(yōu)勢(shì)
安全防護(hù)
- 多層次安全防護(hù):支持 HTTPS 和 VPC 訪問,同時(shí)還有阿里云的多層安全防護(hù),防止惡意攻擊。
- 多租戶隔離機(jī)制:支持多地域、命名空間和應(yīng)用級(jí)別的隔離。
- 權(quán)限管控:支持控制臺(tái)讀寫的權(quán)限管理,客戶端接入的鑒權(quán)。
企業(yè)級(jí)高可用
SchedulerX2.0 采用高可用架構(gòu),任務(wù)多備份機(jī)制,經(jīng)歷阿里集團(tuán)多年雙十一、容災(zāi)演練,可以做到任意一個(gè)機(jī)房掛了,任務(wù)調(diào)度都不會(huì)收到影響。
商業(yè)級(jí)報(bào)警運(yùn)維
- 報(bào)警:支持郵件、釘釘、短信、電話,(其他報(bào)警方式在規(guī)劃中)。支持任務(wù)失敗、超時(shí)、無可用機(jī)器報(bào)警。報(bào)警內(nèi)容可以直接看出任務(wù)失敗的原因,以釘釘機(jī)器人為例。
- 運(yùn)維操作:原地重跑、重刷數(shù)據(jù)、標(biāo)記成功、查看堆棧、停止任務(wù)、指定機(jī)器等。
豐富的可視化
schedulerx 擁有豐富的可視化能力,比如:
- 用戶大盤
- 查看任務(wù)歷史執(zhí)行記錄
- 查看任務(wù)運(yùn)行日志
- 查看任務(wù)運(yùn)行堆棧
- 查看任務(wù)操作記錄
兼容開源
Schedulerx 兼容開源 XXL-JOB、ElasticJob、Quartz(規(guī)劃中),業(yè)務(wù)不需要改一行代碼,即可以將任務(wù)托管在 SchedulerX 調(diào)度平臺(tái),享有企業(yè)級(jí)可視化和報(bào)警的能力。
Spring 原生
SchedulerX 支持通過控制臺(tái)和 API 動(dòng)態(tài)創(chuàng)建任務(wù),也支持 Spring 聲明式任務(wù)定義,一份任務(wù)配置可以拿到任何環(huán)境一鍵啟動(dòng),配置如下:
spring:
schedulerx2:
endpoint: acm.aliyun.com #請(qǐng)?zhí)顚懖煌瑀egin的endpoint
namespace: 433d8b23-06e9-xxxx-xxxx-90d4d1b9a4af #region內(nèi)全局唯一,建議使用UUID生成
namespaceName: 學(xué)仁測(cè)試
AppName: myTest
groupId: myTest.group #同一個(gè)命名空間下需要唯一
appKey: myTest123@alibaba #應(yīng)用的key,不要太簡(jiǎn)單,注意保管好
regionId: public #填寫對(duì)應(yīng)的regionId
aliyunAccessKey: xxxxxxx #阿里云賬號(hào)的ak
aliyunSecretKey: xxxxxxx #阿里云賬號(hào)的sk
alarmChannel: sms,ding #報(bào)警通道:短信和釘釘
jobs:
simpleJob:
jobModel: standalone
className: com.aliyun.schedulerx.example.processor.SimpleJob
cron: 0/30 * * * * ? # cron表達(dá)式
jobParameter: hello
overwrite: true
shardingJob:
jobModel: sharding
className: ccom.aliyun.schedulerx.example.processor.ShardingJob
.NETime: 2022-06-02 12:00:00 # 一次性任務(wù)表達(dá)式
jobParameter: 0=Beijing,1=Shanghai,2=Guangzhou
overwrite: true
broadcastJob: # 不填寫cron和oneTime,表示api任務(wù)
jobModel: broadcast
className: com.aliyun.schedulerx.example.processor.BroadcastJob
jobParameter: hello
overwrite: true
mapReduceJob:
jobModel: mapreduce
className: com.aliyun.schedulerx.example.processor.MapReduceJob
cron: 0 * * * * ?
jobParameter: 100
overwrite: true
alarmUsers: #報(bào)警聯(lián)系人
user1:
userName: 張三
userPhone: 12345678900
user2:
userName: 李四
ding: https://oapi.dingtalk.com/robot/send?access_token=xxxxx
復(fù)制代碼
分布式跑批
SchedulerX 提供了豐富的分布式模型,可以處理各種各樣的分布式業(yè)務(wù)場(chǎng)景。包括單機(jī)、廣播、分片、 MapReduce [ 5] 等,架構(gòu)如下:
SchedulerX 的 MapReduce 模型,簡(jiǎn)單幾行代碼,就可以將海量任務(wù)分布式到多臺(tái)機(jī)器跑批,相對(duì)于大數(shù)據(jù)跑批來說,具有速度快、數(shù)據(jù)安全、成本低、簡(jiǎn)單易學(xué)等特點(diǎn)。
任務(wù)編排
SchedulerX 通過工作流進(jìn)行任務(wù)編排,并且提供了一個(gè)可視化的界面,操作簡(jiǎn)單,拖拖拽拽即可配置一個(gè)工作流。詳細(xì)的任務(wù)狀態(tài)圖能一目了然看到下游任務(wù)為什么沒跑,方便定位問題。
可搶占的任務(wù)優(yōu)先級(jí)隊(duì)列
常見場(chǎng)景是夜間離線報(bào)表業(yè)務(wù),比如很多報(bào)表任務(wù)是晚上 1、2 點(diǎn)開始跑,要控制應(yīng)用最大并發(fā)的任務(wù)數(shù)量(否則業(yè)務(wù)扛不住),達(dá)到并發(fā)上限的任務(wù)會(huì)在隊(duì)列中等待。同時(shí)要求早上 9 點(diǎn)前必須把 KPI 報(bào)表跑出來,可以設(shè)置 KPI 任務(wù)高優(yōu)先級(jí),會(huì)搶占低優(yōu)先級(jí)任務(wù)優(yōu)先調(diào)度。
SchedulerX 支持可搶占的任務(wù)優(yōu)先級(jí)隊(duì)列,可以在控制臺(tái)動(dòng)態(tài)配置:
2
Q&A
- Kubernetes 應(yīng)用可以接入 SchedulerX 嗎?
——可以的,無論是物理機(jī)、容器、還是 Kubernetes pod,都可以接入 SchedulerX。
- 我的應(yīng)用不在阿里云上,可否使用 SchedulerX?
——可以的,任何云平臺(tái)或者本地機(jī)器,只要能訪問公網(wǎng),都可以接入 SchedulerX。
作者:絕不禿頂?shù)某绦蛟?br />鏈接:
https://juejin.cn/post/7127551400824799263