Springbatch入门示例

1   场景说明

网站建设哪家好,找成都创新互联!专注于网页设计、网站建设、微信开发、小程序设计、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了海盐免费建站欢迎大家使用!

读取CVS文件,经过处理后,保存到数据库。

 

Spring batch入门示例

2   项目结构

应用程序

启动主程序

DemoApplication.java

读取文件(输入文件)

UserItemReader.java

处理数据

UserItemProcess.java

输出文件

UserItemWriter.java

调度批作业

定时处理配置

QuartzConfiguration.java

定时调度

QuartzJobLauncher.java

辅助文件

数据文件

User.txt

对象实体(传递对象)

User.java

Meaven配置文件

Pom.xml

Spring batch入门示例

2.1  Pom.xml

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   http://maven.apache.org/xsd/maven-4.0.0.xsd">

    4.0.0

 

    com.zy

    SpringBatchDemo1

    0.0.1-SNAPSHOT

    jar

 

    SpringBatchDemo1

    Demo   project for Spring Boot

 

   

       org.springframework.boot

       spring-boot-starter-parent

       1.5.10.RELEASE

      

   

 

   

       UTF-8

        UTF-8

       1.8

   

 

   

      

           org.springframework

           spring-context-support

      

      

           org.springframework.boot

           spring-boot-starter-batch

      

      

           org.springframework

           spring-oxm

      

      

           org.projectlombok

           lombok

      

      

      

           MySQL

           mysql-connector-java

           runtime

      

      

           org.springframework.boot

           spring-boot-starter-test

           test

      

      

           org.springframework.batch

           spring-batch-test

           test

      

      

           org.projectlombok

           lombok

      

      

           org.quartz-scheduler

           quartz

           2.3.0

      

      

           com.h3database

           h3

           runtime

      

   

 

   

      

          

              org.springframework.boot

              spring-boot-maven-plugin

          

      

   

 

 

2.2  User.java

package com.zy.model;

 

public class User {

       private   String id;

       private   String name;

       private   String age;

      

       public   User(String id, String name, String age) {

              this.id   = id;

              this.name   = name;

              this.age   = age;

       }

 

       public   String getId() {

              return   id;

       }

 

       public   void setId(String id) {

              this.id   = id;

       }

 

       public   String getName() {

              return   name;

       }

 

       public   void setName(String name) {

              this.name   = name;

       }

 

       public   String getAge() {

              return   age;

       }

 

       public   void setAge(String age) {

              this.age   = age;

       }

 

       @Override

       public   String toString() {

              return   "User [id=" + id + ", name=" + name + ", age="   + age + "]";

       }

      

}

2.3  UserItemReader.java

package com.zy.reader;

 

import   org.springframework.batch.item.file.FlatFileItemReader;

import   org.springframework.batch.item.file.LineMapper;

import   org.springframework.batch.item.file.mapping.DefaultLineMapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import   org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

import   org.springframework.batch.item.file.transform.FieldSet;

import   org.springframework.batch.item.file.transform.LineTokenizer;

import   org.springframework.core.io.ClassPathResource;

import   org.springframework.validation.BindException;

 

import com.zy.model.User;

//从user.txt文件中读取信息到User

public class UserItemReader extends   FlatFileItemReader {

       public   UserItemReader(){

              createReader();

       }

      

       private   void createReader(){

              this.setResource(new   ClassPathResource("data/User.txt"));

              this.setLinesToSkip(1);

              this.setLineMapper(userLineMapper());

       }

      

       private   LineMapper userLineMapper(){

              DefaultLineMapper   lineMapper = new DefaultLineMapper<>();

              lineMapper.setLineTokenizer(userLineTokenizer());

              lineMapper.setFieldSetMapper(new   UserFieldStepMapper());

              lineMapper.afterPropertiesSet();  

              return   lineMapper;

       }

      

      private LineTokenizer userLineTokenizer(){

          DelimitedLineTokenizer   tokenizer = new DelimitedLineTokenizer();

          tokenizer.setNames(new String[]{"ID", "NAME",   "AGE"});

          return tokenizer;

      }

     

      private static class UserFieldStepMapper implements   FieldSetMapper{

              @Override

              public   User mapFieldSet(FieldSet fieldSet) throws BindException {

            return new   User(fieldSet.readString("ID"),

                      fieldSet.readString("NAME"),

                      fieldSet.readString("AGE"));

              }

 

      }

 

     

}

2.4  User.txt

ID,NAME,AGE

1,zy,28

2,tom,20

3,terry,30

4,lerry,18

5,bob,25

6,linda,27

7,marry,39

8,long,22

9,kin,33

10,王五,40

 

2.5  UserItemProcessor.java

package com.zy.processor;

import   org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

 

public class UserItemProcessor implements   ItemProcessor {

 

       @Override

       public   User process(User item) throws Exception {

              if   (Integer.parseInt(item.getAge()) > 20) {

                

                     return   item;

              }

              return   null;

       }

 

}

 

2.6  UserItemWriter.java

package com.zy.writer;

import java.util.List;

import   org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

 

public class UserItemWriter implements   ItemWriter {

 

       @Override

       public   void write(List items) throws Exception {

              for(User   user : items){

                     System.out.println(user);

              }

       }

 

}

2.7  QuartzJobLauncher

package com.zy.QuartzConfiguration;

 

import java.text.SimpleDateFormat;

import java.util.Date;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.quartz.JobKey;

import   org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import   org.springframework.batch.core.JobParameters;

import   org.springframework.batch.core.configuration.JobLocator;

import   org.springframework.batch.core.launch.JobLauncher;

import   org.springframework.scheduling.quartz.QuartzJobBean;

 

public class QuartzJobLauncher extends   QuartzJobBean {

       @Override

       protected   void executeInternal(JobExecutionContext context) throws JobExecutionException   {

             

              JobDetail   jobDetail = context.getJobDetail();

              JobDataMap   jobDataMap = jobDetail.getJobDataMap();

              String   jobName = jobDataMap.getString("jobName");

              JobLauncher   jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");

              JobLocator   jobLocator = (JobLocator) jobDataMap.get("jobLocator");

              System.out.println("jobName   : " + jobName);

              System.out.println("jobLauncher   : " + jobLauncher);

              System.out.println("jobLocator   : " + jobLocator);

              JobKey   key = context.getJobDetail().getKey();

              System.out.println(key.getName()   + " : " + key.getGroup());

              SimpleDateFormat   sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

              System.out.println("Current   time : " + sf.format(new Date()));

             

              try   {

                     Job   job = jobLocator.getJob(jobName);

                     JobExecution   jobExecution = jobLauncher.run(job, new JobParameters());

              }   catch (Exception e) {

                     e.printStackTrace();

              }

             

       }

 

}

 

2.8  QuartzConfiguration

package com.zy.QuartzConfiguration;

 

import java.util.HashMap;

import java.util.Map;

 

import   org.springframework.batch.core.configuration.JobLocator;

import   org.springframework.batch.core.configuration.JobRegistry;

import   org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;

import   org.springframework.batch.core.launch.JobLauncher;

import   org.springframework.beans.factory.annotation.Autowired;

import   org.springframework.context.annotation.Bean;

import   org.springframework.context.annotation.Configuration;

import   org.springframework.scheduling.quartz.CronTriggerFactoryBean;

import   org.springframework.scheduling.quartz.JobDetailFactoryBean;

import   org.springframework.scheduling.quartz.SchedulerFactoryBean;

 

@Configuration

public class QuartzConfiguration {

      

       //自动注入进来的是SimpleJobLauncher

       @Autowired

       private   JobLauncher jobLauncher;

      

       @Autowired

       private   JobLocator jobLocator;

      

       /*用来注册job*/

       /*JobRegistry会自动注入进来*/

       @Bean

       public   JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry   jobRegistry){

              JobRegistryBeanPostProcessor   jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();

              jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);

              return   jobRegistryBeanPostProcessor;

       }

      

       @Bean

       public   JobDetailFactoryBean jobDetailFactoryBean(){

              JobDetailFactoryBean   jobFactory = new JobDetailFactoryBean();

              jobFactory.setJobClass(QuartzJobLauncher.class);

              jobFactory.setGroup("my_group");

              jobFactory.setName("my_job");

              Map map = new HashMap<>();

              map.put("jobName",   "zyJob");

              map.put("jobLauncher",   jobLauncher);

              map.put("jobLocator",   jobLocator);

              jobFactory.setJobDataAsMap(map);

              return   jobFactory;

       }

      

       @Bean

       public   CronTriggerFactoryBean cronTriggerFactoryBean(){

              CronTriggerFactoryBean   cTrigger = new CronTriggerFactoryBean();

              System.out.println("-------   : " + jobDetailFactoryBean().getObject());

              cTrigger.setJobDetail(jobDetailFactoryBean().getObject());

              cTrigger.setStartDelay(3000);

              cTrigger.setName("my_trigger");

              cTrigger.setGroup("trigger_group");

              cTrigger.setCronExpression("0/3   * * * * ? "); //每间隔3s触发一次Job任务

              return   cTrigger;

       }

      

       @Bean

       public   SchedulerFactoryBean schedulerFactoryBean(){

              SchedulerFactoryBean   schedulerFactor = new SchedulerFactoryBean();

              schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());

              return   schedulerFactor;

       }

 

}

 

 

2.9  BatchConfiguration

package com.zy.config;

import   org.springframework.batch.core.Job;

import   org.springframework.batch.core.Step;

import   org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import   org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import   org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import   org.springframework.beans.factory.annotation.Autowired;

import   org.springframework.context.annotation.Bean;

import   org.springframework.context.annotation.Configuration;

import   org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;

import com.zy.model.User;

import   com.zy.processor.UserItemProcessor;

import com.zy.reader.UserItemReader;

import com.zy.writer.UserItemWriter;

 

@Configuration

@EnableBatchProcessing

//@Import({QuartzConfiguration.class})

public class BatchConfiguration {

      

       @Autowired

       public   JobBuilderFactory jobBuilderFactory;

       @Autowired

       public   StepBuilderFactory stepBuilderFactory;

      

      

       /*创建job*/

       @Bean

       public   Job jobMethod(){

              return   jobBuilderFactory.get("zyJob")

                            .start(stepMethod())

                            .build();

       }

      

       /*创建step*/

       @Bean

       public   Step stepMethod(){

              return   stepBuilderFactory.get("myStep1")

                            .chunk(3)

                            .reader(new   UserItemReader())

                            .processor(new   UserItemProcessor())

                            .writer(new   UserItemWriter())

                            .allowStartIfComplete(true)

                            .build();

       }

      

 

}

 

3   执行Job输出结果

2019-04-30 21:31:48.049  INFO 9344 --- [ryBean_Worker-5]   o.s.b.c.l.support.SimpleJobLauncher        : Job: [SimpleJob: [name=zyJob]] completed with the following   parameters: [{}] and the following status: [COMPLETED]

jobName : zyJob

jobLauncher :   org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d

jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5

my_job : my_group

Current time : 2019-04-30 21:31:51

2019-04-30 21:31:51.012  INFO 9344 --- [ryBean_Worker-6]   o.s.b.c.l.support.SimpleJobLauncher        : Job: [SimpleJob: [name=zyJob]] launched with the following   parameters: [{}]

2019-04-30 21:31:51.028  INFO 9344 --- [ryBean_Worker-6]   o.s.batch.core.job.SimpleStepHandler       : Executing step: [myStep1]

User [id=1, name=zy, age=28]

User [id=3, name=terry, age=30]

User [id=5, name=bob, age=25]

User [id=6, name=linda, age=27]

User [id=7, name=marry, age=39]

User [id=8, name=long, age=22]

User [id=9, name=kin, age=33]

User [id=10, name=ww, age=40]

 

4   概念总结


Job Repository

作业仓库,负责Job,Step执行过程中的状态保存。


Job Launcher

作业调度器,提供执行Job的入口


Job

作业,多个Step组成,封装整个批处理操作。


Step

作业步,Job的一个执行环节,由多个或者一个Step组装成Job


Tasklet

Step中具体执行的逻辑的操作,可以重复执行,可以具体的设置同步,异步操作。


Chunk

给定数量的Item集合,可以定义对Chunk的读操作,处理操作,写操作,提交间隔。


Item

一条数据记录。


ItemReader

从数据源(文件系统,数据库,队列等)读取Item


ItemProcessor

在写入数据源之前,对数据进行处理(如:数据清洗,转换,过滤,数据校验等)。


ItemWriter

将Item批量写入数据源(文件系统,数据库,队列等)。

5   Spring Batch结构

Spring Batch的一个基本层级结构。

首先,Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。

一个Job包含很多Step,step就是每个job要执行的单个步骤。

如下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。

然后是Chunk,chunk就是数据块,你需要定义多大的数据量是一个chunk。

Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。

Spring batch入门示例


文章题目:Springbatch入门示例
文章出自:http://myzitong.com/article/gdjphh.html