Java Reference
In-Depth Information
}
public void setWhereClause(String whereClause) {
this.whereClause = whereClause;
}
}
As chunks are processed, regardless of the thread, StagingChunkUpdater updates the items to be
flagged as processed. You still need to do two things. First, you need to update the configuration to use
the new listeners; and second, you need to update the query used for this step's ItemReader to include
jobId and the processed flag in its criteria. Listing 11-6 shows the updated configuration including the
updated ItemReader, the new staging listeners, and the updated calculateTransactionFees step.
Listing 11-6. Configuration for the Multithreaded Step with Staging Listeners
<beans:bean id="transactionPricingItemReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<beans:property name="dataSource" ref="dataSource"/>
<beans:property name="sql" value="select a.id as accountId, a.accountNumber,
t.id as transactionId, t.qty, tk.ticker, a.tier, t.executedTime, t.dollarAmount from
account a inner join transaction t on a.id = t.account_id inner join ticker tk on
t.tickerId = tk.id and t.processed = false and t.jobId = #{jobParameters[run.id]}
order by t.executedTime"/>
<beans:property name="rowMapper" ref="transactionPricingRowMapper"/>
</beans:bean>
<beans:bean id="transactionPricingRowMapper"
class="com.apress.springbatch.statement.reader.AccountTransactionRowMapper"/>
<step id="calculateTransactionFees">
<tasklet task-executor="taskExecutor">
<chunk reader="transactionPricingItemReader" processor="feesItemProcessor"
writer="applyFeeWriter" commit-interval="100"/>
<listeners>
<listener ref="stagingStepListener"/>
<listener ref="stagingChunkUpdater"/>
</listeners>
</tasklet>
</step>
<beans:bean id="stagingStepListener"
class="com.apress.springbatch.statement.listener.StagingStepListener" scope="step">
<beans:property name="dataSource" ref="dataSource"/>
<beans:property name="tableName" value="transaction"/>
<beans:property name="whereClause"
value="where jobId is null and processed is null"/>
<beans:property name="jobId" value="#{jobParameters[run.id]}"/>
</beans:bean>
<beans:bean id="stagingChunkUpdater"
class="com.apress.springbatch.statement.listener.StagingChunkUpdater" scope="step">
Search WWH ::




Custom Search