Figure 11-12. Multithreaded step processing (a multithreaded step processing its chunks in parallel, followed by regular steps processing their chunks sequentially)
As Figure 11-12 shows, any step in a job can be configured to execute within a thread pool, processing each chunk independently. As chunks are processed, Spring Batch keeps track of which chunks have completed. If an error occurs in any one of the threads, the job's processing is rolled back or terminated per the regular Spring Batch functionality.
To configure a step to execute in a multithreaded manner, all you need to do is configure a reference to a TaskExecutor for the given step. Using the statement job as an example, Listing 11-3 shows how to configure the calculateTransactionFees step (step 5) to be a multithreaded step.
Listing 11-3. calculateTransactionFees as a Multithreaded Step
<step id="calculateTransactionFees">
    <tasklet task-executor="taskExecutor">
        <chunk reader="transactionPricingItemReader" processor="feesItemProcessor"
               writer="applyFeeWriter" commit-interval="100"/>
    </tasklet>
</step>

<beans:bean id="taskExecutor"
            class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    <beans:property name="concurrencyLimit" value="10"/>
</beans:bean>
As Listing 11-3 shows, all that is required to add the power of Spring's multithreading capabilities to a step in your job is to define a TaskExecutor implementation (this example uses org.springframework.core.task.SimpleAsyncTaskExecutor) and reference it in your step. When you execute the statement job, Spring executes each chunk in its own thread, and the concurrencyLimit of 10 allows up to 10 chunks to be processed in parallel. As you can imagine, this can be a powerful addition to most jobs.
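If you prefer Spring Batch's Java-based configuration over XML, the same step can be expressed with the step builders. The following is only a sketch: the Transaction domain type and the injected reader, processor, and writer beans are stand-ins for the components referenced in Listing 11-3.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

// Sketch of the multithreaded step from Listing 11-3 using Java configuration.
// The Transaction type and the injected beans are illustrative assumptions.
@Configuration
public class StatementJobConfiguration {

    @Bean
    public TaskExecutor taskExecutor() {
        // Same executor as the XML example: asynchronous execution capped at 10 concurrent tasks.
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(10);
        return executor;
    }

    @Bean
    public Step calculateTransactionFees(StepBuilderFactory stepBuilderFactory,
                                         ItemReader<Transaction> transactionPricingItemReader,
                                         ItemProcessor<Transaction, Transaction> feesItemProcessor,
                                         ItemWriter<Transaction> applyFeeWriter) {
        return stepBuilderFactory.get("calculateTransactionFees")
                .<Transaction, Transaction>chunk(100)      // commit-interval="100"
                .reader(transactionPricingItemReader)
                .processor(feesItemProcessor)
                .writer(applyFeeWriter)
                .taskExecutor(taskExecutor())               // makes the step multithreaded
                .throttleLimit(10)                          // the tasklet's throttle limit defaults to 4
                .build();
    }
}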
But there is a catch when working with multithreaded steps. Most ItemReaders provided by Spring Batch are stateful. Spring Batch uses this state when it restarts a job so that it knows where processing left off. However, in a multithreaded environment, objects that maintain state accessible to multiple threads (and not synchronized, and so on) can run into issues of threads overwriting each other's state.
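To make the problem concrete, here is a minimal, hypothetical sketch of the brute-force alternative: wrapping a stateful reader in a delegate whose read() method is synchronized, so only one thread at a time can advance the delegate's internal state. This protects the reader's state, but it also serializes the reads and gives back some of the parallelism you were after, which is why the staging approach described next is preferable.

import org.springframework.batch.item.ItemReader;

// Hypothetical wrapper illustrating what "synchronized" means here: only one
// thread at a time may advance the delegate reader's cursor or line count.
public class SynchronizedDelegatingReader<T> implements ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizedDelegatingReader(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        // Because read() is synchronized, threads can no longer overwrite
        // each other's position in the delegate's state.
        return delegate.read();
    }
}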
To get around the issue of state, you use the concept of staging the records to be processed in your batch run. The concept is quite simple. Before the step begins, you tag all the records in a way that identifies them as the records to be processed in the current batch run (or JobInstance) using a StepListener. The tagging can be done either by updating a special column or columns on the records' table or by copying the records into a staging table. Then, the ItemReader reads only the records that were tagged at the start of the step.
 