首页 文章

如何使用不同的参数运行Spring批处理作业?

提问于
浏览
1

我正在使用 spring 批与自定义读者和作家 .

我有 control tablecustomerId's .

我需要多次运行相同的步骤,对于控制表中的每个客户一次 .

customerId 应该能够作为参数传递,因为我在阅读器和编写器中都需要它 .

如何才能最好地实现这一目标?

@Bean
public Step shipmentFactsStep() {
    return stepBuilderFactory.get("shipmentFactsStep")
            .<Shipmentfacts, Shipmentfacts>chunk(10000)
            .reader(shipmentfactsItemReader())
            .processor(shipmentFactProcessor())
            .writer(shipmentFactsWriter())
            .build();
}

1 回答

  • 0

    实现这一目标的一种方法是Partitioning . 如果您希望跟踪哪些 customersIds 已经完成,这种方法似乎更好,因为每个客户ID都有一个从属步骤 .

    步骤

    1.首先通过实现 org.springframework.batch.core.partition.support.Partitioner 接口创建分区器类,并为每个客户ID填充 Map<String, ExecutionContext> .

    由于您是按客户ID进行分区,因此方法参数 gridSize 将不会用于您的案例 .

    代码看起来像这样,其中 allCustomers 是您从数据库中准备的列表 .

    课 - CustomerPartitioner

    Map<String, ExecutionContext> result = new HashMap<>();
    int partitionNumber = 0;
     for (String customer: allCustomers) {
            ExecutionContext value = new ExecutionContext();
            value.putString("customerId", customer);
            result.put("Customer Id [" + customer+ "] : THREAD "
                + partitionNumber, value);
            partitionNumber++;
            }
    

    2.根据主步和从步骤修改步骤定义 . 请参阅在线教程 .

    示例代码与此类似 .

    @Bean
        public Step customerPartitionerStep() throws Exception {
        return step.get("customerPartitionerStep")
            .partitioner(shipmentFactsStep())
            .partitioner("shipmentFactsStep", customerPartitioner())
            .gridSize(partitionerGridSize).taskExecutor(taskExecutor())
            .build();
        }
    
        @Bean
    public Step shipmentFactsStep() {
        return stepBuilderFactory.get("shipmentFactsStep")
                .<Shipmentfacts, Shipmentfacts>chunk(10000)
                .reader(shipmentfactsItemReader())
                .processor(shipmentFactProcessor())
                .writer(shipmentFactsWriter())
                .build();
    }
    
    @Bean
        public Partitioner customerPartitioner() {
        return new CustomerPartitioner();
        }
    
    @Bean
        public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
        return simpleTaskExecutor;
        }
    

    您可以将 partitionerGridSize 设置为任何值,因为它未在分区程序实现中使用 . 您可以稍后使用它来根据总数记录而不仅仅是客户ID进行分区 .

    3.在上面步骤#2的代码中,设置 concurrencyLimit=1 非常重要..这样一次只能运行一个客户,它将为您在步骤#1中输入的所有客户运行 . 通过设置此值,您可以并行运行任意数量的客户 .

    4.步骤#1步骤中来自分区器的 customerId 可以通过读取,处理器等访问

    @Bean
    @StepScope
    public ItemReader<ReadBean> shipmentfactsItemReader(
            @Value("#{stepExecutionContext[customerId]}" String customerId){
    ..
    }
    

    注意注释, @StepScope ..这对于这个值的绑定是必需的 . 此外,在您的阅读器定义中,您需要像这样传递 null - .reader(shipmentfactsItemReader(null))

    在Spring Batch元数据中,您将拥有与客户数量相同的步骤以及一个主步骤 . 所有从属步骤完成后,主步骤将结束 .

    这里的优点是,如果需要,您可以并行处理许多客户,并且客户的每个从属步骤将在其自己的单独线程中运行 .

    希望能帮助到你 !!

相关问题