我是 Spring Batch 和 Spring Cloud Data Flow 的新手。我已经在配置类中配置了作业(Job),作业通过 API 调用触发执行,并使用 JobLauncher 来启动。应用已连接到 MySQL 实例,Spring Batch 框架也已在我的数据库中创建了默认的元数据表。我的问题是:这些表中没有插入任何数据。作业已经运行完毕,从日志中也可以看到作业已完成,但执行详情并没有写入表中。请帮我解决这个问题。下面是我的配置类和 yml 文件。
package com.acheron.batch.config;
import java.util.List;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import com.acheron.batch.vo.Contents;
import com.acheron.batch.vo.Schedules;
@Configuration
@EnableBatchProcessing
public class BatchConfiguaration {
@Autowired
private DataSource datasouce;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public Environment env;
@Bean(name = "reader")
@StepScope
public ItemReader<Schedules> reader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Schedules> scherecs) {
ItemReader<Schedules> reader = new IteratorItemReader<Schedules>(scherecs);
return reader;
}
@Bean(name = "CWSreader")
@StepScope
public ItemReader<Contents> CWSreader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Contents> scherecs) {
ItemReader<Contents> reader = new IteratorItemReader<Contents>(scherecs);
return reader;
}
@SuppressWarnings("rawtypes")
@Bean
@StepScope
public BatchProcessor processor() {
return new BatchProcessor();
}
@SuppressWarnings("rawtypes")
@Bean
@StepScope
public ContentWorkflowBatchProcessor CWSprocessor() {
return new ContentWorkflowBatchProcessor();
}
@Bean(name = "batchSchedulePreparedStatement")
@StepScope
public BatchSchedulePreparedStatement batchSchedulePreparedStatement() {
return new BatchSchedulePreparedStatement();
}
@Bean(name = "contentWorkflowPreparedStatement")
@StepScope
public ContentWorkflowPreparedStatement contentWorkflowPreparedStatement() {
return new ContentWorkflowPreparedStatement();
}
@Bean(name = "ContentWorkflowBatchWriter")
@StepScope
public ContentWorkflowBatchWriter ContentWorkflowBatchWriter() {
return new ContentWorkflowBatchWriter();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean(name = "batchWriter")
@StepScope
public BatchWriter batchWriter() {
BatchWriter batchWriter = new BatchWriter();
batchWriter.setDataSource(datasouce);
batchWriter.setSql(env.getProperty("batch.insert.schedule.query"));
batchWriter.setItemPreparedStatementSetter(batchSchedulePreparedStatement());
return batchWriter;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean(name = "CWSbatchWriter")
@StepScope
public ContentWorkflowBatchWriter CWSbatchWriter() {
ContentWorkflowBatchWriter contentWorkflowBatchWriter = new ContentWorkflowBatchWriter();
contentWorkflowBatchWriter.setDataSource(datasouce);
contentWorkflowBatchWriter.setSql(env.getProperty("batch.insert.contentworkflow.query"));
contentWorkflowBatchWriter.setItemPreparedStatementSetter(contentWorkflowPreparedStatement());
return contentWorkflowBatchWriter;
}
@Bean("acheronDbTm")
@Qualifier("acheronDbTm")
public PlatformTransactionManager platformTransactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobExplorer jobExplorer() throws Exception {
MapJobExplorerFactoryBean explorerFactoryBean = new MapJobExplorerFactoryBean();
explorerFactoryBean.setRepositoryFactory(mapJobRepositoryFactoryBean());
explorerFactoryBean.afterPropertiesSet();
return explorerFactoryBean.getObject();
}
@Bean
public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean() {
MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
mapJobRepositoryFactoryBean.setTransactionManager(platformTransactionManager());
return mapJobRepositoryFactoryBean;
}
@Bean
public JobRepository jobRepository() throws Exception {
return mapJobRepositoryFactoryBean().getObject();
}
@Bean
public SimpleJobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
@Bean(name = "batchPartition")
@StepScope
public BatchPartition batchPartition() {
BatchPartition batchPartition = new BatchPartition();
return batchPartition;
}
@Bean(name = "contentWorkflowPartition")
@StepScope
public ContentWorkflowPartition ContentWorkflowPartition() {
ContentWorkflowPartition contentWorkflowPartition = new ContentWorkflowPartition();
return contentWorkflowPartition;
}
@Bean(name="taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(10);
poolTaskExecutor.setMaxPoolSize(30);
poolTaskExecutor.setQueueCapacity(35);
poolTaskExecutor.setThreadNamePrefix("Acheron");
poolTaskExecutor.afterPropertiesSet();
return poolTaskExecutor;
}
@Bean(name="CWSTaskExecutor")
public TaskExecutor CWSTaskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(10);
poolTaskExecutor.setMaxPoolSize(30);
poolTaskExecutor.setQueueCapacity(35);
poolTaskExecutor.setThreadNamePrefix("Acheron");
poolTaskExecutor.afterPropertiesSet();
return poolTaskExecutor;
}
@Bean(name = "masterStep")
public Step masterStep() {
return stepBuilderFactory.get("masterStep").partitioner(slave()).partitioner("slave", batchPartition())
.taskExecutor(taskExecutor()).build();
}
@Bean(name = "CWSmasterStep")
public Step CWSmasterStep() {
return stepBuilderFactory.get("CWSmasterStep").partitioner(CWSslave())
.partitioner("CWSslave", ContentWorkflowPartition()).taskExecutor(CWSTaskExecutor()).build();
}
@Bean(name = "slave")
public Step slave() {
return stepBuilderFactory.get("slave").chunk(100).faultTolerant().retryLimit(2)
.retry(DeadlockLoserDataAccessException.class).reader(reader(null)).processor(processor())
.writer(batchWriter()).build();
}
@Bean(name = "CWSslave")
public Step CWSslave() {
return stepBuilderFactory.get("CWSslave").chunk(100).faultTolerant().retryLimit(2)
.retry(DeadlockLoserDataAccessException.class).reader(CWSreader(null)).processor(processor())
.writer(CWSbatchWriter()).build();
}
@Bean(name = "manageStagingScheduleMaster")
public Job manageStagingScheduleMaster(final Step masterStep) throws Exception {
return jobBuilderFactory.get("manageStagingScheduleMaster").preventRestart().incrementer(new RunIdIncrementer())
.start(masterStep).build();
@Bean(name = "manageContentWorkflowStaging")
public Job manageContentWorkflowStaging(final Step CWSmasterStep) throws Exception {
return jobBuilderFactory.get("manageContentWorkflowStaging").preventRestart()
.incrementer(new RunIdIncrementer()).start(CWSmasterStep).build();
}
}`
我的YML文件:
# Application configuration for the batch service.
# Nesting was reconstructed from a flattened copy; values are unchanged.

# REST endpoints the service reads from.
rest:
  api1: http://localhost:8888/v1/daq/schedule?sourceId=7&endDate=11/01/2018&page=1&pageSize=2
  api2: https://api.myjson.com/bins/1gm3ef
  ContentWorkflow: http://localhost:8085/v1/cwdata/programcontentgap?&

# NOTE(review): these two keys may originally have been nested under `rest` - confirm
# against the property names the code looks up.
responseCount: 10000
contentWorkflowType: ContentWorkflow

# Insert statements read by the batch writers via Environment.getProperty(...).
batch.insert.schedule.query: 'insert into clouddataflow.STAGING_SCHEDULE_MASTER (SCHEDULE_ID,SOURCE_ID,SOURCE_NAME,AIRING_TYPE_ID,AIRING_TYPE_NAME,CREATED_DATE,CREATED_USER,MODIFIED_DATE,MODIFIED_USER)values(?,?,?,?,?,?,?,?,?)'
batch.insert.contentworkflow.query: 'insert into clouddataflow.CONTENT_WORKFLOW_STAGING (CONTENT_WORKFLOW_STAGING_ID,PROGRAM_ID,MASTER_TITLE,ORIGINAL_TITLE,PROGRAM_TYPE_ID,PROGRAM_TYPE_NAME,SOURCE_GROUP_ID,SOURCE_GROUP_NAME,REGION,PROGRAM_LANGUAGE,CW_STATUS,COPY_CULTURE,GENRES,KEYWORDS,AIR_DATE_TIME,IS_PROCESSED,CREATED_USER,CREATED_DATE,MODIFIED_USER,MODIFIED_DATE)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'

modifiedUser: admin
threadCount: 10

# Custom (non-Boot) database settings, kept as flat dotted keys.
spring.acherondb.url: 'jdbc:mysql://localhost:3306/clouddataflow'
spring.acherondb.uname: uname
spring.acherondb.pwd: pwd
spring.acherondb.dclassName: com.mysql.jdbc.Driver
spring.acherondb.schema: clouddataflow
spring.acherondb.showSql: true
spring.jpa.hibernate.naming.physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl

spring:
  batch:
    job:
      enabled: true
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      jdbc-url: jdbc:mysql://localhost:3306/clouddataflow
      username: uname
      password: pwd
      driver-class-name: com.mysql.jdbc.Driver
      # NOTE(review): HikariCP timeouts are expressed in milliseconds; 300 and 30 look
      # like they were meant as seconds - confirm the intended values.
      connection-timeout: 300
      idle-timeout: 30
      minimum-idle: 10
      maximum-pool-size: 30

logging:
  level:
    com.acheron: ERROR
  pattern:
    file: '%d{yyyy-MM-dd HH:mm:ss} {%thread} %-5level %logger{36} -%msg%n'
  file: application
  path: ./logs

queueScheduleData:
  createUser: User1
  status: Success
  modifiedUser: User1

status:
  success: Success
  completed: COMPLETED
我的作业日志如下:
29-12-2017 15:00:38.997 [http-nio-9191-exec-1] INFO o.s.b.c.l.support.SimpleJobLauncher.run - Job: [SimpleJob: [name=manageContentWorkflowStaging]] launched with the following parameters: []
29-12-2017 15:00:39.022 [http-nio-9191-exec-1] INFO o.s.batch.core.job.SimpleStepHandler.handleStep - Executing step: [CWSmasterStep]
29-12-2017 15:00:44.710 [http-nio-9191-exec-1] INFO o.s.b.c.l.support.SimpleJobLauncher.run - Job: [SimpleJob: [name=manageContentWorkflowStaging]] completed with the following parameters: [] and the following status: [COMPLETED]