Adding Chunk Level Listeners using ChunkListener

In this post on Spring Batch, I will explain how to add chunk-level listeners.

In Spring Batch, items are read one at a time but written to the file as a chunk (a collection of records written as one unit).

So if there are 100 records in total and the chunk size is 50, there will be two write operations (the first for records 0 to 49 and the second for records 50 to 99).
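The arithmetic above can be checked with a small plain-Java sketch. It does not use Spring Batch at all; the class and method names (ChunkRanges, chunkRanges) are made up for illustration, and it only computes which record indexes would fall into each write operation.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkRanges {

    /** Returns the inclusive [first, last] record index of every chunk. */
    static List<int[]> chunkRanges(int totalRecords, int chunkSize) {
        List<int[]> ranges = new ArrayList<>();
        for (int start = 0; start < totalRecords; start += chunkSize) {
            // The last chunk may be shorter than chunkSize.
            int end = Math.min(start + chunkSize, totalRecords) - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints "write records 0 to 49" and then "write records 50 to 99".
        for (int[] range : chunkRanges(100, 50)) {
            System.out.println("write records " + range[0] + " to " + range[1]);
        }
    }
}
```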

Spring Batch provides a facility to add listeners at the chunk level. The listeners are executed before each chunk is processed and after it has been written.

This post explains how to create chunk-level listeners and integrate them with a job. We can put our custom processing logic in these listeners and expect it to run before and after each chunk is processed.

To create a listener, we need to create a class that implements the org.springframework.batch.core.ChunkListener interface, as shown below.

Listener Code


package package10;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class CustomChunkListener implements ChunkListener {
    // Called after a chunk has been processed successfully.
    @Override
    public void afterChunk(ChunkContext chunkContext) {
        System.out.println("Chunk processing ended " + chunkContext);
    }

    // Called when a chunk fails; intentionally left empty in this example.
    @Override
    public void afterChunkError(ChunkContext chunkContext) {
    }

    // Called before each chunk starts processing.
    @Override
    public void beforeChunk(ChunkContext chunkContext) {
        System.out.println("Chunk processing started " + chunkContext);
    }
}

We need to provide implementations for the three methods shown above.
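To see when each of the three methods would be called, here is a simplified plain-Java model of one chunk's life cycle. It is only a sketch and not Spring Batch's actual implementation: SimpleChunkListener, RecordingListener and runChunk are hypothetical names, the ChunkContext parameter is left out, and the error is swallowed here, whereas the real framework rolls back the transaction and applies its own failure handling.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkCallbackOrder {

    /** Hypothetical stand-in for ChunkListener, without the context parameter. */
    interface SimpleChunkListener {
        void beforeChunk();
        void afterChunk();
        void afterChunkError();
    }

    /** Records callback names so that their order can be inspected. */
    static class RecordingListener implements SimpleChunkListener {
        final List<String> calls = new ArrayList<>();
        public void beforeChunk() { calls.add("beforeChunk"); }
        public void afterChunk() { calls.add("afterChunk"); }
        public void afterChunkError() { calls.add("afterChunkError"); }
    }

    /** Runs one chunk: before, the work, then the success or the error callback. */
    static void runChunk(SimpleChunkListener listener, Runnable chunkWork) {
        listener.beforeChunk();
        try {
            chunkWork.run();
        } catch (RuntimeException e) {
            // Simplification: only notify the listener and stop.
            listener.afterChunkError();
            return;
        }
        listener.afterChunk();
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        runChunk(listener, () -> { });  // a chunk that succeeds
        runChunk(listener, () -> { throw new IllegalStateException("write failed"); });
        // Prints [beforeChunk, afterChunk, beforeChunk, afterChunkError]
        System.out.println(listener.calls);
    }
}
```

In this model a chunk ends with either afterChunk or afterChunkError, never both, which is why the example listener can leave afterChunkError empty when only successful chunks need to be logged.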

An instance of ChunkContext provides access to the current chunk’s information.

Next, we need to integrate the listener with the job using XML configuration, as shown below.


1   <batch:job id="importProductsJob">
2       <batch:step id="readWriteProducts">
3           <batch:tasklet>
4               <batch:chunk reader="reader" writer="writer" commit-interval="50"/>
5           </batch:tasklet>
6           <batch:listeners>
7               <batch:listener ref="customChunkListener"/>
8           </batch:listeners>
9       </batch:step>
10  </batch:job>  

At line 7, we register the listener with the step of the job. At line 4, the commit-interval attribute defines the chunk size.

Below is the complete XML configuration.


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
    
    <bean id="employee" class="package10.Employee" scope="prototype"/>
    
    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="file:FileInput.txt"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="id,name,status,salary"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="employee"/>
                    </bean>
                </property>
            </bean>
        </property> 
    </bean>
    
    <bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:FileOutput.txt"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="id,name,status,salary"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
    
    <bean id="customChunkListener" class="package10.CustomChunkListener"/>
    
    <batch:job id="importProductsJob">
        <batch:step id="readWriteProducts">
            <batch:tasklet>
                <batch:chunk reader="reader" writer="writer" commit-interval="50"/>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="customChunkListener"/>
            </batch:listeners>
        </batch:step>
    </batch:job>
    
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>
</beans>

Output

Sep 15, 2018 12:34:16 PM org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@61e4705b: startup date [Sat Sep 15 12:34:16 IST 2018]; root of context hierarchy
Sep 15, 2018 12:34:16 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [package10/job.xml]
Sep 15, 2018 12:34:17 PM org.springframework.batch.core.launch.support.SimpleJobLauncher afterPropertiesSet
INFO: No TaskExecutor has been set, defaulting to synchronous executor.
Sep 15, 2018 12:34:17 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=importProductsJob]] launched with the following parameters: [{date=1536995057370}]
Sep 15, 2018 12:34:17 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [readWriteProducts]
Chunk processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=0, FlatFileItemWriter.written=0, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=0, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Sep 15 12:34:17 IST 2018}
Chunk processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=1810, FlatFileItemWriter.written=50, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=50, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Sep 15 12:34:17 IST 2018}
Chunk processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=1810, FlatFileItemWriter.written=50, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=50, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Sep 15 12:34:17 IST 2018}
Chunk processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=100, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Sep 15 12:34:17 IST 2018}
Chunk processing started ChunkContext: attributes=[], complete=false, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=100, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Sep 15 12:34:17 IST 2018}
Chunk processing ended ChunkContext: attributes=[], complete=true, stepContext=SynchronizedAttributeAccessor: [], stepExecutionContext={FlatFileItemWriter.current.count=3660, FlatFileItemWriter.written=100, batch.taskletType=org.springframework.batch.core.step.item.ChunkOrientedTasklet, FlatFileItemReader.read.count=101, batch.stepType=org.springframework.batch.core.step.tasklet.TaskletStep}, jobExecutionContext={}, jobParameters={date=Sat Sep 15 12:34:17 IST 2018}
Sep 15, 2018 12:34:17 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=importProductsJob]] completed with the following parameters: [{date=1536995057370}] and the following status: [COMPLETED]
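
Notice that the log shows three started/ended pairs even though FlatFileItemWriter.written stays at 100 in the last pair. A likely explanation is that the input holds exactly 100 records: after the second chunk the step cannot yet know the input is exhausted, so it starts a third chunk whose first read returns nothing (FlatFileItemReader.read.count becomes 101). The plain-Java sketch below models that read-until-null loop under this assumption; ChunkCount and countChunkAttempts are made-up names, and this is not Spring Batch's own code.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

public class ChunkCount {

    /** Counts chunk attempts when reading stops only once the reader returns null. */
    static int countChunkAttempts(int totalRecords, int chunkSize) {
        Iterator<Integer> reader = IntStream.range(0, totalRecords).iterator();
        int attempts = 0;
        boolean exhausted = false;
        while (!exhausted) {
            attempts++;  // beforeChunk would be called here
            for (int i = 0; i < chunkSize; i++) {
                // A null item models the reader signalling end of input.
                Integer item = reader.hasNext() ? reader.next() : null;
                if (item == null) {
                    exhausted = true;
                    break;
                }
            }
            // The items read would be written, then afterChunk would be called.
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(countChunkAttempts(100, 50));  // prints 3
        System.out.println(countChunkAttempts(99, 50));   // prints 2
    }
}
```

So when the record count is an exact multiple of the chunk size, we should expect the listener to run one extra time around an empty chunk.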
