一、SpringBatch 介紹
Spring Batch 是一個輕量級、全面的批處理框架,旨在支持開發(fā)對企業(yè)系統(tǒng)的日常操作至關重要的健壯的批處理應用程序。Spring Batch 建立在人們期望的 Spring Framework 特性(生產力、基于 POJO 的開發(fā)方法和一般易用性)的基礎上,同時使開發(fā)人員可以在必要時輕松訪問和使用更高級的企業(yè)服務。
Spring Batch 不是一個調度框架。在商業(yè)和開源領域都有許多優(yōu)秀的企業(yè)調度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch 旨在與調度程序結合使用,而不是替代調度程序。

二、業(yè)務場景
我們在業(yè)務開發(fā)中經(jīng)常遇到這種情況:

Spring Batch 支持以下業(yè)務場景:
定期提交批處理。
并發(fā)批處理:并行處理作業(yè)。
分階段的企業(yè)消息驅動處理。
大規(guī)模并行批處理。
失敗后手動或計劃重啟。
相關步驟的順序處理(擴展到工作流驅動的批次)。
部分處理:跳過記錄(例如,在回滾時)。
整批交易,適用于批量較小或已有存儲過程或腳本的情況。
三、基礎知識
3.1、整體架構

| 名稱 | 作用 |
|---|---|
| JobRepository | 為所有的原型(Job、JobInstance、Step)提供持久化的機制 |
| JobLauncher | JobLauncher表示一個簡單的接口,用于啟動一個Job給定的集合 JobParameters |
| Job | Job是封裝了整個批處理過程的實體 |
| Step | Step是一個域對象,它封裝了批處理作業(yè)的一個獨立的順序階段 |
3.2、核心接口
ItemReader: is an abstraction that represents the output of a Step, one batch or chunk of items at a time
ItemProcessor:an abstraction that represents the business processing of an item.
ItemWriter: is an abstraction that represents the output of a Step, one batch or chunk of items at a time.

大體即為 輸入→數(shù)據(jù)加工→輸出 ,一個Job定義多個Step及處理流程,一個Step通常涵蓋ItemReader、ItemProcessor、ItemWriter
四、基礎實操
4.0、引入 SpringBatch
pom 文件引入 springboot
?? org.springframework.boot ??spring-boot-starter-parent ??2.2.5.RELEASE ???
pom 文件引入 spring-batch 及相關依賴
???? ?????? ????org.springframework.boot ??????spring-boot-starter-batch ?????????? ????org.springframework.boot ??????spring-boot-starter-validation ?????????? ????mysql ??????mysql-connector-java ?????????? ??org.springframework.boot ??????spring-boot-starter-jdbc ????
mysql 創(chuàng)建依賴的庫表

sql 腳本的 jar 包路徑:.....maven epositoryorgspringframeworkatchspring-batch-core4.2.1.RELEASEspring-batch-core-4.2.1.RELEASE.jar!orgspringframeworkatchcoreschema-mysql.sql
啟動類標志@EnableBatchProcessing
@SpringBootApplication
@EnableBatchProcessing
public?class?SpringBatchStartApplication
{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(SpringBatchStartApplication.class,?args);
????}
}
FirstJobDemo
@Component
public?class?FirstJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?firstJob()?{
????????return?jobBuilderFactory.get("firstJob")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?{
????????return?stepBuilderFactory.get("step")
????????????????.tasklet((contribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟....");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
}
4.1、流程控制
A、多步驟任務
@Bean
public?Job?multiStepJob()?{
????return?jobBuilderFactory.get("multiStepJob2")
????????????.start(step1())
????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
????????????.from(step2())
????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
????????????.from(step3()).end()
????????????.build();
}
private?Step?step1()?{
????return?stepBuilderFactory.get("step1")
????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????return?RepeatStatus.FINISHED;
????????????}).build();
}
private?Step?step2()?{
????return?stepBuilderFactory.get("step2")
????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????return?RepeatStatus.FINISHED;
????????????}).build();
}
private?Step?step3()?{
????return?stepBuilderFactory.get("step3")
????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????return?RepeatStatus.FINISHED;
????????????}).build();
}
B、并行執(zhí)行
創(chuàng)建了兩個 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通過JobBuilderFactory的split方法,指定一個異步執(zhí)行器,將 flow1 和 flow2 異步執(zhí)行(也就是并行)
@Component
public?class?SplitJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?splitJob()?{
????????return?jobBuilderFactory.get("splitJob")
????????????????.start(flow1())
????????????????.split(new?SimpleAsyncTaskExecutor()).add(flow2())
????????????????.end()
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Flow?flow1()?{
????????return?new?FlowBuilder("flow1")
????????????????.start(step1())
????????????????.next(step2())
????????????????.build();
????}
????private?Flow?flow2()?{
????????return?new?FlowBuilder("flow2")
????????????????.start(step3())
????????????????.build();
????}
}
C、任務決策
決策器的作用就是可以指定程序在不同的情況下運行不同的任務流程,比如今天是周末,則讓任務執(zhí)行 step1 和 step2,如果是工作日,則之心 step1 和 step3。
@Component
public?class?MyDecider?implements?JobExecutionDecider?{
????@Override
????public?FlowExecutionStatus?decide(JobExecution?jobExecution,?StepExecution?stepExecution)?{
????????LocalDate?now?=?LocalDate.now();
????????DayOfWeek?dayOfWeek?=?now.getDayOfWeek();
????????if?(dayOfWeek?==?DayOfWeek.SATURDAY?||?dayOfWeek?==?DayOfWeek.SUNDAY)?{
????????????return?new?FlowExecutionStatus("weekend");
????????}?else?{
????????????return?new?FlowExecutionStatus("workingDay");
????????}
????}
}
@Bean
public?Job?deciderJob()?{
?return?jobBuilderFactory.get("deciderJob")
???.start(step1())
???.next(myDecider)
???.from(myDecider).on("weekend").to(step2())
???.from(myDecider).on("workingDay").to(step3())
???.from(step3()).on("*").to(step4())
???.end()
???.build();
}
private?Step?step1()?{
?return?stepBuilderFactory.get("step1")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟一操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
private?Step?step2()?{
?return?stepBuilderFactory.get("step2")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟二操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
private?Step?step3()?{
?return?stepBuilderFactory.get("step3")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟三操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
private?Step?step4()?{
?return?stepBuilderFactory.get("step4")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟四操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
D、任務嵌套
任務 Job 除了可以由 Step 或者 Flow 構成外,我們還可以將多個任務 Job 轉換為特殊的 Step,然后再賦給另一個任務 Job,這就是任務的嵌套。
@Component
public?class?NestedJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Autowired
????private?JobLauncher?jobLauncher;
????@Autowired
????private?JobRepository?jobRepository;
????@Autowired
????private?PlatformTransactionManager?platformTransactionManager;
????//?父任務
????@Bean
????public?Job?parentJob()?{
????????return?jobBuilderFactory.get("parentJob")
????????????????.start(childJobOneStep())
????????????????.next(childJobTwoStep())
????????????????.build();
????}
????//?將任務轉換為特殊的步驟
????private?Step?childJobOneStep()?{
????????return?new?JobStepBuilder(new?StepBuilder("childJobOneStep"))
????????????????.job(childJobOne())
????????????????.launcher(jobLauncher)
????????????????.repository(jobRepository)
????????????????.transactionManager(platformTransactionManager)
????????????????.build();
????}
????//?將任務轉換為特殊的步驟
????private?Step?childJobTwoStep()?{
????????return?new?JobStepBuilder(new?StepBuilder("childJobTwoStep"))
????????????????.job(childJobTwo())
????????????????.launcher(jobLauncher)
????????????????.repository(jobRepository)
????????????????.transactionManager(platformTransactionManager)
????????????????.build();
????}
????//?子任務一
????private?Job?childJobOne()?{
????????return?jobBuilderFactory.get("childJobOne")
????????????????.start(
????????????????????stepBuilderFactory.get("childJobOneStep")
????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????????????????System.out.println("子任務一執(zhí)行步驟。。。");
????????????????????????????????return?RepeatStatus.FINISHED;
????????????????????????????}).build()
????????????????).build();
????}
????//?子任務二
????private?Job?childJobTwo()?{
????????return?jobBuilderFactory.get("childJobTwo")
????????????????.start(
????????????????????stepBuilderFactory.get("childJobTwoStep")
????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????????????????System.out.println("子任務二執(zhí)行步驟。。。");
????????????????????????????????return?RepeatStatus.FINISHED;
????????????????????????????}).build()
????????????????).build();
????}
}
4.2、讀取數(shù)據(jù)
定義 Model TestData,下面同一
@Data
public?class?TestData?{
????private?int?id;
????private?String?field1;
????private?String?field2;
????private?String?field3;
}
讀取數(shù)據(jù)包含:文本數(shù)據(jù)讀取、數(shù)據(jù)庫數(shù)據(jù)讀取、XML 數(shù)據(jù)讀取、JSON 數(shù)據(jù)讀取等,具體自己查資料。
文本數(shù)據(jù)讀取 Demo
@Component
public?class?FileItemReaderDemo?{
????//?任務創(chuàng)建工廠
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????//?步驟創(chuàng)建工廠
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?fileItemReaderJob()?{
????????return?jobBuilderFactory.get("fileItemReaderJob2")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?{
????????return?stepBuilderFactory.get("step")
????????????????.chunk(2)
????????????????.reader(fileItemReader())
????????????????.writer(list?->?list.forEach(System.out::println))
????????????????.build();
????}
????private?ItemReader?fileItemReader()?{
????????FlatFileItemReader?reader?=?new?FlatFileItemReader<>();
????????reader.setResource(new?ClassPathResource("reader/file"));?//?設置文件資源地址
????????reader.setLinesToSkip(1);?//?忽略第一行
????????//?AbstractLineTokenizer的三個實現(xiàn)類之一,以固定分隔符處理行數(shù)據(jù)讀取,
????????//?使用默認構造器的時候,使用逗號作為分隔符,也可以通過有參構造器來指定分隔符
????????DelimitedLineTokenizer?tokenizer?=?new?DelimitedLineTokenizer();
????????//?設置屬性名,類似于表頭
????????tokenizer.setNames("id",?"field1",?"field2",?"field3");
????????//?將每行數(shù)據(jù)轉換為TestData對象
????????DefaultLineMapper?mapper?=?new?DefaultLineMapper<>();
????????//?設置LineTokenizer
????????mapper.setLineTokenizer(tokenizer);
????????//?設置映射方式,即讀取到的文本怎么轉換為對應的POJO
????????mapper.setFieldSetMapper(fieldSet?->?{
????????????TestData?data?=?new?TestData();
????????????data.setId(fieldSet.readInt("id"));
????????????data.setField1(fieldSet.readString("field1"));
????????????data.setField2(fieldSet.readString("field2"));
????????????data.setField3(fieldSet.readString("field3"));
????????????return?data;
????????});
????????reader.setLineMapper(mapper);
????????return?reader;
????}
}
4.3、輸出數(shù)據(jù)
輸出數(shù)據(jù)也包含:文本數(shù)據(jù)讀取、數(shù)據(jù)庫數(shù)據(jù)讀取、XML 數(shù)據(jù)讀取、JSON 數(shù)據(jù)讀取等
Component
public?class?FileItemWriterDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Resource(name?=?"writerSimpleReader")
????private?ListItemReader?writerSimpleReader;
????@Bean
????public?Job?fileItemWriterJob()?throws?Exception?{
????????return?jobBuilderFactory.get("fileItemWriterJob")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?throws?Exception?{
????????return?stepBuilderFactory.get("step")
????????????????.chunk(2)
????????????????.reader(writerSimpleReader)
????????????????.writer(fileItemWriter())
????????????????.build();
????}
????private?FlatFileItemWriter?fileItemWriter()?throws?Exception?{
????????FlatFileItemWriter?writer?=?new?FlatFileItemWriter<>();
????????FileSystemResource?file?=?new?FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
????????Path?path?=?Paths.get(file.getPath());
????????if?(!Files.exists(path))?{
????????????Files.createFile(path);
????????}
????????//?設置輸出文件路徑
????????writer.setResource(file);
????????//?把讀到的每個TestData對象轉換為JSON字符串
????????LineAggregator?aggregator?=?item?->?{
????????????try?{
????????????????ObjectMapper?mapper?=?new?ObjectMapper();
????????????????return?mapper.writeValueAsString(item);
????????????}?catch?(JsonProcessingException?e)?{
????????????????e.printStackTrace();
????????????}
????????????return?"";
????????};
????????writer.setLineAggregator(aggregator);
????????writer.afterPropertiesSet();
????????return?writer;
????}
}
4.5、處理數(shù)據(jù)
@Component
public?class?ValidatingItemProcessorDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Resource(name?=?"processorSimpleReader")
????private?ListItemReader?processorSimpleReader;
????@Bean
????public?Job?validatingItemProcessorJob()?throws?Exception?{
????????return?jobBuilderFactory.get("validatingItemProcessorJob3")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?throws?Exception?{
????????return?stepBuilderFactory.get("step")
????????????????.chunk(2)
????????????????.reader(processorSimpleReader)
????????????????.processor(beanValidatingItemProcessor())
????????????????.writer(list?->?list.forEach(System.out::println))
????????????????.build();
????}
//????private?ValidatingItemProcessor?validatingItemProcessor()?{
//????????ValidatingItemProcessor?processor?=?new?ValidatingItemProcessor<>();
//????????processor.setValidator(value?->?{
//????????????//?對每一條數(shù)據(jù)進行校驗
//????????????if?("".equals(value.getField3()))?{
//????????????????//?如果field3的值為空串,則拋異常
//????????????????throw?new?ValidationException("field3的值不合法");
//????????????}
//????????});
//????????return?processor;
//????}
????private?BeanValidatingItemProcessor?beanValidatingItemProcessor()?throws?Exception?{
????????BeanValidatingItemProcessor?beanValidatingItemProcessor?=?new?BeanValidatingItemProcessor<>();
????????//?開啟過濾,不符合規(guī)則的數(shù)據(jù)被過濾掉;
//????????beanValidatingItemProcessor.setFilter(true);
????????beanValidatingItemProcessor.afterPropertiesSet();
????????return?beanValidatingItemProcessor;
????}
}
4.6、任務調度
可以配合 quartz 或者 xxljob 實現(xiàn)定時任務執(zhí)行
@RestController @RequestMapping("job") public?class?JobController?{ ????@Autowired ????private?Job?job; ????@Autowired ????private?JobLauncher?jobLauncher; ????@GetMapping("launcher/{message}") ????public?String?launcher(@PathVariable?String?message)?throws?Exception?{ ????????JobParameters?parameters?=?new?JobParametersBuilder() ????????????????.addString("message",?message) ????????????????.toJobParameters(); ????????//?將參數(shù)傳遞給任務 ????????jobLauncher.run(job,?parameters); ????????return?"success"; ????} } 編輯:黃飛
電子發(fā)燒友App






















評論