Filtering items using ItemProcessor

In Spring Batch, items are read from a source using a reader and written to a destination using a writer.

Sometimes we need to add business logic between the reader and the writer for the following purposes:
1) Filtering items
2) Converting an item from one format to another before it is written to the destination
3) Modifying an item before it is written to the destination

This business logic is placed between the reader and the writer using the ItemProcessor interface.

In the example below, we create a class that implements the ItemProcessor interface. The job copies employee records from one file to another. Before writing, it filters the records so that only those at even zero-based positions are written to the output file, i.e. the 1st, 3rd, 5th, … records read.

Below is the code for the ItemProcessor implementation.

Implementation of ItemProcessor


1  package package5;
2  
3  import org.springframework.batch.item.ItemProcessor;
4  
5  public class EmployeeFilterItemProcessor implements ItemProcessor {
6   int i = 0;
7   
8   @Override
9   public Employee process(Employee employee) throws Exception {
10      Employee employee1;
11      
12      if(i%2 == 0) {
13          employee1 = employee;
14      } else {
15          employee1 = null;
16      }
17      i = i + 1;
18      return employee1;
19  }
20 }

In ItemProcessor<I, O>, the first type argument (I) is the type of the item coming from the reader. The second type argument (O) is the type of the item returned by the processor and passed on to the writer.

In the process method, we return null for every record at an odd zero-based position (the 2nd, 4th, … record read). When null is returned, the item is filtered out and is not passed to the writer.

Next, I will explain how to plug the processor into the job step, between the reader and the writer.

We define a bean of class EmployeeFilterItemProcessor, as shown below:

EmployeeFilterItemProcessor bean definition


1 <bean id="employeeFilterItemProcessor" class="package5.EmployeeFilterItemProcessor" /> <!-- no scope attribute: default singleton, one processor instance -->
2 
3 <batch:job id="importProductsJob">
4   <batch:step id="readWriteProducts">
5       <batch:tasklet>
6           <batch:chunk reader="reader" writer="writer" processor="employeeFilterItemProcessor" commit-interval="50"/> <!-- items the processor returns as null never reach the writer -->
7       </batch:tasklet>
8   </batch:step>
9 </batch:job>

In the XML above, line 1 defines a bean “employeeFilterItemProcessor” of type EmployeeFilterItemProcessor.

Line 6 then places the processor between the reader and the writer using the chunk's “processor” attribute.

Below is the Employee class

Employee class


package package5;

import java.math.BigDecimal;

/**
 * Mutable JavaBean holding one employee record: id, name, status and salary.
 *
 * <p>It relies on the implicit public no-arg constructor plus a getter/setter pair per field,
 * so bean-wrapper based mappers can create it and read or write its properties by name.
 * All properties start out {@code null}.
 */
public class Employee {

    // Record fields, in the column order used by the file reader/writer configuration.
    private String id;
    private String name;
    private String status;
    private BigDecimal salary;

    /** Returns the employee identifier, or {@code null} if unset. */
    public String getId() {
        return this.id;
    }

    /** Sets the employee identifier. */
    public void setId(String id) {
        this.id = id;
    }

    /** Returns the employee name, or {@code null} if unset. */
    public String getName() {
        return this.name;
    }

    /** Sets the employee name. */
    public void setName(String name) {
        this.name = name;
    }

    /** Returns the employee status, or {@code null} if unset. */
    public String getStatus() {
        return this.status;
    }

    /** Sets the employee status. */
    public void setStatus(String status) {
        this.status = status;
    }

    /** Returns the salary, or {@code null} if unset. */
    public BigDecimal getSalary() {
        return this.salary;
    }

    /** Sets the salary. */
    public void setSalary(BigDecimal salary) {
        this.salary = salary;
    }
}

Below is the main class that launches the job.

Main code


package package5;

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point for the employee-filtering example.
 *
 * <p>Loads the Spring XML configuration {@code package5/job.xml} from the classpath. It then
 * launches {@code importProductsJob} once, with a {@code date} job parameter set to the current
 * time, and finally closes the application context.
 */
public class Example5 {
    public static void main(String[] args) throws JobParametersInvalidException, JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException {
        // Forward slashes are the portable classpath separator. try-with-resources guarantees the
        // context is closed (singleton beans destroyed) even if launching the job throws.
        try (ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("package5/job.xml")) {
            JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
            Job job = context.getBean("importProductsJob", Job.class);

            // A fresh timestamp makes the identifying parameters unique, so every launch is a new
            // JobInstance rather than a rerun of an already-completed one.
            JobParameters jobParameters = new JobParametersBuilder().addDate("date", new Date()).toJobParameters();

            // The configured SimpleJobLauncher has no taskExecutor, so run() executes the job
            // synchronously and the context is only closed after the job has finished.
            jobLauncher.run(job, jobParameters);
        }
    }
}

Below is the complete XML configuration file (loaded by the main class from the classpath as package5/job.xml).

Configuration


<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Batch configuration for the employee-filtering example: reads employee records from
     FileInput.txt, passes them through EmployeeFilterItemProcessor, and writes the records it
     keeps to DemoData2.txt. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
    
    <!-- Prototype scope so the reader's BeanWrapperFieldSetMapper obtains a new Employee
         for every line it maps. -->
    <bean id="employee" class="package5.Employee" scope="prototype"/>
    
    <!-- Reads delimited lines (DelimitedLineTokenizer defaults to a comma) from FileInput.txt,
         resolved against the working directory because of the "file:" prefix, and maps the
         columns id,name,status,salary onto the Employee properties of the same names. -->
    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="file:FileInput.txt"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="id,name,status,salary"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="employee"/>
                    </bean>
                </property>
            </bean>
        </property> 
    </bean>
    
    <!-- Writes each Employee to DemoData2.txt as one delimited line (DelimitedLineAggregator
         defaults to a comma), with the properties in the order id,name,status,salary. -->
    <bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:DemoData2.txt"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="id,name,status,salary"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
    
    <!-- No scope attribute, so this is a singleton: a single processor instance (and thus a
         single position counter) per application context. -->
    <bean id="employeeFilterItemProcessor" class="package5.EmployeeFilterItemProcessor" />
    
    <!-- One chunk-oriented step: read, process, write. commit-interval="50" commits every
         50 items. Items the processor returns as null are filtered and never reach the writer. -->
    <batch:job id="importProductsJob">
        <batch:step id="readWriteProducts">
            <batch:tasklet>
                <batch:chunk reader="reader" writer="writer" processor="employeeFilterItemProcessor" commit-interval="50"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
    
    <!-- No-op transaction manager; no database or other transactional resource is configured here. -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    
    <!-- In-memory (map-based) job repository: job and step execution metadata is not persisted
         and is lost when the JVM exits. -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    
    <!-- No taskExecutor is set, so jobs started through this launcher run synchronously on the
         calling thread. -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>
</beans>

Leave a Reply