首页 文章

Spring Batch - 循环读取器,处理器和写入器N次

提问于
浏览
1

在Spring Batch中,如何循环读取器,处理器和写入器N次?

我的要求是:

我有“N”没有 . 顾客/客户 . 对于每个客户/客户,我需要从数据库(Reader)获取记录,然后我必须处理(处理器)客户/客户端的所有记录,然后我必须将记录写入文件(Writer) .

如何循环 spring 批处理作业N次?

2 回答

  • 0

    AFAIK我担心这种情况没有框架支持 . 至少不是你想解决的方式 . 我建议以不同方式解决问题:

    Option 1

    一次读取/处理/写入所有客户的所有记录 . 如果它们都在同一个数据库中,则只能执行此操作 . 我不建议这样做,因为你必须配置JTA / XA交易,这不值得 .

    Option 2

    为每个客户运行一次工作(在我看来是最好的选择) . 在不同的属性文件中保存每个客户端的必要信息(数据库数据连接,客户端过滤记录的值,客户端可能需要的任何其他数据),并通过它必须使用的客户端将参数传递给作业 . 这样,您可以控制处理哪个客户端以及何时使用bash文件和/或cron . 如果您使用Spring Boot Spring Batch,您可以将客户端配置存储在配置文件(application-clientX.properties)中并运行以下过程:

    $>  java -Dspring.profiles.active="clientX"  \
         -jar "yourBatch-1.0.0-SNAPSHOT.jar"     \
         -next
    

    Bonus - Option 3

    如果abobe都不符合您的需求,或者您坚持以您呈现的方式解决问题,那么您可以根据参数动态配置作业,并使用JavaConf为每个客户端创建一个步骤:

    @Bean
    public Job job(){
        JobBuilder jb = jobBuilders.get("job");
        for(Client c : clientsToProcess) {
                jb.flow(buildStepByClient(c));
        };
        return jb.build();
    }
    

    再一次,我强烈建议你不要这样:丑陋,反对框架哲学,难以维护,调试,你可能还必须在这里使用JTA / XA,...

    我希望我能得到任何帮助!

  • 0

    Local Partitioning将解决您的问题 .

    在您的分区器中,您将把所有客户端ID放在 Map 中,如下所示(只是伪代码),

    public class PartitionByClient implements Partitioner {
    
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {
                Map<String, ExecutionContext> result = new HashMap<>();
                int partitionNumber = 1;
                for (String client: allClients) {
                ExecutionContext value = new ExecutionContext();
                value.putString("client", client);
                result.put("Client [" + client+ "] : THREAD " + partitionNumber, value);
                partitionNumber++;
                }
    
            } 
    
            return result;
            }
        }
    

    这只是一个伪代码 . 您必须查看详细的分区文档 .

    您必须在 @StepScope 中标记您的阅读器,处理器和编写器(即哪些部分需要 client 的值) . Reader将在SQL的 WHERE 子句中使用此 client . 您将在阅读器等定义中使用 @Value("#{stepExecutionContext[client]}") String client 来注入此值 .

    现在最后一块,您将需要一个任务执行程序,并且等于 concurrencyLimit 的客户端将并行启动,前提是您在主分区程序步骤配置中设置此任务执行程序 .

    @Bean
        public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
        return simpleTaskExecutor;
        }
    

    如果您希望一次只运行一个客户端, concurrencyLimit 将为 1 .

相关问题