Configure retry logic based on Spring Batch

  • 2021-11-10 09:36:36
  • OfStack

Directory 1. Application sample batch application read csv file processing class as follows. The final output result is 2. Add retry function to processing. Therefore, we configure batch job to retry 3 times in case of failure. 3. Test retry function successfully called job successfully for the third time. 4. Summary

Spring Batch encounters an error during processing and job fails by default. In order to improve the robustness of the application, we need to handle the failure caused by temporary exceptions. In this article, we discuss how to configure the retry logic of Spring and Batch.

1. Application example

Batch application reading csv file


sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321

Then, by calling the rest interface to process each record, the user's age and zip code attributes are obtained. In order to correctly output the date, the @ XmlJavaTypeAdapter (LocalDateTimeAdapter. class) annotation can be added to the attributes:


@XmlRootElement(name =  " transactionRecord " )
@Data
public class Transaction {
private String username;
private int userId;
private int age;
private String postCode;
private LocalDateTime transactionDate;
private double amount;
}

The processing classes are as follows


public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class);
    @Autowired
    private CloseableHttpClient closeableHttpClient;
    @Override
    public Transaction process(Transaction transaction) throws IOException, JSONException {
        LOGGER.info("Attempting to process user with id={}", transaction.getUserId());
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        //parse user's age and postCode from response and update transaction
        String result = EntityUtils.toString(response.getEntity());
        JSONObject userObject = new JSONObject(result);
        transaction.setAge(Integer.parseInt(userObject.getString("age")));
        transaction.setPostCode(userObject.getString("postCode"));
        return transaction;
    }
    private HttpResponse fetchMoreUserDetails(int id) throws IOException {
        final HttpGet request = new HttpGet("http://www.testapi.com:81/user/" + id);
        return closeableHttpClient.execute(request);
    }
}

Here, of course, you can also use RestTemplate to call, calling services only for testing, and readers can build test interfaces.

The final output is


<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>

2. Add retry to the process

If the connection to the rest interface times out due to network instability, the batch will fail. However, this kind of error is not unrecoverable, and can be tried by retrying several times.

So we configured the batch job to retry 3 times in case of failure


@Configuration
@EnableBatchProcessing
public class SpringBatchRetryConfig {
    private static final String[] tokens = { "username", "userid", "transactiondate", "amount" };
    private static final int TWO_SECONDS = 2000;
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Value("input/recordRetry.csv")
    private Resource inputCsv;
    @Value("file:xml/retryOutput.xml")
    private Resource outputXml;
    public ItemReader<Transaction> itemReader(Resource inputData) throws ParseException {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(tokens);
        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
        reader.setResource(inputData);
        reader.setLinesToSkip(1);
        reader.setLineMapper(lineMapper);
        return reader;
    }
    @Bean
    public CloseableHttpClient closeableHttpClient() {
        final RequestConfig config = RequestConfig.custom()
          .setConnectTimeout(TWO_SECONDS)
          .build();
        return HttpClientBuilder.create().setDefaultRequestConfig(config).build();
    }
    @Bean
    public ItemProcessor<Transaction, Transaction> retryItemProcessor() {
        return new RetryItemProcessor();
    }
    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
        StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }
    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(Transaction.class);
        return marshaller;
    }
    @Bean
    public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor<Transaction, Transaction> processor,
      ItemWriter<Transaction> writer) throws ParseException {
        return stepBuilderFactory.get("retryStep")
          .<Transaction, Transaction>chunk(10)
          .reader(itemReader(inputCsv))
          .processor(processor)
          .writer(writer)
          .faultTolerant()
          .retryLimit(3)
          .retry(ConnectTimeoutException.class)
          .retry(DeadlockLoserDataAccessException.class)
          .build();
    }
    @Bean(name = "retryBatchJob")
    public Job retryJob(@Qualifier("retryStep") Step retryStep) {
        return jobBuilderFactory
          .get("retryBatchJob")
          .start(retryStep)
          .build();
    }

Here the faultTolerant () method is called to enable the retry function and set the number of retries and the corresponding exception.

3. Test the retry feature

We tested the scenario and expected the interface to return the age and zip code within a certain time. The first two calls to API throw an exception ConnectTimeoutException

The third successful call


@RunWith(SpringRunner.class)
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = { SpringBatchRetryConfig.class })
public class SpringBatchRetryIntegrationTest {
    private static final String TEST_OUTPUT = "xml/retryOutput.xml";
    private static final String EXPECTED_OUTPUT = "src/test/resources/output/batchRetry/retryOutput.xml";
    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;
    @MockBean
    private CloseableHttpClient closeableHttpClient;
    @Mock
    private CloseableHttpResponse httpResponse;
    @Test
    public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
        when(closeableHttpClient.execute(any()))
          .thenThrow(new ConnectTimeoutException("Endpoint is down"));
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(defaultJobParameters());
        JobInstance actualJobInstance = jobExecution.getJobInstance();
        ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
        assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
        assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
        assertThat(actualJobExitStatus.getExitDescription(), containsString("org.apache.http.conn.ConnectTimeoutException"));
    }
    @Test
    public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
        FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
        FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);
        // The first two calls failed, the first 3 Continue execution for the second time 
        when(httpResponse.getEntity())
          .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
        when(closeableHttpClient.execute(any()))
          .thenThrow(new ConnectTimeoutException("Timeout count 1"))
          .thenThrow(new ConnectTimeoutException("Timeout count 2"))
          .thenReturn(httpResponse);
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(defaultJobParameters());
        JobInstance actualJobInstance = jobExecution.getJobInstance();
        ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
        assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
        assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
        AssertFile.assertFileEquals(expectedResult, actualResult);
    }
    private JobParameters defaultJobParameters() {
        JobParametersBuilder paramsBuilder = new JobParametersBuilder();
        paramsBuilder.addString("jobID", String.valueOf(System.currentTimeMillis()));
        return paramsBuilder.toJobParameters();
    }
}

job executed successfully

From the log, you can see that there were two failures, and finally the call succeeded.

19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

At the same time, another test is defined, which retries many times and fails, throwing an exception ConnectTimeoutException.

4. Summary


Related articles: