In this post under Spring Batch, I will explain how to chain multiple ChunkListener together.
For this example I will create two custom ChunkListener interface implementations named ‘CustomChunkListener1’ and ‘CustomChunkListener2’ as shown below
CustomChunkListener1
package package35;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
public class CustomChunkListener1 implements ChunkListener {
@Override
public void afterChunk(ChunkContext chunkContext) {
System.out.println("ChunkListener1 processing ended " + chunkContext);
}
@Override
public void afterChunkError(ChunkContext chunkContext) {
}
@Override
public void beforeChunk(ChunkContext chunkContext) {
System.out.println("ChunkListener1 processing started " + chunkContext);
}
}
CustomChunkListener2
package package35;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
public class CustomChunkListener2 implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("ChunkListener2 processing started " + context);
}
@Override
public void afterChunk(ChunkContext context) {
System.out.println("ChunkListener2 processing ended " + context);
}
@Override
public void afterChunkError(ChunkContext context) {
}
}
Both the custom implementation classes print messages to console before and after chunk processing.
Next we compose them in a list in ‘CompositeChunkListener’ bean and integrate the ‘CompositeChunkListener’ bean to the job using the below xml configuration
1 <bean id="customChunkListener1" class="package35.CustomChunkListener1"/>
2
3 <bean id="customChunkListener2" class="package35.CustomChunkListener2"/>
4
5 <bean id="compositeChunkListener" class="org.springframework.batch.core.listener.CompositeChunkListener">
6 <property name="listeners">
7 <list>
8 <ref bean="customChunkListener1"/>
9 <ref bean="customChunkListener2"/>
10 </list>
11 </property>
12 </bean>
13
14 <batch:job id="importProductsJob">
15 <batch:step id="readWriteProducts">
16 <batch:tasklet>
17 <batch:chunk reader="reader" writer="writer" commit-interval="50"/>
18 </batch:tasklet>
19 <batch:listeners>
20 <batch:listener ref="compositeChunkListener"/>
21 </batch:listeners>
22 </batch:step>
23 </batch:job>
In the above xml code, CompositeChunkListener is a Spring Batch provided class that holds a list of different ChunkListener implementations.
When bean of CompositeChunkListener is integrated with job, the different ChunkListener implementations present in the list of CompositeChunkListener bean is called in order they are added.
In the above xml code at line 1 and 3 we define beans of CustomChunkListener1 and CustomChunkListener2 class.
At line 5 to 12 we define a bean of CompositeChunkListener class and set its ‘listeners’ property with a list containing a reference to each beans created at line 1 and 3.
At line 19 to 21 we integrate the CompositeChunkListener bean with the job using <batch:listeners> element.
In this way we can compose multiple ChunkListener and integrate them with the job.
Below is the complete xml configuration for your reference.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
<bean id="employee" class="package35.Employee" scope="prototype"/>
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:FileInput.txt"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="id,name,status,salary"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="employee"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:FileOutput.txt"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="id,name,status,salary"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean id="customChunkListener1" class="package35.CustomChunkListener1"/>
<bean id="customChunkListener2" class="package35.CustomChunkListener2"/>
<bean id="compositeChunkListener" class="org.springframework.batch.core.listener.CompositeChunkListener">
<property name="listeners">
<list>
<ref bean="customChunkListener1"/>
<ref bean="customChunkListener2"/>
</list>
</property>
</bean>
<batch:job id="importProductsJob">
<batch:step id="readWriteProducts">
<batch:tasklet>
<batch:chunk reader="reader" writer="writer" commit-interval="50"/>
</batch:tasklet>
<batch:listeners>
<batch:listener ref="compositeChunkListener"/>
</batch:listeners>
</batch:step>
</batch:job>
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
</beans>
Below is the output
Output
Oct 03, 2020 12:36:56 PM org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@5b80350b: startup date [Sat Oct 03 12:36:56 IST 2020]; root of context hierarchy
Oct 03, 2020 12:36:56 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [package35/job.xml]
Oct 03, 2020 12:36:57 PM org.springframework.batch.core.launch.support.SimpleJobLauncher afterPropertiesSet
INFO: No TaskExecutor has been set, defaulting to synchronous executor.
Oct 03, 2020 12:36:57 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=importProductsJob]] launched with the following parameters: [{date=1601708817389}]
Oct 03, 2020 12:36:57 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [readWriteProducts]
ChunkListener1 processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=0, FlatFileItemWriter.written=0, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=0, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener2 processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=0, FlatFileItemWriter.written=0, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=0, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener2 processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=1810, FlatFileItemWriter.written=50, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=50, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener1 processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=1810, FlatFileItemWriter.written=50, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=50, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener1 processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=1810, FlatFileItemWriter.written=50, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=50, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener2 processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=1810, FlatFileItemWriter.written=50, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=50, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener2 processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=100, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener1 processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=100, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener1 processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=100, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener2 processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=100, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener2 processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=101, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
ChunkListener1 processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=101, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Oct 03 12:36:57 IST 2020}
Oct 03, 2020 12:36:57 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=importProductsJob]] completed with the following parameters: [{date=1601708817389}] and the following status: [COMPLETED]