1. 概述
Spring Cloud Task的目標是為Spring Boot應用程序提供創建短運行期微服務的功能。在Spring Cloud Task中,我們可以靈活地動態運行任何任務,按需分配資源并在任務完成后檢索結果。Tasks是Spring Cloud Data Flow中的一個基礎項目,允許用戶將幾乎任何Spring Boot應用程序作為一個短期任務執行。
2. 一個簡單的任務應用程序
2.1 添加相關依賴項
首先,我們可以添加具有
spring-cloud-task-dependencies的依賴關系管理部分:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-dependencies</artifactId>
<version>1.2.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
此依賴關系管理通過import范圍管理具有依賴關系的版本。
我們需要添加以下依賴項:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-core</artifactId>
</dependency>
現在,要啟動我們的Spring Boot應用程序,我們需要與相關父級一起使用spring-boot-starter。
我們將使用Spring Data JPA作為ORM工具,因此我們還需要為其添加依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
2.2 @EnableTask注解
要引導Spring Cloud Task的功能,我們需要添加@EnableTask注解:
@SpringBootApplication
@EnableTask
public class TaskDemo {
// ...
}
該注解在程序中引入了SimpleTaskConfiguration類,該類依次注冊TaskRepository及其基礎結構。默認情況下,內存映射用于存儲TaskRepository的狀態。
TaskRepository的主要信息在TaskExecution類中建模。該類的包含的字段有taskName,startTime,endTime,exitMessage。exitMessage在退出的時候存儲一些有用信息。
如果退出是由應用程序的任何事件中的失敗引起的,則完整的異常堆棧跟蹤將存儲在此處。
Spring Boot提供了一個接口ExitCodeExceptionMapper,它將未捕獲的異常映射到允許經過詳細調試的退出代碼。Cloud Task將信息存儲在數據源中以供將來分析。
2.3 為TaskRepository配置DataSource
一旦任務結束,存儲TaskRepository的內存映射將消失,我們將丟失與Task事件相關的數據。要想永久存儲,我們將使用MySQL作為Spring Data JPA的數據源。
數據源 在application.yml文件中配置。要配置Spring Cloud Task以使用提供的數據源作為TaskRepository的存儲,我們需要創建一個擴展DefaultTaskConfigurer的類。
現在,我們可以將配置的Datasource作為構造函數參數發送到超類的構造函數:
public class HelloWorldTaskConfigurer extends DefaultTaskConfigurer{
public HelloWorldTaskConfigurer(DataSource dataSource){
super(dataSource);
}
}
為了實現上述配置,我們需要使用@Autowired批注注釋DataSource的實例,并將實例注入上面定義的HelloWorldTaskConfigurer bean的構造參數中 :
@Autowired
private DataSource dataSource;
@Bean
public HelloWorldTaskConfigurer getTaskConfigurer() {
return new HelloWorldTaskConfigurer(dataSource);
}
這樣就完成了將TaskRepository存儲到MySQL數據庫的配置。
2.4 實現
在Spring Boot中,我們可以在應用程序完成啟動之前執行任何任務。我們可以使用ApplicationRunner或CommandLineRunner接口來創建一個簡單的Task。
我們需要實現這些接口的run方法,并將實現類聲明為bean:
@Component
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments arg0) throws Exception {
// TODO Auto-generated method stub
LOGGER.info("Hello World from Spring Cloud Task!");
}
}
3. Spring Cloud Task的生命周期
首先,我們在TaskRepository中創建一個條目。這表明所有bean都已準備好在Application中使用,并且Runner接口的run方法已準備好執行。
完成run方法的執行或ApplicationContext事件的任何失敗后,TaskRepository將使用另一個條目進行更新。
在任務生命周期中,我們可以在TaskExecutionListener接口中注冊偵聽器。我們需要一個實現接口的類,它有三個方法 - 在Task的各個事件中觸發onTaskEnd,onTaksFailed和onTaskStartup。
public class TaskListener implements TaskExecutionListener {
private final static Logger LOGGER = Logger.getLogger(TaskListener.class.getName());
@Override
public void onTaskEnd(TaskExecution arg0) {
// TODO Auto-generated method stub
LOGGER.info("End of Task");
}
@Override
public void onTaskFailed(TaskExecution arg0, Throwable arg1) {
// TODO Auto-generated method stub
}
@Override
public void onTaskStartup(TaskExecution arg0) {
// TODO Auto-generated method stub
LOGGER.info("Task Startup");
}
}
我們需要在TaskDemo類中聲明實現類的bean :
@Bean
public TaskListener taskListener() {
return new TaskListener();
}
運行TaskDemo類,在控制臺可看到任務被執行:
14:23:29.974 [main] INFO o.s.j.e.a.AnnotationMBeanExporter - Registering beans for JMX exposure on startup
14:23:29.978 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
14:23:30.103 [main] INFO c.p.spring.cloud.task.TaskListener - Task Startup
14:23:30.109 [main] INFO c.p.spring.cloud.task.TaskDemo - Hello World from Spring Cloud Task!
14:23:30.113 [main] INFO c.p.spring.cloud.task.TaskListener - End of Task
14:23:30.126 [main] INFO o.s.c.a.AnnotationConfigApplicationContext - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2a798d51: startup date [Fri Oct 12 14:23:28 CST 2018]; root of context hierarchy
14:23:30.127 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Stopping beans in phase 0
14:23:30.128 [main] INFO o.s.j.e.a.AnnotationMBeanExporter - Unregistering JMX-exposed beans on shutdown
14:23:30.128 [main] INFO o.s.o.j.LocalContainerEntityManagerFactoryBean - Closing JPA EntityManagerFactory for persistence unit 'default'
14:23:30.129 [main] INFO o.h.tool.hbm2ddl.SchemaExport - HHH000227: Running hbm2ddl schema export
14:23:30.129 [main] INFO o.h.tool.hbm2ddl.SchemaExport - HHH000230: Schema export complete
14:23:30.132 [main] INFO c.p.spring.cloud.task.TaskDemo - Started TaskDemo in 2.405 seconds (JVM running for 2.771)
4. 與Spring Batch集成
我們可以將Spring Batch Job作為Task執行,并使用Spring Cloud Task記錄Job執行的事件。要啟用此功能,我們需要添加與Boot和Cloud相關的Batch依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-batch</artifactId>
</dependency>
要將作業配置為任務,我們需要在JobConfiguration類中注冊Job Bean :
@Bean
public Job job2() {
return jobBuilderFactory.get("job2").start(stepBuilderFactory.get("job2step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
LOGGER.info("This job is from PeterWanghao");
return RepeatStatus.FINISHED;
}
}).build()).build();
}
我們需要使用@EnableBatchProcessing注解來裝飾TaskDemo類:
@SpringBootApplication
@EnableTask
@EnableBatchProcessing
public class TaskDemo {
// ...
}
該@EnableBatchProcessing注解集成了Spring Batch的功能,并設置批處理作業所需的基本配置。
現在,如果我們運行應用程序,@ EnableBatchProcessing注釋將觸發Spring Batch Job執行,Spring Cloud Task將使用springcloud數據庫記錄所有批處理作業的執行事件。
運行TaskDemo類,在控制臺可看到任務被執行:
14:30:26.003 [main] INFO o.s.j.e.a.AnnotationMBeanExporter - Registering beans for JMX exposure on startup
14:30:26.008 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
14:30:26.047 [main] INFO c.p.spring.cloud.task.TaskListener - Task Startup
14:30:26.053 [main] INFO c.p.spring.cloud.task.TaskDemo - Hello World from Spring Cloud Task!
14:30:26.054 [main] INFO o.s.b.a.b.JobLauncherCommandLineRunner - Running default command line with: []
14:30:26.257 [main] INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=job1]] launched with the following parameters: [{}]
14:30:26.266 [main] INFO o.s.c.t.b.l.TaskBatchExecutionListener - The job execution id 1 was run within the task execution 4
14:30:26.292 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [job1step1]
14:30:26.312 [main] INFO c.p.s.cloud.task.JobConfiguration - Tasklet has run
14:30:26.342 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [job1step2]
14:30:26.353 [main] INFO c.p.s.cloud.task.JobConfiguration - Processing of chunks
14:30:26.353 [main] INFO c.p.s.cloud.task.JobConfiguration - Processing of chunks
14:30:26.353 [main] INFO c.p.s.cloud.task.JobConfiguration - Processing of chunks
14:30:26.354 [main] INFO c.p.s.cloud.task.JobConfiguration - >> -7
14:30:26.354 [main] INFO c.p.s.cloud.task.JobConfiguration - >> -2
14:30:26.354 [main] INFO c.p.s.cloud.task.JobConfiguration - >> -3
14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration - Processing of chunks
14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration - Processing of chunks
14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration - Processing of chunks
14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration - >> -10
14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration - >> -5
14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration - >> -6
14:30:26.381 [main] INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=job1]] completed with the following parameters: [{}] and the following status: [COMPLETED]
14:30:26.398 [main] INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=job2]] launched with the following parameters: [{}]
14:30:26.404 [main] INFO o.s.c.t.b.l.TaskBatchExecutionListener - The job execution id 2 was run within the task execution 4
14:30:26.420 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [job2step1]
14:30:26.428 [main] INFO c.p.s.cloud.task.JobConfiguration - This job is from PeterWanghao
14:30:26.441 [main] INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=job2]] completed with the following parameters: [{}] and the following status: [COMPLETED]
14:30:26.442 [main] INFO c.p.spring.cloud.task.TaskListener - End of Task
14:30:26.448 [main] INFO o.s.c.a.AnnotationConfigApplicationContext - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@399f45b1: startup date [Fri Oct 12 14:30:23 CST 2018]; root of context hierarchy
14:30:26.450 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Stopping beans in phase 0
14:30:26.450 [main] INFO o.s.j.e.a.AnnotationMBeanExporter - Unregistering JMX-exposed beans on shutdown
14:30:26.451 [main] INFO o.s.o.j.LocalContainerEntityManagerFactoryBean - Closing JPA EntityManagerFactory for persistence unit 'default'
14:30:26.451 [main] INFO o.h.tool.hbm2ddl.SchemaExport - HHH000227: Running hbm2ddl schema export
14:30:26.451 [main] INFO o.h.tool.hbm2ddl.SchemaExport - HHH000230: Schema export complete
14:30:26.455 [main] INFO c.p.spring.cloud.task.TaskDemo - Started TaskDemo in 3.746 seconds (JVM running for 4.093)
5. 從Stream啟動任務
我們可以從Spring Cloud Stream觸發任務。為了達到這個目的,我們使用@EnableTaskLaucnher注解。這一次,我們使用Spring Boot應用程序添加注釋,TaskSink將可用:
@SpringBootApplication
@EnableTaskLauncher
public class StreamTaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
TaskSink從一個流中接收消息,信息中包含GenericMessage含有TaskLaunchRequest作為有效負載。然后它觸發任務啟動請求中提供的基于任務的坐標。
要使TaskSink起作用,我們需要配置一個實現TaskLauncher接口的bean。出于測試目的,我們在這里Mock實現:
@Bean
public TaskLauncher taskLauncher() {
return mock(TaskLauncher.class);
}
我們需要注意的是,TaskLauncher接口僅在添加
spring-cloud-deployer-local依賴項后才可用:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-local</artifactId>
<version>1.3.1.RELEASE</version>
</dependency>
我們可以測試通過調用發起的任務是否輸入了Sink:
public class SpringCloudTaskSinkApplicationIntegrationTest{
@Autowired
private Sink sink;
//
}
現在,我們創建一個TaskLaunchRequest實例,并將其作為GenericMessage < TaskLaunchRequest >對象的有效負載發送。然后我們可以調用Sink的輸入通道,將GenericMessage對象保留在通道中。
六,結論
在本文中,我們探討了Spring Cloud Task的執行方式以及如何配置它以在數據庫中記錄其事件。我們還觀察了如何定義Spring Batch作業并將其存儲在TaskRepository中。最后,我們解釋了如何從Spring Cloud Stream中觸發Task。