首页 文章

使用spring批处理分区处理大量数据

提问于
浏览
3

我正在实现spring批处理作业,使用分区方法处理数据库表中的数百万条记录,如下所示 -

  • 从分区中的表中获取唯一的分区代码,并在执行上下文中设置相同的分区代码 .

  • 使用阅读器,处理器和编写器创建一个块步骤,以根据特定的分区代码处理记录 .

这种方法是正确的还是有更好的方法来处理这种情况?由于某些分区代码可以具有比其他分区代码更多的记录,因此具有更多记录的那些可能比具有较少记录的分区代码花费更多时间来处理 .

是否有可能创建分区/线程来处理如thread1进程1-1000,thread2进程1001-2000等?

如何控制创建的线程数,因为分区代码可以在100左右,我想在5次迭代中只创建20个线程和进程?

如果一个分区失败会发生什么,所有处理都会停止并恢复?

以下是配置 -

<bean id="MyPartitioner" class="com.MyPartitioner" />
 <bean id="itemProcessor" class="com.MyProcessor" scope="step" />
 <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
  <property name="dataSource" ref="dataSource"/>
  <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
  <property name="rowMapper">
      <bean class="com.MyRowMapper" scope="step"/>
  </property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>

<batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
    </batch:tasklet>
</batch:step>
<batch:job id="myjob">
    <batch:step id="mystep">
        <batch:partition step="Step1" partitioner="MyPartitioner">
            <batch:handler grid-size="20" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>

分区 -

public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
List<String> codes = getCodes();

for (String code : codes)
{
    ExecutionContext context = new ExecutionContext();
    context.put("code", code);
    partitionMap.put(code, context);
}
return partitionMap;}}

谢谢

1 回答

  • 5

    我会说这是正确的方法,我不明白为什么你需要每1000个项目有一个线程,如果你对每个独特的分区代码进行分区并且有1000个项目的块,你将在每个线程1000个项目上进行交易,这是IMO好的 .

    • 除了保存唯一的分区代码之外,您还可以通过为每1000个相同的分区代码创建新的子上下文来计算每个分区代码和分区的数量(对于具有2200条记录的分区代码,您将调用3)具有上下文参数的线程:1 => partition_key = key1,skip = 0,count = 1000,2 => partition_key = key1,skip = 1000,count = 1000和3 => partition_key = key1,skip = 2000,count = 1000)如果那是你想要的,但我仍然会没有它

    • 线程数由 ThreadPoolTaskExecutor 控制,在创建时传递给分区步骤 . 你有方法 setCorePoolSize() 你可以在20上设置,你将获得最多20个线程 . 下一个细粒度配置是 grid-size ,它表示将从完整分区映射创建多少个分区 . 这是explanation of grid size . 分区就是分工 . 之后,您的线程配置将定义实际处理的并发性 .

    • 如果一个分区失败,则整个分区步骤失败,并显示分区失败的信息 . 成功分区已完成,并且不会再次调用,并且当作业重新启动时,它将通过重做失败和未处理的分区来获取它停止的位置 .

    希望我能找到你所有的问题,因为有很多问题 .

    Example of case 1 - 也许有错误,但只是为了得到想法:

    public class MyPartitioner implements Partitioner{
    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        Map<String, int> codesWithCounts = getCodesWithCounts();
    
        for (Entry<String, int> codeWithCount : codesWithCounts.entrySet())
        {
            for (int i = 0; i < codeWithCount.getValue(); i + 1000){
                ExecutionContext context = new ExecutionContext();
                context.put("code", code);
                context.put("skip", i);
                context.put("count", 1000);
                partitionMap.put(code, context);
            }
        }
        return partitionMap;
    }
    

    与你的页数相比,你可以从上下文得到多少你应该跳过的内容,在2200的示例中将是:0,1000,2000 .

相关问题