環(huán)境:Springboot2.6.12 + Spring Batch4.2.7
Spring Batch是一個(gè)輕量級的,完全面向Spring的批處理框架,可以應(yīng)用于企業(yè)級大量的數(shù)據(jù)處理系統(tǒng)。Spring Batch以POJO和大家熟知的Spring框架為基礎(chǔ),使開發(fā)者更容易的訪問和利用企業(yè)級服務(wù)。Spring Batch可以提供大量的,可重復(fù)的數(shù)據(jù)處理功能,包括日志記錄/跟蹤,事務(wù)管理,作業(yè)處理統(tǒng)計(jì)工作重新啟動(dòng)、跳過,和資源管理等重要功能。
業(yè)務(wù)場景:
-
定期提交批處理。
-
并行批處理:作業(yè)的并行處理
-
分階段、企業(yè)消息驅(qū)動(dòng)的處理
-
大規(guī)模并行批處理
-
故障后手動(dòng)或計(jì)劃重新啟動(dòng)
-
相關(guān)步驟的順序處理(擴(kuò)展到工作流驅(qū)動(dòng)的批處理)
-
部分處理:跳過記錄(例如,回滾時(shí))
-
整批事務(wù),適用于小批量或現(xiàn)有存儲過程/腳本的情況
技術(shù)目標(biāo):
-
批處理開發(fā)人員使用Spring編程模型:專注于業(yè)務(wù)邏輯,讓框架負(fù)責(zé)基礎(chǔ)設(shè)施。
-
基礎(chǔ)架構(gòu)、批處理執(zhí)行環(huán)境和批處理應(yīng)用程序之間的關(guān)注點(diǎn)清晰分離。
-
提供通用的核心執(zhí)行服務(wù),作為所有項(xiàng)目都可以實(shí)現(xiàn)的接口。
-
提供可“開箱即用”的核心執(zhí)行接口的簡單和默認(rèn)實(shí)現(xiàn)。
-
通過在所有層中利用spring框架,可以輕松配置、定制和擴(kuò)展服務(wù)。
-
所有現(xiàn)有的核心服務(wù)都應(yīng)該易于替換或擴(kuò)展,而不會對基礎(chǔ)架構(gòu)層造成任何影響。
-
提供一個(gè)簡單的部署模型,使用Maven構(gòu)建的架構(gòu)JAR與應(yīng)用程序完全分離。
Spring Batch的結(jié)構(gòu):
此分層體系結(jié)構(gòu)突出了三個(gè)主要的高級組件:應(yīng)用程序、核心和基礎(chǔ)架構(gòu)。該應(yīng)用程序包含開發(fā)人員使用SpringBatch編寫的所有批處理作業(yè)和自定義代碼。批處理核心包含啟動(dòng)和控制批處理作業(yè)所需的核心運(yùn)行時(shí)類。它包括JobLauncher、Job和Step的實(shí)現(xiàn)。應(yīng)用程序和核心都構(gòu)建在公共基礎(chǔ)架構(gòu)之上。此基礎(chǔ)結(jié)構(gòu)包含公共讀寫器和服務(wù)(如RetryTemplate),應(yīng)用程序開發(fā)人員(讀寫器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的庫)都使用這些服務(wù)。
下面介紹開發(fā)流程
本例完成 讀取文件內(nèi)容,經(jīng)過處理后,將數(shù)據(jù)保存到數(shù)據(jù)庫中
-
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>MySQL</groupId>
<artifactId>mysql-connector-JAVA</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.7.Final</version>
</dependency>
-
應(yīng)用配置文件
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8
username: root
password: *******
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
autoCommit: true
idleTimeout: 30000
poolName: MasterDatabookHikariCP
maxLifetime: 1800000
connectionTimeout: 30000
connectionTestQuery: SELECT 1
---
spring:
jpa:
generateDdl: false
hibernate:
ddlAuto: update
openInView: true
show-sql: true
---
spring:
batch:
job:
enabled: false #是否自動(dòng)執(zhí)行任務(wù)
initialize-schema: always #自動(dòng)為我們創(chuàng)建數(shù)據(jù)庫腳本
-
開啟批處理功能
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer{
}
-
任務(wù)啟動(dòng)器
接著上一步的配置類BatchConfig重寫對應(yīng)方法
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(createJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
-
任務(wù)存儲
接著上一步的配置類BatchConfig重寫對應(yīng)方法
@Resource
private PlatformTransactionManager transactionManager ;
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDatabaseType("mysql");
factory.setTransactionManager(transactionManager);
factory.setDataSource(dataSource);
factory.afterPropertiesSet();
return factory.getObject();
}
-
定義JOB
@Bean
public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){
return builder.get("myJob")
.incrementer(new RunIdIncrementer())
.flow(step)
.end()
.listener(jobExecutionListener)
.build();
}
-
定義ItemReader讀取器
@Bean
public ItemReader<Person> reader(){
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("cvs/persons.cvs"));
reader.setLineMApper(new DefaultLineMapper<Person>() {
// 代碼塊
{
setL.NETokenizer(new DelimitedLineTokenizer(",") {
{
setNames("id", "name");
}
}) ;
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class) ;
}
});
}
});
return reader;
}
-
定義ItemProcessor處理器
@Bean
public ItemProcessor<Person, Person2> processorPerson(){
return new ItemProcessor<Person, Person2>() {
@Override
public Person2 process(Person item) throws Exception {
Person2 p = new Person2() ;
p.setId(item.getId()) ;
p.setName(item.getName() + ", pk");
return p ;
}
} ;
}
-
定義ItemWriter寫數(shù)據(jù)
@Resource
private Validator<Person> validator ;
@Resource
private EntityManagerFactory entityManagerFactory ;
@Bean
public ItemWriter<Person2> writerPerson(){
JpAItemWriter<Person2> writer = null ;
JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;
builder.entityManagerFactory(entityManagerFactory) ;
writer = builder.build() ;
return writer;
}
-
定義Step
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){
return stepBuilderFactory
.get("myStep")
.<Person, Person>chunk(2) // Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作)
.reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2).listener(new MyReadListener())
.processor(processor)
.writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
.listener(new MyWriteListener())
.build();
}
-
定義相應(yīng)的監(jiān)聽器
public class MyReadListener implements ItemReadListener<Person> {
private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
@Override
public void beforeRead() {
}
@Override
public void afterRead(Person item) {
System.out.println("reader after: " + Thread.currentThread().getName()) ;
}
@Override
public void onReadError(Exception ex) {
logger.info("讀取數(shù)據(jù)錯(cuò)誤:{}", ex);
}
}
@Component
public class MyWriteListener implements ItemWriteListener<Person> {
private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
@Override
public void beforeWrite(List<? extends Person> items) {
}
@Override
public void afterWrite(List<? extends Person> items) {
System.out.println("writer after: " + Thread.currentThread().getName()) ;
}
@Override
public void onWriteError(Exception exception, List<? extends Person> items) {
try {
logger.info(format("%s%n", exception.getMessage()));
for (Person item : items) {
logger.info(format("Failed writing BlogInfo : %s", item.toString()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
person.cvs文件內(nèi)容
實(shí)體類:@Entity
@Table(name = "t_person")
public class Person {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id ;
private String name ;
}
啟動(dòng)任務(wù)執(zhí)行
@RestController
@RequestMapping("/demo")
public class DemoController {
@Resource
@Qualifier("myJob")
private Job job ;
@Resource
private JobLauncher launcher ;
@GetMapping("/index")
public Object index() {
JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;
try {
launcher.run(job, jobParameters) ;
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
return "success" ;
}
}
啟動(dòng)服務(wù),自動(dòng)為我們創(chuàng)建了表
執(zhí)行任務(wù)
查看表情況
完畢!!!