為什么需要定時任務
定時任務的應用場景十分廣泛,如定時清理文件、定時生成報表、定時數據同步備份等。
JAVA定時任務的原理
jdk自帶的庫中,有兩種技術可以實現定時任務,一種是Timer,另一種是
ScheduledThreadPoolExecutor
Timer+TimerTask
Timer是一個線程,控制執行TimerTask所需要執行的內容
public class Timer {
/**
* The timer task queue. This data structure is shared with the timer
* thread. The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as Appropriate,
* and removing them from the queue when they're obsolete.
*/
private final TaskQueue queue = new TaskQueue();
/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);
。。。。。。
}
其中,需要注意,Timer類有幾個方法創建不同的線程執行:
延時執行
//其中的delay是延時時間,表示多少毫秒后執行一次task
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}
指定時間點執行
//到達指定時間time的時候執行一次task
public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}
延時周期執行
//經過delay毫秒后按每period毫秒執行一次的周期執行task
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
指定時間點后周期執行
//到達指定時間firstTime之后按照每period毫秒執行一次的周期執行task
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}
TimerTask是一個實現了Runable接口的類,所以能夠放到線程去執行:
public abstract class TimerTask implements Runnable {
/**
* This object is used to control access to the TimerTask internals.
*/
final Object lock = new Object();
。。。。。。
}
示例:
public class JavaTimerJob {
public static void main(String[] args) {
Timer timer = new Timer();
Task task = new Task();
//當前時間開始,每1秒執行一次
timer.schedule(task, new Date(),1000);
}
}
class Task extends TimerTask {
@Override
public void run() {
System.out.println(new Date()+": This is my job...");
}
}
執行結果:
Tue May 30 13:45:47 CST 2022: This is my job...
Tue May 30 13:45:48 CST 2022: This is my job...
Tue May 30 13:45:49 CST 2022: This is my job...
Tue May 30 13:45:50 CST 2022: This is my job...
。。。。
弊端:Timer是單線程的,一旦定時任務中某一過程時刻拋出異常,將會導致整體線程停止,定時任務停止。
ScheduledThreadPoolExecutor
繼承了ThreadPoolExecutor,,是一個基于線程池的調度器 通過實現ScheduledExecutorService接口方法去實現任務調度,主要方法如下:
延時執行
//command是待執行的線程,delay表示延時時長,unit代表時間單位
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
延時周期執行
//command是待執行的線程,initialDelay表示延時時長,period代表執行間隔時長,unit代表時間單位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
每段延時間隔執行
//command是待執行的線程,initialDelay表示延時時長,delay代表每次執行線程前的延時時長,unit代表時間單位
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
示例:
public class JavaScheduledThreadPoolExecutor {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
//延時1秒后開始執行,每3秒執行一次
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(new Date()+": This is my job...");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
執行結果:
Tue May 30 15:05:16 CST 2022: This is my job...
Tue May 30 15:05:19 CST 2022: This is my job...
Tue May 30 15:05:22 CST 2022: This is my job...
Tue May 30 15:05:25 CST 2022: This is my job...
。。。。。
Timer VS ScheduledThreadPoolExecutor
Timer
- 是單線程,如果開啟多個線程服務,將會出現競爭,一旦出現異常,線程停止,定時任務停止;
- 兼容性更高,jdk1.3后使用
ScheduledThreadPoolExecutor
- 基于線程池實現多線程,且自動調整線程數,線程出錯并不會影響整體定時任務執行。
- 在jdk1.5后可使用
Spring定時任務
Spring原生定時任務主要依靠@Scheduled注解實現:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
String CRON_DISABLED = "-";
String cron() default ""; //類似于corn表達式,可以指定定時任務執行的延遲及周期規則
String zone() default ""; //指明解析cron表達式的時區。
long fixedDelay() default -1; //在最后一次調用結束和下一次調用開始之間以固定周期(以毫秒為單位)執行帶注解的方法。(要等待上次任務完成后)
String fixedDelayString() default ""; //同上面作用一樣,只是String類型
long fixedRate() default -1; //在調用之間以固定的周期(以毫秒為單位)執行帶注解的方法。(不需要等待上次任務完成)
String fixedRateString() default ""; //同上面作用一樣,只是String類型
long initialDelay() default -1; //第一次執行fixedRate()或fixedDelay()任務之前延遲的毫秒數 。
String initialDelayString() default ""; //同上面作用一樣,只是String類型
}
Spring靜態定時任務示例:
@Slf4j
@Component
public class TestJob {
//每40秒執行一次
@Scheduled(cron = "0/40 * * * * ?")
public void logJob(){
if(log.isDebugEnabled()){
log.debug("現在是:{}",LocalDateTime.now());
}
}
}
執行結果:
現在是:2022-05-30T16:03:40.006
現在是:2022-05-30T16:04
現在是:2022-05-30T16:04:40.003
@Scheduled定時任務原理(源碼)
①項目啟動掃描帶有注解@Scheduled的所有方法信息由
ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization方法實現功能:
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
//獲取定時任務的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
//調用processScheduled方法將定時任務方法存放到任務隊列中
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
②調用processScheduled方法將定時任務方法存放到任務隊列中
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
//創建任務線程
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
//解析任務執行初始延遲
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value "" + initialDelayString + "" - cannot parse into long");
}
}
}
//解析cron表達式
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
//解析fixedDelay參數
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
//存放任務到任務隊列中
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value "" + fixedDelayString + "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
//解析fixedRate參數
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value "" + fixedRateString + "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// 斷言檢查
Assert.isTrue(processedSchedule, errorMessage);
//并發控制將任務隊列存入注冊任務列表
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
③將任務解析并添加到任務隊列后,交由ScheduledTaskRegistrar類的scheduleTasks方法添加(注冊)定時任務到環境中:
protected void scheduleTasks() {
if (this.taskScheduler == null) {
//獲取ScheduledExecutorService對象,實際上都是使用ScheduledThreadPoolExecutor執行定時任務調度
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
private void addScheduledTask(@Nullable ScheduledTask task) {
if (task != null) {
this.scheduledTasks.add(task);
}
}
由上述源碼可以看出,Spring原生定時任務的大概步驟如下: 1.掃描帶@Scheduled注解的類和方法(
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization(........)) 2.將定時任務解析完成后加入任務隊列(ScheduledAnnotationBeanPostProcessor.processScheduled(........)) 3.將定時任務注冊到當前運行環境,等待執行(ScheduledTaskRegistrar.scheduleTasks(.......)) 且@Scheduled的底層調度實現是ScheduledThreadPoolExecutor
作者:MrDong先生
鏈接:
https://juejin.cn/post/7111992569034178574
來源:稀土掘金