Spring Boot Batch partitioning: JdbcCursorItemReader error

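Even after following the very comprehensive example on Victor Jabor's blog, I can't get this to work. I followed his configuration as described and used all the latest dependencies. Like Victor, I am trying to read from one database and write to another. I have this working without partitioning, but I need partitioning to improve performance, because I need to be able to read 5 to 10 million rows within 5 minutes.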

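The following seem to work: 1) the ColumnRangePartitioner; 2) the TaskExecutorPartitionHandler builds the correct number of step tasks based on the gridSize and spawns the correct number of threads; 3) setPreparedStatementSetter, using the values the ColumnRangePartitioner put into the stepExecution.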
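For readers unfamiliar with column-range partitioning, here is a minimal plain-Java sketch of the splitting idea. It is not the ColumnRangePartitioner source: the class name RangeSplitSketch and the hard-coded min/max are assumptions for illustration, whereas a real partitioner would query the minimum and maximum of the column and store each range in an ExecutionContext under keys such as minValue and maxValue (the keys the reader below looks up).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Batch: shows how an inclusive
// [min, max] range can be cut into contiguous sub-ranges, one per partition.
public class RangeSplitSketch {

    /** Splits [min, max] into at most gridSize contiguous, non-overlapping ranges. */
    public static Map<String, long[]> split(long min, long max, int gridSize) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        // Size of each range; the +1 makes sure the ranges cover the whole interval.
        long targetSize = (max - min) / gridSize + 1;
        long start = min;
        int number = 0;
        while (start <= max) {
            // The last range is clipped so it never runs past max.
            long end = Math.min(start + targetSize - 1, max);
            partitions.put("partition" + number, new long[] {start, end});
            start += targetSize;
            number++;
        }
        return partitions;
    }

    public static void main(String[] args) {
        split(1, 10, 4).forEach((name, range) ->
                System.out.println(name + ": minValue=" + range[0] + ", maxValue=" + range[1]));
    }
}
```

Each partition then runs the same step with its own minValue and maxValue, which is why every thread needs its own reader instance bound to its own range.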

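But when I run the application I get errors from the JdbcCursorItemReader. They are inconsistent and I don't understand them. As a last resort I will have to debug the JdbcCursorItemReader. I am hoping to get some help before that, and that this turns out to be a configuration problem.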

Error:

Caused by: java.sql.SQLException: Exhausted Resultset
    at oracle.jdbc.driver.OracleResultSetImpl.getInt(OracleResultSetImpl.java:901) ~[ojdbc6-11.2.0.2.0.jar:11.2.0.2.0]
    at org.springframework.jdbc.support.JdbcUtils.getResultSetValue(JdbcUtils.java:160) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.jdbc.core.BeanPropertyRowMapper.getColumnValue(BeanPropertyRowMapper.java:370) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.jdbc.core.BeanPropertyRowMapper.mapRow(BeanPropertyRowMapper.java:291) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.batch.item.database.JdbcCursorItemReader.readCursor(JdbcCursorItemReader.java:139) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]

Configuration class:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Bean
    public ItemProcessor<Archive, Archive> processor(@Value("${etl.region}") String region) {
        return new ArchiveProcessor(region);
    }

    @Bean
    public ItemWriter<Archive> writer(@Qualifier(value = "postgres") DataSource dataSource) {
        JdbcBatchItemWriter<Archive> writer = new JdbcBatchItemWriter<>();

        writer.setSql("insert into tdw_src.archive (id) " +
                "values (:id)");
        writer.setDataSource(dataSource);
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }

    @Bean
    public Partitioner archivePartitioner(@Qualifier(value = "gmDataSource") DataSource dataSource,
                                          @Value("ROWNUM") String column,
                                          @Value("archive") String table,
                                          @Value("${gm.datasource.username}") String schema) {
        return new ColumnRangePartitioner(dataSource, column, schema + "." + table);
    }

    @Bean
    public Job archiveJob(JobBuilderFactory jobs, Step partitionerStep, JobExecutionListener listener) {
        return jobs.get("archiveJob")
                .preventRestart()
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(partitionerStep)
                .build();
    }

    @Bean
    public Step partitionerStep(StepBuilderFactory stepBuilderFactory,
                                Partitioner archivePartitioner,
                                Step step1,
                                @Value("${spring.batch.gridsize}") int gridSize) {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(step1)
                .partitioner("step1", archivePartitioner)
                .gridSize(gridSize)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean(name = "step1")
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Archive> customReader,
                      ItemWriter<Archive> writer, ItemProcessor<Archive, Archive> processor) {
        return stepBuilderFactory.get("step1")
                .listener(customReader)
                .<Archive, Archive>chunk(5)
                .reader(customReader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor();
    }

    @Bean
    public SimpleJobLauncher getJobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }
}

Custom reader:

public class CustomReader extends JdbcCursorItemReader<Archive> implements StepExecutionListener {

    private StepExecution stepExecution;

    @Autowired
    public CustomReader(@Qualifier(value = "gmDataSource") DataSource geomangerDataSource,
                        @Value("${gm.datasource.username}") String schema) throws Exception {
        super();
        this.setSql("SELECT TMP.* FROM (SELECT ROWNUM AS ID_PAGINATION, id FROM " + schema + ".archive) TMP " +
                "WHERE TMP.ID_PAGINATION >= ? AND TMP.ID_PAGINATION <= ?");
        this.setDataSource(geomangerDataSource);
        BeanPropertyRowMapper<Archive> rowMapper = new BeanPropertyRowMapper<>(Archive.class);
        this.setRowMapper(rowMapper);
        this.setFetchSize(5);
        this.setSaveState(false);

        this.setVerifyCursorPosition(false);
        // Not sure if this is needed: this.afterPropertiesSet();
    }

    @Override
    public synchronized void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.setPreparedStatementSetter(getPreparedStatementSetter());
    }

    private PreparedStatementSetter getPreparedStatementSetter() {
        ListPreparedStatementSetter listPreparedStatementSetter = new ListPreparedStatementSetter();
        List<Integer> list = new ArrayList<>();
        list.add(stepExecution.getExecutionContext().getInt("minValue"));
        list.add(stepExecution.getExecutionContext().getInt("maxValue"));
        listPreparedStatementSetter.setParameters(list);
        LOGGER.debug("getPreparedStatementSetter list: " + list);
        return listPreparedStatementSetter;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

Answers (1)

2 years ago

I got it all working.

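First, I needed to add an ORDER BY to the select statement in the CustomReader so that the rownum values stay the same across all threads. Second, I had to annotate every bean used in the step with @StepScope, so that each partition gets its own instance instead of all threads sharing one reader.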
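A rough sketch of what those two changes could look like, assuming the Archive class and the gmDataSource qualifier from the question. The class name StepScopedReaderConfig, the nested ordered subquery, and the use of late binding through stepExecutionContext instead of a StepExecutionListener are my assumptions, not the poster's actual code.

```java
import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

// Hypothetical configuration class; Archive is the item class from the question.
@Configuration
public class StepScopedReaderConfig {

    @Bean
    @StepScope // one reader instance (and one cursor) per partition step execution
    public JdbcCursorItemReader<Archive> customReader(
            @Qualifier("gmDataSource") DataSource dataSource,
            @Value("${gm.datasource.username}") String schema,
            @Value("#{stepExecutionContext['minValue']}") Integer minValue,
            @Value("#{stepExecutionContext['maxValue']}") Integer maxValue) {

        JdbcCursorItemReader<Archive> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        // Oracle assigns ROWNUM before ORDER BY is applied, so the ordering goes
        // in an inner query and ROWNUM is taken from the already sorted rows.
        reader.setSql("SELECT TMP.* FROM ("
                + "SELECT ROWNUM AS ID_PAGINATION, SRC.id FROM ("
                + "SELECT id FROM " + schema + ".archive ORDER BY id) SRC) TMP "
                + "WHERE TMP.ID_PAGINATION >= ? AND TMP.ID_PAGINATION <= ?");
        // Bind this partition's range to the two placeholders.
        reader.setPreparedStatementSetter(ps -> {
            ps.setInt(1, minValue);
            ps.setInt(2, maxValue);
        });
        reader.setRowMapper(new BeanPropertyRowMapper<>(Archive.class));
        reader.setSaveState(false);
        reader.setVerifyCursorPosition(false);
        return reader;
    }
}
```

Returning the concrete JdbcCursorItemReader type rather than ItemReader lets the step-scoped proxy still be recognised as an ItemStream, so the cursor is opened and closed per partition. The processor and writer beans would get the same @StepScope treatment, as the answer describes.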

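In practice I won't use rownum, because it requires the ordering, which costs performance. I will use a pk column instead to get the best performance.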
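To make the difference concrete, here is a small sketch of the query shape when partitioning on the primary key directly. The helper name PkRangeSqlSketch and the schema value "gm" are made up for illustration; the partitioner would then be pointed at the id column rather than ROWNUM.

```java
// Hypothetical helper: builds a range query on the primary key itself,
// so no ROWNUM wrapper and no ORDER BY are needed.
public class PkRangeSqlSketch {

    public static String rangeSql(String schema, String table, String pkColumn) {
        String qualifiedTable = schema + "." + table;
        return "SELECT " + pkColumn + " FROM " + qualifiedTable
                + " WHERE " + pkColumn + " >= ? AND " + pkColumn + " <= ?";
    }

    public static void main(String[] args) {
        // "gm" is a placeholder schema name, not taken from the question.
        System.out.println(rangeSql("gm", "archive", "id"));
    }
}
```

One trade-off to keep in mind: if the key values have large gaps, ranges of equal width will not hold equal numbers of rows, so partitions can end up unevenly loaded.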