Chaining Multiple ItemProcessors

In this Spring Batch post, I will show with an example how to chain multiple ItemProcessors.

For my example, I will create an ItemProcessor that goes through the employee records one item at a time and filters out every second record it receives (it keeps the 1st, 3rd, 5th, ... record and drops the 2nd, 4th, 6th, ...).

Below is the code of the ItemProcessor


package package12;

import org.springframework.batch.item.ItemProcessor;

/**
 * Item processor that lets every second {@link Employee} through and filters
 * out the others.
 *
 * <p>The decision is based purely on how many items this instance has been
 * asked to process so far, not on the content of the item: the 1st, 3rd,
 * 5th, ... item passed to {@link #process(Employee)} is returned unchanged,
 * the 2nd, 4th, 6th, ... item is replaced by {@code null}. Returning
 * {@code null} from an {@code ItemProcessor} tells Spring Batch to drop the
 * item instead of handing it to the next processor or to the writer.
 *
 * <p>The counter is a plain, unsynchronized instance field, so each bean
 * instance keeps its own count and an instance must not be shared between
 * threads.
 */
public class EmployeeFilterItemProcessor implements ItemProcessor<Employee, Employee> {
    /** Number of items this instance has processed so far. */
    private int i = 0;

    /**
     * Passes the item through when it arrives at an even (zero-based)
     * position, otherwise filters it out.
     *
     * @param employee the item handed over by the reader or by the previous
     *                 processor in a chain
     * @return the same {@code employee} instance when it is kept, or
     *         {@code null} when it is filtered out
     * @throws Exception never thrown by this implementation; declared to
     *                   match the {@code ItemProcessor} contract
     */
    @Override
    public Employee process(Employee employee) throws Exception {
        // Decide before incrementing so the very first item (position 0) is kept.
        boolean keep = (i % 2 == 0);
        i = i + 1;
        return keep ? employee : null;
    }
}

I will create two beans of the same EmployeeFilterItemProcessor class and chain them one after another, as shown in the XML below.


1   <bean id="employeeFilterItemProcessor1" class="package12.EmployeeFilterItemProcessor" /> <!-- first filter bean -->
2   <bean id="employeeFilterItemProcessor2" class="package12.EmployeeFilterItemProcessor" /> <!-- second, separate bean of the same class -->
3   
4   <bean id="employeeCompositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor"> <!-- composite that chains the delegates listed below -->
5       <property name="delegates">
6           <list> <!-- the delegates are listed in the order filter 1, then filter 2 -->
7               <ref bean="employeeFilterItemProcessor1"/>
8               <ref bean="employeeFilterItemProcessor2"/>
9           </list>
10      </property>
11  </bean>

At lines 1 and 2, we create the beans employeeFilterItemProcessor1 and employeeFilterItemProcessor2, both of the same class EmployeeFilterItemProcessor. Because they are two separate beans, each one keeps its own counter.

From line 4 to 11, we create a composite bean of class org.springframework.batch.item.support.CompositeItemProcessor and supply the above two beans as its list of delegates.

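So each employee record is first passed to employeeFilterItemProcessor1, which filters out every second record it receives.
Only the records that survive the first processor are handed to the second processor, employeeFilterItemProcessor2, which again filters out every second record it receives.
Note that the chaining happens item by item: no intermediate list is created, and a record rejected by the first processor (it returns null) never reaches the second one. The net effect is that only every fourth record (the 1st, 5th, 9th, ...) reaches the writer.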

Now we integrate the composite processor into the job, as shown in the XML below.


1   <batch:job id="importProductsJob">
2       <batch:step id="readWriteProducts">
3           <batch:tasklet>
4               <batch:chunk reader="reader" writer="writer" processor="employeeCompositeItemProcessor" commit-interval="50"/> <!-- the processor attribute points at the composite bean -->
5           </batch:tasklet>
6       </batch:step>
7   </batch:job>  

At line 4, we plug the composite processor into the job's chunk using the processor attribute.

Below is the main code

Main code


package package12;

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point that loads the Spring configuration from
 * {@code package12\job.xml} on the classpath and launches the
 * {@code importProductsJob} job once.
 */
public class Example12 {
    /**
     * Builds the application context, runs the job with a single
     * {@code date} parameter set to the current time, and closes the context
     * again.
     *
     * @param args command-line arguments; not used
     * @throws JobParametersInvalidException       propagated from {@code JobLauncher.run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@code JobLauncher.run}
     * @throws JobExecutionAlreadyRunningException propagated from {@code JobLauncher.run}
     * @throws JobRestartException                 propagated from {@code JobLauncher.run}
     */
    public static void main(String[] args) throws JobParametersInvalidException, JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException {
        // Declared with the concrete type so that close() is available below.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("package12\\job.xml");
        try {
            JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
            // The current timestamp gives every launch a distinct set of job parameters.
            JobParameters jobParameters = new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
            Job job = (Job) context.getBean("importProductsJob");

            // NOTE(review): assumes the configured launcher runs the job synchronously
            // (the default for SimpleJobLauncher without a task executor), so the
            // context is only closed after the job has finished - confirm if the
            // launcher configuration changes.
            jobLauncher.run(job, jobParameters);
        } finally {
            // Release the beans and resources held by the context even if the launch fails.
            context.close();
        }
    }
}

Below is the complete XML configuration (job.xml)


<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Batch configuration for the importProductsJob example:
    a flat-file reader, two chained filter processors, a flat-file writer,
    the job definition itself and the launcher infrastructure beans.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
    
    <!-- Prototype-scoped Employee bean; the reader's field-set mapper refers to it by name (prototypeBeanName). -->
    <bean id="employee" class="package12.Employee" scope="prototype"/>
    
    <!-- Reader: reads file:FileInput.txt, tokenizes each line into the fields
         id, name, status, salary and maps them onto the "employee" prototype bean. -->
    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="file:FileInput.txt"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="id,name,status,salary"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="employee"/>
                    </bean>
                </property>
            </bean>
        </property> 
    </bean>
    
    <!-- Writer: writes to file:FileOutput.txt; the fields id, name, status, salary
         are extracted from each item and aggregated into one delimited line. -->
    <bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:FileOutput.txt"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="id,name,status,salary"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
    
    <!-- Two separate beans of the same filter class. -->
    <bean id="employeeFilterItemProcessor1" class="package12.EmployeeFilterItemProcessor" />
    <bean id="employeeFilterItemProcessor2" class="package12.EmployeeFilterItemProcessor" />
    
    <!-- Composite processor that chains the two filters; they are listed in the order filter 1, then filter 2. -->
    <bean id="employeeCompositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
        <property name="delegates">
            <list>
                <ref bean="employeeFilterItemProcessor1"/>
                <ref bean="employeeFilterItemProcessor2"/>
            </list>
        </property>
    </bean>
    
    <!-- Job with a single chunk-oriented step wired to the reader, the composite
         processor and the writer; commit-interval is set to 50. -->
    <batch:job id="importProductsJob">
        <batch:step id="readWriteProducts">
            <batch:tasklet>
                <batch:chunk reader="reader" writer="writer" processor="employeeCompositeItemProcessor" commit-interval="50"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
    
    <!-- Infrastructure beans looked up or used by the launcher.
         NOTE(review): ResourcelessTransactionManager and MapJobRepositoryFactoryBean are
         presumably the in-memory, non-persistent variants; confirm against the
         Spring Batch version in use. -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    
    <!-- Looked up by name ("jobLauncher") from Example12.main. -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>
</beans>

Leave a Reply