首页 文章

Spring Batch:作业随机选择行数少于commit-interval的行

提问于
浏览
0

我遇到了Spring Batch的问题 . 我们在我们的工作中使用了一个任务 Actuator (simpleAsyncTaskExecutor)来处理两个并行步骤的流程 .

在每个步骤中,任务执行程序将读取器返回的每个数据块拆分到不同的线程(使用多线程步骤概念:请参阅https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html

问题是我们的提交间隔很大(24,000),读取器返回的行数非常小(少于50行)但是编写器有时会收到多个块(例如30行的块)一个20行的块,但对于另一个运行,它可以是25行的一个块和25行的块或者只有一个50行的块,它看起来是随机的)而它应该只接收一个50行的块运行(它不应该是随机的),因为它不会超过commit-interval .

我试图理解为什么这会在某些运行中随机发生 . 如果有人在Spring Batch中知道这个问题,你能帮助我吗?

谢谢 .

这是我的工作配置(不包括我们的自定义编写器):

<batch:job id="job">

             <batch:split id="split" task-executor="taskExecutor">  
                <batch:flow> 
                    <batch:step id="step1"> 
                        <batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
                              <batch:chunk reader="reader1" writer="writer1" commit-interval="24000" />         
                        </batch:tasklet>
                    </batch:step>
                </batch:flow>
                <batch:flow> 

                    <batch:step id="step2">
                        <batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
                              <batch:chunk reader="reader2" writer="writer2" commit-interval="24000" />         
                        </batch:tasklet>
                    </batch:step>
                </batch:flow>       
             </batch:split>
        </batch:job>

    <bean id="reader1" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">       
         <property name="dataSource" ref="postgresql_1" />                           

         <property name="queryProvider">            
             <bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
                    <property name="selectClause" value="
                        SELECT name
                    " />
                    <property name="fromClause" value="
                        FROM database.people
                    " />
                    <property name="whereClause" value="
                        WHERE age > 30
                    " />
                    <property name="sortKeys">
                        <map>
                            <entry key="people_id" value="ASCENDING"/>
                        </map>
                    </property>             
            </bean>         
        </property>         
        <property name="saveState" value="false" />         
        <property name="rowMapper">             
             <bean class="fr.myapp.PeopleRowMapper" />      
        </property>     
    </bean>

        <bean id="reader2" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">       
       <property name="dataSource" ref="postgresql_1" />        
       <property name="queryProvider">          
               <bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
                    <property name="selectClause" value="
                         SELECT product_name
                    " />
                    <property name="fromClause" value="
                        FROM database.products
                    " />
                    <property name="whereClause" value="
                        WHERE product_order_date <= '01/11/2017'
                    " />
                    <property name="sortKeys">
                        <map>
                            <entry key="product_id" value="ASCENDING"/>
                        </map>
                    </property>             
               </bean>      
           </property>      
       <property name="saveState" value="false" />      
       <property name="rowMapper">
                <bean class="fr.myapp.ProductsRowMapper" />
       </property>  
    </bean>
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
          <property name="concurrencyLimit" value="8" />
    </bean>

1 回答

  • 0

    在写入的挂起记录数达到commit-interval(或块大小)之前,不应调用writer的write()方法 . 在此之前唯一应该调用的是read()在读取器中返回null,表示没有剩余结果 .

    当你看到较小的块是它在工作的中间? RowMappers中是否有任何逻辑或者您省略的任何逻辑汇总了读取结果?

相关问题