Encapsulating Spring Batch: A Detailed Explanation with Usage Examples

  • 2021-07-22 09:48:05
  • OfStack

Spring Batch, as introduced on its official website:

A lightweight, comprehensive batch framework designed to enable the development of batch applications vital for the daily operations of enterprise systems.

Spring Batch

Spring Batch is used here mainly for batch data processing. I wrapped the batch setup in a JobBase class, which every concrete job must extend. Spring Batch provides reader and writer interfaces, a rich set of task processing modes, flexible transaction management, and concurrent processing. It also supports logging, monitoring, and restarting and skipping tasks. This greatly simplifies batch application development: developers are freed from complex task configuration and management and can focus on the core business logic.

Main components

• job
• step
• reader
• writer
• listener
• processor
• validator
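
To make the roles of reader, processor, and writer concrete, here is a minimal plain-Java sketch of how a chunk-oriented step could combine them. It is only a simplified model under stated assumptions: the class ChunkStepSketch and its run method are hypothetical names, the JDK's Iterator, Function, and Consumer stand in for the reader, processor, and writer, and transactions, listeners, and restart handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of a chunk-oriented step. All names are hypothetical;
// this is not Spring Batch's actual implementation.
final class ChunkStepSketch {

    /** Runs read -> process -> write in chunks and returns the number of writes. */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        int writes = 0;
        while (reader.hasNext()) {
            // Read: collect up to chunkSize items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process: transform each item; a null result drops the item.
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                O output = processor.apply(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // Write: hand the whole chunk to the writer at once.
            // (This sketch skips chunks in which every item was dropped.)
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
                writes++;
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        int writes = ChunkStepSketch.<Integer, Integer>run(
                Arrays.asList(1, 2, 3, 4, 5).iterator(),
                n -> n * 10,
                chunk -> written.add(chunk),
                2);
        System.out.println(writes + " writes: " + written);
    }
}
```

With five items and a chunk size of 2, the writer in this model is called three times: two full chunks and one final partial chunk.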

JobBase defines the methods shared by all jobs


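 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.JobExecutionListener;
 import org.springframework.batch.core.Step;
 import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
 import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
 import org.springframework.batch.core.launch.support.RunIdIncrementer;
 import org.springframework.batch.item.ItemProcessor;
 import org.springframework.batch.item.ItemReader;
 import org.springframework.batch.item.ItemWriter;
 import org.springframework.batch.item.validator.ValidatingItemProcessor;
 import org.springframework.batch.item.validator.Validator;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;

 /**
 * Base class for Spring Batch jobs.
 */
 public abstract class JobBase<T> {
  /**
  * Chunk size: the number of items processed per transaction.
  */
  protected int chunkCount = 5000;
  /**
  * Job execution listener.
  */
  private JobExecutionListener jobExecutionListener;
  /**
  * Item processor that validates each item.
  */
  private ValidatingItemProcessor<T> validatingItemProcessor;
  /**
  * Job name.
  */
  private String jobName;
  /**
  * Validator applied by the processor.
  */
  private Validator<T> validator;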
  @Autowired
  private JobBuilderFactory job;
  @Autowired
  private StepBuilderFactory step;
  /**
  * Initializes the job.
  *
  * @param jobName                 the job name
  * @param jobExecutionListener    the job execution listener
  * @param validatingItemProcessor the item processor
  * @param validator               the validator used by the processor
  */
  public JobBase(String jobName,
         JobExecutionListener jobExecutionListener,
         ValidatingItemProcessor<T> validatingItemProcessor,
         Validator<T> validator) {
   this.jobName = jobName;
   this.jobExecutionListener = jobExecutionListener;
   this.validatingItemProcessor = validatingItemProcessor;
   this.validator = validator;
  }
  /**
  * Builds the job: one step, a run-id incrementer, and the listener.
  * Launching the job is left to the caller.
  */
  public Job getJob() throws Exception {
   return job.get(jobName).incrementer(new RunIdIncrementer())
     .start(syncStep())
     .listener(jobExecutionListener)
     .build();
  }
  /**
  * Builds the chunk-oriented step: read, process, write.
  *
  * @return the configured step
  */
  public Step syncStep() throws Exception {
   return step.get("step1")
     .<T, T>chunk(chunkCount)
     .reader(reader())
     .processor(processor())
     .writer(writer())
     .build();
  }
  /**
  * Processes one item at a time, validating it with the configured validator.
  *
  * @return the item processor
  */
  public ItemProcessor<T, T> processor() {
   validatingItemProcessor.setValidator(processorValidator());
   return validatingItemProcessor;
  }
  /**
  * Validator used to check each item.
  *
  * @return the validator
  */
  @Bean
  public Validator<T> processorValidator() {
   return validator;
  }
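  /**
  * Reader that supplies the items to process.
  *
  * @return the item reader
  * @throws Exception if the reader cannot be created
  */
  public abstract ItemReader<T> reader() throws Exception;
  /**
  * Writer that persists each chunk of items.
  *
  * @return the item writer
  */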
  @Bean
  public abstract ItemWriter<T> writer();
 }

The base class mainly fixes how the common methods are executed; the job name, the reader, and the writer must still be supplied by each concrete job.
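
JobBase.processor() plugs a Validator into a ValidatingItemProcessor. As a rough illustration of what such a validating step does with a bad item, here is a plain-Java sketch. It is a simplified model, not the library class: ValidatingProcessorSketch is a hypothetical name, a JDK Predicate stands in for the validator, and IllegalArgumentException stands in for the library's validation exception. The filter flag mirrors the idea behind ValidatingItemProcessor's setFilter option: either drop invalid items quietly or fail on them.

```java
import java.util.function.Predicate;

// Simplified model of a validating processor. The class and its names are
// hypothetical; Spring Batch's ValidatingItemProcessor is the real counterpart.
final class ValidatingProcessorSketch<T> {
    private final Predicate<T> validator;
    private final boolean filter;

    ValidatingProcessorSketch(Predicate<T> validator, boolean filter) {
        this.validator = validator;
        this.filter = filter;
    }

    /**
     * Returns the item unchanged when it is valid. For an invalid item,
     * returns null (meaning "drop it") when filtering, otherwise throws.
     */
    T process(T item) {
        if (validator.test(item)) {
            return item;
        }
        if (filter) {
            return null;
        }
        throw new IllegalArgumentException("Invalid item: " + item);
    }

    public static void main(String[] args) {
        // Hypothetical rule for illustration: an age must not be negative.
        ValidatingProcessorSketch<Integer> lenient =
                new ValidatingProcessorSketch<>(age -> age >= 0, true);
        System.out.println(lenient.process(30));
        System.out.println(lenient.process(-1));
    }
}
```

Whether to filter or to fail is a design choice: filtering keeps a long batch running past bad rows, while failing makes data problems visible immediately.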

A concrete job implementation


 @Configuration
 @EnableBatchProcessing
 public class SyncPersonJob extends JobBase<Person> {
  @Autowired
  private DataSource dataSource;
  @Autowired
  @Qualifier("primaryJdbcTemplate")
  private JdbcTemplate jdbcTemplate;
  /**
  * Sets the job name, listener, processor, and validator.
  */
  public SyncPersonJob() {
   super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
  }
  @Override
  public ItemReader<Person> reader() throws Exception {
   // Stream every row of the person table through a JDBC cursor.
   JdbcCursorItemReader<Person> jdbcCursorItemReader =
     new JdbcCursorItemReader<>();
   jdbcCursorItemReader.setSql("select * from person");
   jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
   jdbcCursorItemReader.setDataSource(dataSource);
   return jdbcCursorItemReader;
  }
  @Override
  @Bean("personJobWriter")
  public ItemWriter<Person> writer() {
   JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
   writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
   String sql = "insert into person_export " + "(id,name,age,nation,address) "
     + "values(:id, :name, :age, :nation,:address)";
   writer.setSql(sql);
   writer.setDataSource(dataSource);
   return writer;
  }
 }

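Each job's writer must be declared as its own bean

Note that every job needs a distinctly named writer bean. Without an explicit name, each writer bean takes its method name, writer, so with multiple jobs the writers collide and a job can end up using another job's writer.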
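
The collision can be pictured with a toy registry keyed by bean name. This is only a sketch under stated assumptions: BeanRegistrySketch and its methods are hypothetical, plain strings stand in for bean definitions, and a later registration silently replaces an earlier one. Depending on configuration, a real Spring container may instead reject the duplicate name at startup.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a name-keyed bean registry; not Spring's container.
final class BeanRegistrySketch {
    private final Map<String, String> beans = new LinkedHashMap<>();

    /** Registers a bean and returns the definition it replaced, or null. */
    String register(String name, String definition) {
        return beans.put(name, definition);
    }

    /** Looks a bean up by name; returns null when the name is unknown. */
    String lookup(String name) {
        return beans.get(name);
    }

    public static void main(String[] args) {
        // Two jobs that both rely on the default name "writer" collide.
        BeanRegistrySketch clash = new BeanRegistrySketch();
        clash.register("writer", "writer of the first job");
        clash.register("writer", "writer of the second job");
        System.out.println(clash.lookup("writer"));

        // Distinct names keep both writers available.
        BeanRegistrySketch named = new BeanRegistrySketch();
        named.register("personJobWriter", "writer of the first job");
        named.register("personVerson2JobWriter", "writer of the second job");
        System.out.println(named.lookup("personJobWriter"));
    }
}
```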


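 /**
  * Writes data in batches, registered under a job-specific bean name.
  *
  * @return the item writer for this job
  */
 @Override
 @Bean("personVerson2JobWriter")
 public ItemWriter<Person> writer() {
  // Build and return this job's own writer here.
 }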

Add an API endpoint to trigger the job manually


 @Autowired
 SyncPersonJob syncPersonJob;
 @Autowired
 JobLauncher jobLauncher;
 // Launches the job with a unique "time" parameter so that every call
 // starts a new job instance instead of reusing a completed one.
 void exec(Job job) throws Exception {
  JobParameters jobParameters = new JobParametersBuilder()
    .addLong("time", System.currentTimeMillis())
    .toJobParameters();
  jobLauncher.run(job, jobParameters);
 }
 // Triggers personJob over HTTP.
 @RequestMapping("/run1")
 public String run1() throws Exception {
  exec(syncPersonJob.getJob());
  return "personJob success";
 }
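
The time parameter in exec matters because Spring Batch identifies a job instance by the job name together with its identifying parameters, and refuses to rerun an instance that has already completed. The plain-Java sketch below models only that identity rule. It is a simplification under stated assumptions: JobInstanceSketch and launch are hypothetical names, every launch is treated as completing successfully, and no job repository is involved.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified model of job-instance identity; not Spring Batch's JobRepository.
final class JobInstanceSketch {
    private final Set<String> completed = new HashSet<>();

    /**
     * "Runs" the job unless the same name and parameters already completed.
     * Returns true if the job ran, false if the launch was rejected.
     */
    boolean launch(String jobName, Map<String, ?> parameters) {
        // Sort the parameters so that their order does not affect the key.
        String key = jobName + new TreeMap<String, Object>(parameters);
        return completed.add(key);
    }

    public static void main(String[] args) {
        JobInstanceSketch launcher = new JobInstanceSketch();
        Map<String, Long> fixed = Collections.singletonMap("time", 1L);
        System.out.println(launcher.launch("personJob", fixed));
        System.out.println(launcher.launch("personJob", fixed));
        System.out.println(launcher.launch("personJob",
                Collections.singletonMap("time", System.currentTimeMillis())));
    }
}
```

In this model, launching twice with identical parameters runs the job only once, while a fresh timestamp always produces a new instance, which is what the endpoint above relies on.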

Summary

That concludes this walkthrough of encapsulating Spring Batch and using the wrapper. I hope it helps; if you have any questions, please leave a comment.

