
Spring Batch Multiple Threads


I am writing a Spring Batch job and want to be able to scale it when needed. My ApplicationContext looks like this:

@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@ComponentScan(basePackages = "in.springbatch")
@PropertySource(value = {"classpath:springbatch.properties"})

public class ApplicationConfig {

@Autowired
Environment environment;

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job job() throws Exception {

    return jobs.get("spring_batch")
            .flow(step()).end()
            .build();
}

@Bean(name = "dataSource", destroyMethod = "close")
public DataSource dataSource() {

    BasicDataSource basicDataSource = new BasicDataSource();



    return basicDataSource;
}

@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setTransactionManager(transactionManager());
    jobRepositoryFactoryBean.setDataSource(dataSource());
    return jobRepositoryFactoryBean.getObject();
}

 @Bean(name = "batchstep")
 public Step step() throws Exception {

    return    stepBuilderFactory.get("batchstep").allowStartIfComplete(true).
    transactionManager(transactionManager()).
          chunk(2).reader(batchReader()).processor(processor()).writer(writer()).build();

  }



@Bean
ItemReader batchReader() throws Exception {
    System.out.println(Thread.currentThread().getName()+"reader");
    HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>();
    hibernateCursorItemReader.setQueryString("from Source");
    hibernateCursorItemReader.setFetchSize(2);
    hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject());


    hibernateCursorItemReader.close();
    return hibernateCursorItemReader;
}

@Bean
 public ItemProcessor processor() {
     return new BatchProcessor();
 }

@Bean
public ItemWriter writer() {
    return new BatchWriter();
}

public TaskExecutor taskExecutor(){

    SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;


}
@Bean
public LocalSessionFactoryBean sessionFactory() {
    LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
    sessionFactory.setDataSource(dataSource());
    sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"});
    sessionFactory.setHibernateProperties(hibernateProperties());

    return sessionFactory;
}

@Bean
public PersistenceExceptionTranslationPostProcessor exceptionTranslation() {
    return new PersistenceExceptionTranslationPostProcessor();
}

@Bean
@Autowired
public HibernateTransactionManager transactionManager() {
    HibernateTransactionManager txManager = new HibernateTransactionManager();
    txManager.setSessionFactory(sessionFactory().getObject());

    return txManager;
}

Properties hibernateProperties() {
    return new Properties() {
        {
            setProperty("hibernate.hbm2ddl.auto",       environment.getProperty("hibernate.hbm2ddl.auto"));
            setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect"));
            setProperty("hibernate.globally_quoted_identifiers", "false");

        }
    };
}

}

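  • With the configuration above, I can read from the DB, process the data, and write it back to the DB.

  • I use a chunk size of 2 and a HibernateCursorItemReader that reads 2 records at a time from the cursor. The query I read with is date-based and selects the records for the current date.

  • So far I get the behavior I need, including restartability: a restart picks up only what was left unprocessed because the previous run failed.

Now my requirement is for the batch to use multiple threads to process the data and write it to the database.

My processor and writer look like this: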

@Component
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{

@Override
public DestinationDto process(Source source) throws Exception {

        System.out.println(Thread.currentThread().getName()+":"+source);
        DestinationDto destination=new DestinationDto();
        destination.setName(source.getName());
        destination.setValue(source.getValue());
        destination.setSourceId(source.getSourceId().toString());

    return destination;
}
}

@Component
public class BatchWriter implements ItemWriter<DestinationDto>{

@Autowired
IBatchDao batchDao;

@Override
public void write(List<? extends DestinationDto> list) throws Exception {
   System.out.println(Thread.currentThread().getName()+":"+list);
    batchDao.saveToDestination((List<DestinationDto>)list);
}
}

I updated my step to use a task executor (the SimpleAsyncTaskExecutor returned by taskExecutor() above), as shown below:

@Bean(name = "batchstep")
public Step step() throws Exception {

    return  stepBuilderFactory.get("batchstep").allowStartIfComplete(true).
     transactionManager(transactionManager()).chunk(1).reader(batchReader()).
     processor(processor()).writer(writer()).taskExecutor(taskExecutor()).build();

  }

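After this change, my processor is called by multiple threads, but they all receive the same source data. Is there anything extra I need to do?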
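
One thing worth checking with a multi-threaded step is the reader. The Spring Batch documentation describes HibernateCursorItemReader as not thread-safe, so sharing one instance across the executor's threads is a likely contributor to the symptom, though not necessarily the only one. A common remedy is to serialize calls to read(), and Spring Batch ships a SynchronizedItemStreamReader wrapper for that purpose (check whether your version includes it). The following is a minimal sketch of the idea in plain JDK code, not the Spring Batch API: SimpleReader, SynchronizedReader, and cursorLikeReader are hypothetical names standing in for ItemReader, the synchronized wrapper, and a stateful cursor reader.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Spring Batch's ItemReader: returns null when the input is exhausted.
interface SimpleReader<T> {
    T read() throws Exception;
}

// Hypothetical wrapper: serializes read() so the stateful delegate is used by one thread at a time.
final class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        return delegate.read();
    }
}

final class SynchronizedReaderSketch {

    // Builds a deliberately stateful, non-thread-safe reader over the values 0..count-1.
    static SimpleReader<Integer> cursorLikeReader(int count) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            rows.add(i);
        }
        Iterator<Integer> cursor = rows.iterator();
        return () -> cursor.hasNext() ? cursor.next() : null;
    }

    // Drains the reader from several worker threads and returns every item that was handed out.
    static List<Integer> drain(SimpleReader<Integer> reader, int threads) throws Exception {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
                return null; // returning a value makes this a Callable, so read() may throw
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws Exception {
        SimpleReader<Integer> safe = new SynchronizedReader<>(cursorLikeReader(1000));
        List<Integer> seen = drain(safe, 5);
        // With the synchronized wrapper each row is handed to exactly one thread.
        System.out.println("read=" + seen.size() + " distinct=" + new HashSet<>(seen).size());
    }
}
```

Note that the Spring Batch documentation also warns that a multi-threaded step generally cannot keep reliable restart state in the reader, so the restart behavior described above may need to be revisited if you go this route.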

1 Answer

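    This is a broad question.

    • Your best bet for a good answer is to read the Scaling and Parallel Processing chapter of the Spring Batch documentation (Here).

    • The Spring Batch samples may also include some multi-threaded examples (Here).

    • A simple way to thread a Spring Batch job is to create a "future processor": you put all of your processing logic into a Future object, and the Spring processor class only submits that work and returns the Future. The writer class then waits for the futures to complete before it performs the write. Sorry that I don't have a sample to point you to, but if you have specific questions I can try to answer them!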
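
The future-processor idea from the last bullet can be sketched with plain java.util.concurrent classes. This is only an illustration under stated assumptions, not the answerer's code and not the Spring Batch API: FutureProcessorSketch, process, write, and runChunk are hypothetical names, and the "processing logic" is a placeholder string transformation. Spring Batch's spring-batch-integration module offers AsyncItemProcessor and AsyncItemWriter, which follow the same pattern and may be worth a look before writing your own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureProcessorSketch {

    // Hypothetical "processor": it does no work itself, it only schedules
    // the real logic on the pool and hands back a Future for the result.
    static Future<String> process(ExecutorService pool, int source) {
        return pool.submit(() -> "processed-" + source);
    }

    // Hypothetical "writer": blocks until every Future in the chunk is done,
    // then returns the results in the order the items were submitted.
    static List<String> write(List<Future<String>> chunk) throws Exception {
        List<String> written = new ArrayList<>();
        for (Future<String> future : chunk) {
            written.add(future.get());
        }
        return written;
    }

    // Runs one chunk end to end: submit every item, then wait and "write".
    static List<String> runChunk(List<Integer> sources, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> chunk = new ArrayList<>();
            for (int source : sources) {
                chunk.add(process(pool, source));
            }
            return write(chunk);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runChunk(Arrays.asList(1, 2, 3, 4), 4));
    }
}
```

Because reading stays on a single thread in this arrangement and only the processing runs in parallel, the reader does not have to be thread-safe, which is the main design appeal compared with a multi-threaded step.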
