
How do I configure separate data sources for Spring Batch data and business data in Java config? Should I even do it?


My main job only does read operations, and the other one does some writes, but the MyISAM engine ignores transactions anyway, so I don't strictly need transaction support. How can I configure Spring Batch so that the JobRepository has its own data source, separate from the one holding the business data? The initial, single data source configuration looks like this:

@Configuration
public class StandaloneInfrastructureConfiguration {

    @Autowired
    Environment env;

    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
      LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
      em.setDataSource(dataSource());
      em.setPackagesToScan(new String[] { "org.podcastpedia.batch.*" });

      JpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
      em.setJpaVendorAdapter(vendorAdapter);
      em.setJpaProperties(additionalJpaProperties());

      return em;
    }

    Properties additionalJpaProperties() {
          Properties properties = new Properties();
          properties.setProperty("hibernate.hbm2ddl.auto", "none");
          properties.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL5Dialect");
          properties.setProperty("hibernate.show_sql", "true");

          return properties;
    }

    @Bean
    public DataSource dataSource(){

       return DataSourceBuilder.create()
                .url(env.getProperty("db.url"))
                .driverClassName(env.getProperty("db.driver"))
                .username(env.getProperty("db.username"))
                .password(env.getProperty("db.password"))
                .build();          
    }

    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory emf){
      JpaTransactionManager transactionManager = new JpaTransactionManager();
      transactionManager.setEntityManagerFactory(emf);

      return transactionManager;
    }
}

It is then imported into the Job's configuration class, where the @EnableBatchProcessing annotation picks it up automatically. My first thought was to have that configuration class extend DefaultBatchConfigurer, but then I got a

BeanCurrentlyInCreationException (org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'jobBuilders': Requested bean is currently in creation: Is there an unresolvable circular reference?):

@Configuration
@EnableBatchProcessing
@Import({StandaloneInfrastructureConfiguration.class, NotifySubscribersServicesConfiguration.class})
public class NotifySubscribersJobConfiguration extends DefaultBatchConfigurer {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private DataSource dataSource;

    @Autowired
    Environment env;

    @Override
    @Autowired
    public void setDataSource(javax.sql.DataSource dataSource) {
        super.setDataSource(batchDataSource());
    }

    private DataSource batchDataSource(){          
       return DataSourceBuilder.create()
                .url(env.getProperty("batchdb.url"))
                .driverClassName(env.getProperty("batchdb.driver"))
                .username(env.getProperty("batchdb.username"))
                .password(env.getProperty("batchdb.password"))
                .build();          
    } 

    @Bean
    public ItemReader<User> notifySubscribersReader(){

        JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();
        String sql = "select * from users where is_email_subscriber is not null";

        reader.setSql(sql);
        reader.setDataSource(dataSource);
        reader.setRowMapper(rowMapper());       

        return reader;
    }
........
}

Any ideas are welcome. The project is available on GitHub - https://github.com/podcastpedia/podcastpedia-batch

Thanks a bunch.

5 Answers

  • 1

    OK, this is weird, but it works. Moving the data source out into its own configuration class works fine, and it can be autowired.

    The example is a multi-datasource version of the Spring Batch Service Example:

    DataSourceConfiguration

    @Configuration
    public class DataSourceConfiguration {
    
        @Value("classpath:schema-mysql.sql")
        private Resource schemaScript;
    
        @Bean
        @Primary
        public DataSource hsqldbDataSource() throws SQLException {
            final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
            dataSource.setDriver(new org.hsqldb.jdbcDriver());
            dataSource.setUrl("jdbc:hsqldb:mem:mydb");
            dataSource.setUsername("sa");
            dataSource.setPassword("");
            return dataSource;
        }
    
        @Bean
        public JdbcTemplate jdbcTemplate(final DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }
    
        @Bean
        public DataSource mysqlDataSource() throws SQLException {
            final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
            dataSource.setDriver(new com.mysql.jdbc.Driver());
            dataSource.setUrl("jdbc:mysql://localhost/spring_batch_example");
            dataSource.setUsername("test");
            dataSource.setPassword("test");
            DatabasePopulatorUtils.execute(databasePopulator(), dataSource);
            return dataSource;
        }
    
        @Bean
        public JdbcTemplate mysqlJdbcTemplate(@Qualifier("mysqlDataSource") final DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }
    
        private DatabasePopulator databasePopulator() {
            final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
            populator.addScript(schemaScript);
            return populator;
        }
    }
    

    BatchConfiguration

    @Configuration
    @EnableBatchProcessing
    @Import({ DataSourceConfiguration.class, MBeanExporterConfig.class })
    public class BatchConfiguration {
    
        @Autowired
        private JobBuilderFactory jobs;
    
        @Autowired
        private StepBuilderFactory steps;
    
        @Bean
        public ItemReader<Person> reader() {
            final FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
            reader.setResource(new ClassPathResource("sample-data.csv"));
            reader.setLineMapper(new DefaultLineMapper<Person>() {
                {
                    setLineTokenizer(new DelimitedLineTokenizer() {
                        {
                            setNames(new String[] { "firstName", "lastName" });
                        }
                    });
                    setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
                        {
                            setTargetType(Person.class);
                        }
                    });
                }
            });
            return reader;
        }
    
        @Bean
        public ItemProcessor<Person, Person> processor() {
            return new PersonItemProcessor();
        }
    
        @Bean
        public ItemWriter<Person> writer(@Qualifier("mysqlDataSource") final DataSource dataSource) {
            final JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
            writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
            writer.setDataSource(dataSource);
            return writer;
        }
    
        @Bean
        public Job importUserJob(final Step s1) {
            return jobs.get("importUserJob").incrementer(new RunIdIncrementer()).flow(s1).end().build();
        }
    
        @Bean
        public Step step1(final ItemReader<Person> reader,
                final ItemWriter<Person> writer, final ItemProcessor<Person, Person> processor) {
            return steps.get("step1")
                    .<Person, Person> chunk(1)
                    .reader(reader)
                    .processor(processor)
                    .writer(writer)
                    .build();
        }
    }
    
  • 2

    I put the data sources in their own configuration class. In the batch configuration we extend DefaultBatchConfigurer and override its setDataSource method, passing in the database Spring Batch should use via @Qualifier. I could not get this to work with the constructor version, but the setter method worked for me.

    My readers, processors and writers are in their own self-contained classes, along with the steps (a minimal sketch of one such step configuration follows the code below).

    This is with Spring Boot 1.1.8 and Spring Batch 3.0.1. Note: we had a different setup on a project using Spring Boot 1.1.5, and it does not work the same way on the newer version.

    package org.sample.config.jdbc;
    
    import javax.sql.DataSource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.env.Environment;
    
    import com.atomikos.jdbc.AtomikosDataSourceBean;
    import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
    
    /**
     * The Class DataSourceConfig.
     *
     */
    @Configuration
    public class DataSourceConfig {
    
        private final static Logger log = LoggerFactory.getLogger(DataSourceConfig.class);
    
        @Autowired private Environment env;
    
        /**
         * Main data source.
         *
         * @return the data source
         */
        @Bean(name = "mainDataSource")
        @Primary
        public DataSource mainDataSource() {
    
            final String user = this.env.getProperty("db.main.username");
            final String password = this.env.getProperty("db.main.password");
            final String url = this.env.getProperty("db.main.url");
    
            return this.getMysqlXADataSource(url, user, password);
        }
    
        /**
         * Batch data source.
         *
         * @return the data source
         */
        @Bean(name = "batchDataSource", initMethod = "init", destroyMethod = "close")
        public DataSource batchDataSource() {
    
            final String user = this.env.getProperty("db.batch.username");
            final String password = this.env.getProperty("db.batch.password");
            final String url = this.env.getProperty("db.batch.url");
    
            return this.getAtomikosDataSource("metaDataSource", this.getMysqlXADataSource(url, user, password));
        }
    
        /**
         * Gets the mysql xa data source.
         *
         * @param url the url
         * @param user the user
         * @param password the password
         * @return the mysql xa data source
         */
        private MysqlXADataSource getMysqlXADataSource(final String url, final String user, final String password) {
    
            final MysqlXADataSource mysql = new MysqlXADataSource();
            mysql.setUser(user);
            mysql.setPassword(password);
            mysql.setUrl(url);
            mysql.setPinGlobalTxToPhysicalConnection(true);
    
            return mysql;
        }
    
        /**
         * Gets the atomikos data source.
         *
         * @param resourceName the resource name
         * @param xaDataSource the xa data source
         * @return the atomikos data source
         */
        private AtomikosDataSourceBean getAtomikosDataSource(final String resourceName, final MysqlXADataSource xaDataSource) {
    
            final AtomikosDataSourceBean atomikos = new AtomikosDataSourceBean();
            atomikos.setUniqueResourceName(resourceName);
            atomikos.setXaDataSource(xaDataSource);
            atomikos.setMaxLifetime(3600);
            atomikos.setMinPoolSize(2);
            atomikos.setMaxPoolSize(10);
    
            return atomikos;
        }
    
    }
    
    
    package org.sample.settlement.batch;
    
    import javax.sql.DataSource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.transaction.PlatformTransactionManager;
    
    /**
     * The Class BatchConfiguration.
     *
     */
    @Configuration
    @EnableBatchProcessing
    public class BatchConfiguration extends DefaultBatchConfigurer {
        private final static Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
        @Autowired private JobBuilderFactory jobs;
        @Autowired private StepBuilderFactory steps;
        @Autowired private PlatformTransactionManager transactionManager;
        @Autowired @Qualifier("processStep") private Step processStep;
    
        /**
         * Process payments job.
         *
         * @return the job
         */
        @Bean(name = "processJob")
        public Job processJob() {
            return this.jobs.get("processJob")
                        .incrementer(new RunIdIncrementer())
                        .start(processStep)
                        .build();
        }
    
        @Override
        @Autowired
        public void setDataSource(@Qualifier("batchDataSource") DataSource batchDataSource) {
            super.setDataSource(batchDataSource);
        }
    }
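
    The step classes themselves are not shown here. A minimal sketch of one such self-contained step configuration, assuming a hypothetical Payment item type, a PaymentRowMapper, and a paymentWriter bean defined elsewhere (and reusing the mainDataSource and processStep names from the code above), could look like this:

    package org.sample.settlement.batch;

    import javax.sql.DataSource;

    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Self-contained step configuration: the reader targets the business
     * data source, while the job repository keeps using batchDataSource.
     */
    @Configuration
    public class ProcessStepConfig {

        @Autowired private StepBuilderFactory steps;

        @Bean
        public ItemReader<Payment> paymentReader(@Qualifier("mainDataSource") final DataSource dataSource) {
            final JdbcCursorItemReader<Payment> reader = new JdbcCursorItemReader<Payment>();
            reader.setDataSource(dataSource);
            reader.setSql("select id, amount from payments where processed = 0");
            reader.setRowMapper(new PaymentRowMapper()); // hypothetical RowMapper<Payment>
            return reader;
        }

        @Bean(name = "processStep")
        public Step processStep(final ItemReader<Payment> paymentReader, final ItemWriter<Payment> paymentWriter) {
            return this.steps.get("processStep")
                        .<Payment, Payment> chunk(100)
                        .reader(paymentReader)
                        .writer(paymentWriter)
                        .build();
        }
    }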
    
  • 6

    Have you tried something like this?

    @Bean(name="batchDataSource")
    public DataSource batchDataSource(){          
           return DataSourceBuilder.create()
                    .url(env.getProperty("batchdb.url"))
                    .driverClassName(env.getProperty("batchdb.driver"))
                    .username(env.getProperty("batchdb.username"))
                    .password(env.getProperty("batchdb.password"))
                    .build();          
    }
    

    Then mark the other data source with @Primary, and use @Qualifier in your batch configuration to specify that you want to autowire the batchDataSource bean.
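
    A rough sketch of that arrangement, reusing the bean and property names from the question (the override mirrors the DefaultBatchConfigurer approach shown in another answer), could look like this:

    // Business data source, now marked as the default so that JPA and
    // the rest of the business code keep using it.
    @Bean
    @Primary
    public DataSource dataSource(){
        return DataSourceBuilder.create()
                .url(env.getProperty("db.url"))
                .driverClassName(env.getProperty("db.driver"))
                .username(env.getProperty("db.username"))
                .password(env.getProperty("db.password"))
                .build();
    }

    // In the batch configuration, point Spring Batch at the batch
    // data source explicitly.
    @Override
    @Autowired
    public void setDataSource(@Qualifier("batchDataSource") DataSource batchDataSource) {
        super.setDataSource(batchDataSource);
    }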

  • 15

    https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#howto-two-datasources

    @Bean
    @Primary
    @ConfigurationProperties("app.datasource.first")
    public DataSourceProperties firstDataSourceProperties() {
        return new DataSourceProperties();
    }
    
    @Bean
    @Primary
    @ConfigurationProperties("app.datasource.first")
    public DataSource firstDataSource() {
        return firstDataSourceProperties().initializeDataSourceBuilder().build();
    }
    
    @Bean
    @ConfigurationProperties("app.datasource.second")
    public DataSourceProperties secondDataSourceProperties() {
        return new DataSourceProperties();
    }
    
    @Bean
    @ConfigurationProperties("app.datasource.second")
    public DataSource secondDataSource() {
        return secondDataSourceProperties().initializeDataSourceBuilder().build();
    }
    

    In your application properties you can then use the regular data source properties (a sketch of pointing a batch writer at the second data source follows the properties below):

    app.datasource.first.type=com.zaxxer.hikari.HikariDataSource
    app.datasource.first.maximum-pool-size=30
    
    app.datasource.second.url=jdbc:mysql://localhost/test
    app.datasource.second.username=dbuser
    app.datasource.second.password=dbpass
    app.datasource.second.max-total=30
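
    To connect this back to the question: with the first data source marked @Primary, @EnableBatchProcessing should pick it up for the job repository by default, while the second data source can be injected wherever the business data is read or written. A minimal sketch, borrowing the Person writer from an earlier answer purely as a placeholder:

    // Batch metadata tables live in the @Primary firstDataSource by default;
    // business rows are written through the second data source.
    @Bean
    public JdbcBatchItemWriter<Person> writer(@Qualifier("secondDataSource") final DataSource dataSource) {
        final JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }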
    
  • 2

    Suppose you have two data sources, one for the Spring Batch metadata such as job details [say CONFIGDB] and the other for your business data [say AppDB]:

    Inject CONFIGDB into the jobRepository, like this:

    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
        <property name="dataSource" ref="CONFIGDB" />
        <property name="databaseType" value="db2" />
        <property name="tablePrefix" value="CONFIGDB.BATCH_" />
      </bean>
    

    Now you can inject the AppDB datasource into your DAOs or writers, if any...

    <bean id="DemoItemWriter" class="com.demoItemWriter">
         <property name="dataSource" ref="AppDB" />     
       </bean>
    

    Or

    you can define a resource and inject this AppDB with a JNDI lookup in the classes that need it, like:

    public class ExampleDAO {

        @Resource(lookup = "java:comp/env/jdbc/AppDB")
        DataSource ds;

    }
