首页 文章

分区Spring Batch Step重复相同的成功从属StepExecutions

提问于
浏览
0

使用Spring Batch 3.0.4.RELEASE .

我将作业配置为使用分区步骤 . 从属步骤使用块大小1.任务 Actuator 中有六个线程 . 我运行这个测试的网格大小从六到几百 . 我的网格大小是我期望的从属StepExecutions的数量==我的分区器创建的ExecutionContexts的数量 .

结果总是如此:

六个线程获取六个不同的步骤执行并执行它们 successfully . 那么 the same six step executions run again and again 在同一个帖子里!

我注意到RepeatTemplate.executeInternal(...)中有一个永远不会结束的循环 . 它继续执行相同的StepExecution只是递增版本 .

这是Java配置代码:

@Bean
@StepScope
public RapRequestItemReader rapReader(
        @Value("#{stepExecutionContext['" + RapJobConfig.LIST_OF_IDS_STEP_EXECUTION_CONTEXT_VAR + "']}") String listOfIds,
        final @Value("#{stepExecutionContext['" + RapJobConfig.TIME_STEP_EXECUTION_CONTEXT_VAR + "']}") String timeString) {
    final List<Asset> farms = Arrays.asList(listOfIds.split(",")).stream().map(intString -> assetDao.getById(Integer.valueOf(intString)))
            .collect(Collectors.toList());
    return new RapRequestItemReader(timeString, farms);
}

@Bean
public ItemProcessor<RapRequest, PullSuccess> rapProcessor() {
    return rapRequest -> {
        return rapPull.pull(rapRequest.timestamp, rapRequest.farms);
    };
}

@Bean
public TaskletStep rapStep1(StepBuilderFactory stepBuilderFactory, RapRequestItemReader rapReader) {
    return stepBuilderFactory.get(RAP_STEP_NAME)
            .<RapRequest, PullSuccess> chunk(RAP_STEP_CHUNK_SIZE)
            .reader(rapReader)
            .processor(rapProcessor())
            .writer(updateCoverageWriter)
            .build();
}

private RapFilePartitioner createRapFilePartitioner(RapParameter rapParameter) {
    RapFilePartitioner partitioner = new RapFilePartitioner(rapParameter, rapPull.getIncrementHours());
    return partitioner;
}

@Bean
public ThreadPoolTaskExecutor pullExecutor() {
    ThreadPoolTaskExecutor pullExecutor = new ThreadPoolTaskExecutor();
    pullExecutor.setCorePoolSize(weatherConfig.getNumberOfThreadsPerModelType());
    pullExecutor.setMaxPoolSize(weatherConfig.getNumberOfThreadsPerModelType());
    pullExecutor.setAllowCoreThreadTimeOut(true);
    return pullExecutor;
}

@Bean
@JobScope
public Step rapPartitionByTimestampStep(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['config']}") String config,
        TaskletStep rapStep1) {
    RapParameter rapParameter = GsonHelper.fromJson(config, RapParameter.class);
    int gridSize = calculateGridSize(rapParameter);
    return stepBuilderFactory.get("rapPartitionByTimestampStep")
            .partitioner(rapStep1)
            .partitioner(RAP_STEP_NAME, createRapFilePartitioner(rapParameter))
            .taskExecutor(pullExecutor())
            .gridSize(gridSize)
            .build();
}

@Bean
public Job rapJob(JobBuilderFactory jobBuilderFactory, Step rapPartitionByTimestampStep) {
    return jobBuilderFactory.get(JOB_NAME)
            .start(rapPartitionByTimestampStep)
            .build();
}

1 回答

  • 0

    虽然这个问题很难说清楚,但问题在于读者 . ItemReader永远不会返回null .

    在设计中,StepExecution应该只处理一个项目 . 但是,在处理该项之后,ItemReader再次返回相同的项而不是返回null .

    我通过让ItemReader在第二次调用read时返回null来修复它 .

    更好的设计可能是使用TaskletStep而不是ChunkStep .

相关问题