我们在Spring批处理中使用多线程步骤来执行以下操作 . 读取表中的多行 . 表中的每一行都将被处理并写入一个单独的excel文件中 .

要在JDBCCursorItemreader中实现线程安全 - 我已经完成了以下操作 . 1)创建了一个SynchronizedItemReader,它将同步调用委托给JDBCCursrItemReader的read方法 . 2)在jdbcitemreader bean中设置saveState = false . 3)在输入表中添加了一个过程指示器

我有以下问题1)根据我的理解,如果我们设置saveState = false Spring批处理不会将状态存储在Execution上下文中,即READ_COUNT,COMMIT_COUNT,WRITE_COUNT等将保持为0 .

但是,即使使用saveState = false,也会更新计数 . 我在下面给出了xml配置 . 任何想法为什么会这样?

2)我理解JDBCBatchItemWriter是线程安全的 . 但这是否意味着,一次只有一个线程会写入业务数据?或者它只是为了保持国家目的而安全?如果一次只有一个线程写入数据库,那么我不确定性能是否良好

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"        xmlns:util="http://www.springframework.org/schema/util"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch 
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/util 
http://www.springframework.org/schema/util/spring-util-3.2.xsd
">


<!-- database settings -->
<import resource="../config/database-context.xml" />
<import resource="../config/launch-context.xml" />

<batch:job id="multiThreadedJob">
    <batch:step id="readWriteMultiThreadedStep">
        <batch:tasklet task-executor="taskExecutor">
            <batch:chunk reader="synchronizingItemReader" processor="itemProcessor"
                writer="excelWriter" commit-interval="1" />
        </batch:tasklet>
    </batch:step>
</batch:job>
<bean id="taskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="5" />
</bean>

<bean id="synchronizingItemReader" class="org.multithreadedstep.SynchronizingItemReader">
    <property name="delegate" ref="jdbcItemReader" />
</bean>
<!-- inject stepExecutionContext -->
<bean id="itemProcessor" class="org.multithreadedstep.UserProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
    <property name="baseDAO" ref="baseDAO" />       
</bean>

<bean id="jdbcItemReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql">
        <value>
            <![CDATA[

            SELECT id,
             user_login,
             user_pass,
             age,
             name
            FROM users_test where processed='N'          
         ORDER BY TO_NUMBER (id) 
            ]]>
        </value>
    </property>
    <property name="rowMapper" ref="userRowMapper" />
     <property name="saveState" value="false"/>
</bean>
<bean id="userRowMapper" class="org.multithreadedstep.UserRowMapper" />

<bean id="excelWriter"
class="org.multithreadedstep.ExcelWriter" scope="step">

</bean>
<bean id="docName" class="java.lang.String" scope="prototype"/>
<bean id="baseDAO"
    class="org.multithreadedstep.BaseDAO">
    <property name="jdbcTemplate" ref="jdbcTemplate" />
</bean>


<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
  <constructor-arg ref="dataSource"/>
SynchronizingItemReader.java
package org.multithreadedstep;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

public class SynchronizingItemReader implements ItemReader<User>, ItemStream {
private ItemReader<User> delegate;

public synchronized User read() throws Exception {
    return delegate.read();
}

public void close() throws ItemStreamException {
    if (this.delegate instanceof ItemStream) {
        ((ItemStream) this.delegate).close();
    }
}

public void open(ExecutionContext context) throws ItemStreamException {
    if (this.delegate instanceof ItemStream) {
        ((ItemStream) this.delegate).open(context);
    }
}

public void update(ExecutionContext context) throws ItemStreamException {
    if (this.delegate instanceof ItemStream) {
        ((ItemStream) this.delegate).update(context);
    }
}

/**
 * @return the delegate
 */
public ItemReader<User> getDelegate() {
    return delegate;
}

/**
 * @param delegate the delegate to set
 */
public void setDelegate(ItemReader<User> delegate) {
    this.delegate = delegate;
}

}