我是Spring Batch和Spring Cloud 数据流的新手 . 我已经在配置类中配置了我的工作,我的工作是通过api调用执行的 . 我使用作业启动器来启动作业 . 我已连接到Mysql实例,因此Spring Batch框架在我的数据库中创建默认表 . 我的问题是没有插入表的值 . 我已经完成了我的工作,我可以通过引用日志看到我的工作完成了 . 但执行细节未插入表中 . 帮我解决这个问题 . 这是我的配置类和yml文件 .

package com.acheron.batch.config;

import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import 
org.springframework.batch.core.configuration.
annotation.EnableBatchProcessing;
import org.springframework.batch.core.
configuration.annotation.JobBuilderFactory;
import org.springframework.batch.
core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.
core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.
explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.
repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.batch.
support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import com.acheron.batch.vo.Contents;
import com.acheron.batch.vo.Schedules;

@Configuration
@EnableBatchProcessing

 public class BatchConfiguaration {
 @Autowired
private DataSource datasouce;

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
public Environment env;

@Bean(name = "reader")
@StepScope
public ItemReader<Schedules> reader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Schedules> scherecs) {
    ItemReader<Schedules> reader = new IteratorItemReader<Schedules>(scherecs);
    return reader;
}

@Bean(name = "CWSreader")
@StepScope
public ItemReader<Contents> CWSreader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Contents> scherecs) {
    ItemReader<Contents> reader = new IteratorItemReader<Contents>(scherecs);
    return reader;
}

@SuppressWarnings("rawtypes")
@Bean
@StepScope
public BatchProcessor processor() {
    return new BatchProcessor();
}

@SuppressWarnings("rawtypes")
@Bean
@StepScope
public ContentWorkflowBatchProcessor CWSprocessor() {
    return new ContentWorkflowBatchProcessor();
}


@Bean(name = "batchSchedulePreparedStatement")
@StepScope
public BatchSchedulePreparedStatement batchSchedulePreparedStatement() {
    return new BatchSchedulePreparedStatement();
}

@Bean(name = "contentWorkflowPreparedStatement")
@StepScope
public ContentWorkflowPreparedStatement contentWorkflowPreparedStatement() {
    return new ContentWorkflowPreparedStatement();
}

@Bean(name = "ContentWorkflowBatchWriter")
@StepScope
public ContentWorkflowBatchWriter ContentWorkflowBatchWriter() {
    return new ContentWorkflowBatchWriter();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean(name = "batchWriter")
@StepScope
public BatchWriter batchWriter() {
    BatchWriter batchWriter = new BatchWriter();
    batchWriter.setDataSource(datasouce);
    batchWriter.setSql(env.getProperty("batch.insert.schedule.query"));
    batchWriter.setItemPreparedStatementSetter(batchSchedulePreparedStatement());
    return batchWriter;

}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean(name = "CWSbatchWriter")
@StepScope
public ContentWorkflowBatchWriter CWSbatchWriter() {
    ContentWorkflowBatchWriter contentWorkflowBatchWriter = new ContentWorkflowBatchWriter();
    contentWorkflowBatchWriter.setDataSource(datasouce);
    contentWorkflowBatchWriter.setSql(env.getProperty("batch.insert.contentworkflow.query"));
    contentWorkflowBatchWriter.setItemPreparedStatementSetter(contentWorkflowPreparedStatement());
    return contentWorkflowBatchWriter;

}

@Bean("acheronDbTm")
@Qualifier("acheronDbTm")
public PlatformTransactionManager platformTransactionManager() {
    return new ResourcelessTransactionManager();
}

@Bean
public JobExplorer jobExplorer() throws Exception {
    MapJobExplorerFactoryBean explorerFactoryBean = new MapJobExplorerFactoryBean();
    explorerFactoryBean.setRepositoryFactory(mapJobRepositoryFactoryBean());
    explorerFactoryBean.afterPropertiesSet();
    return explorerFactoryBean.getObject();
}

@Bean
public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean() {
    MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
    mapJobRepositoryFactoryBean.setTransactionManager(platformTransactionManager());
    return mapJobRepositoryFactoryBean;
}

@Bean
public JobRepository jobRepository() throws Exception {
    return mapJobRepositoryFactoryBean().getObject();
}

@Bean
public SimpleJobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    return jobLauncher;
}

@Bean(name = "batchPartition")
@StepScope
public BatchPartition batchPartition() {
    BatchPartition batchPartition = new BatchPartition();
    return batchPartition;
}

@Bean(name = "contentWorkflowPartition")
@StepScope
public ContentWorkflowPartition ContentWorkflowPartition() {
    ContentWorkflowPartition contentWorkflowPartition = new ContentWorkflowPartition();
    return contentWorkflowPartition;
}

@Bean(name="taskExecutor")
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
    poolTaskExecutor.setCorePoolSize(10);
    poolTaskExecutor.setMaxPoolSize(30);
    poolTaskExecutor.setQueueCapacity(35);
    poolTaskExecutor.setThreadNamePrefix("Acheron");
    poolTaskExecutor.afterPropertiesSet();
    return poolTaskExecutor;
}

@Bean(name="CWSTaskExecutor")
public TaskExecutor CWSTaskExecutor() {
    ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
    poolTaskExecutor.setCorePoolSize(10);
    poolTaskExecutor.setMaxPoolSize(30);
    poolTaskExecutor.setQueueCapacity(35);
    poolTaskExecutor.setThreadNamePrefix("Acheron");
    poolTaskExecutor.afterPropertiesSet();
    return poolTaskExecutor;
}

@Bean(name = "masterStep")
public Step masterStep() {
    return stepBuilderFactory.get("masterStep").partitioner(slave()).partitioner("slave", batchPartition())
            .taskExecutor(taskExecutor()).build();
}

@Bean(name = "CWSmasterStep")
public Step CWSmasterStep() {
    return stepBuilderFactory.get("CWSmasterStep").partitioner(CWSslave())
            .partitioner("CWSslave", ContentWorkflowPartition()).taskExecutor(CWSTaskExecutor()).build();
}

@Bean(name = "slave")
public Step slave() {
    return stepBuilderFactory.get("slave").chunk(100).faultTolerant().retryLimit(2)
            .retry(DeadlockLoserDataAccessException.class).reader(reader(null)).processor(processor())
            .writer(batchWriter()).build();

}

@Bean(name = "CWSslave")
public Step CWSslave() {
    return stepBuilderFactory.get("CWSslave").chunk(100).faultTolerant().retryLimit(2)
            .retry(DeadlockLoserDataAccessException.class).reader(CWSreader(null)).processor(processor())
            .writer(CWSbatchWriter()).build();

}

@Bean(name = "manageStagingScheduleMaster")
public Job manageStagingScheduleMaster(final Step masterStep) throws Exception {
    return jobBuilderFactory.get("manageStagingScheduleMaster").preventRestart().incrementer(new RunIdIncrementer())
            .start(masterStep).build();
 @Bean(name = "manageContentWorkflowStaging")
public Job manageContentWorkflowStaging(final Step CWSmasterStep) throws Exception {
    return jobBuilderFactory.get("manageContentWorkflowStaging").preventRestart()
            .incrementer(new RunIdIncrementer()).start(CWSmasterStep).build();
}
}`

我的YML文件:

rest:
      api1: http://localhost:8888/v1/daq/schedule?sourceId=7&endDate=11/01/2018&page=1&pageSize=2
      api2: https://api.myjson.com/bins/1gm3ef
      ContentWorkflow: http://localhost:8085/v1/cwdata/programcontentgap?&
      responseCount: 10000
      contentWorkflowType: ContentWorkflow
    batch.insert.schedule.query: 'insert into clouddataflow.STAGING_SCHEDULE_MASTER (SCHEDULE_ID,SOURCE_ID,SOURCE_NAME,AIRING_TYPE_ID,AIRING_TYPE_NAME,CREATED_DATE,CREATED_USER,MODIFIED_DATE,MODIFIED_USER)values(?,?,?,?,?,?,?,?,?)'
    batch.insert.contentworkflow.query: 'insert into clouddataflow.CONTENT_WORKFLOW_STAGING (CONTENT_WORKFLOW_STAGING_ID,PROGRAM_ID,MASTER_TITLE,ORIGINAL_TITLE,PROGRAM_TYPE_ID,PROGRAM_TYPE_NAME,SOURCE_GROUP_ID,SOURCE_GROUP_NAME,REGION,PROGRAM_LANGUAGE,CW_STATUS,COPY_CULTURE,GENRES,KEYWORDS,AIR_DATE_TIME,IS_PROCESSED,CREATED_USER,CREATED_DATE,MODIFIED_USER,MODIFIED_DATE)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
    modifiedUser: admin
    threadCount: 10
    spring.acherondb.url: 'jdbc:mysql://localhost:3306/clouddataflow'
    spring.acherondb.uname: uname
    spring.acherondb.pwd: pwd
    spring.acherondb.dclassName: com.mysql.jdbc.Driver
    spring.acherondb.schema: clouddataflow
    spring.acherondb.showSql: true
    spring.jpa.hibernate.naming.physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
    spring:
      batch:
        job:
          enabled: true
      datasource:
        type: com.zaxxer.hikari.HikariDataSource
        hikari:
            jdbc-url: jdbc:mysql://localhost:3306/clouddataflow
            username: uname
            password: pwd
            driver-class-name: com.mysql.jdbc.Driver
            connection-timeout: 300
            idle-timeout: 30
            minimum-idle: 10
            maximum-pool-size: 30
    logging:
      level:
        com.acheron: ERROR
      pattern:
        file: '%d{yyyy-MM-dd HH:mm:ss} {%thread} %-5level %logger{36} -%msg%n'
      file: application
      path: ./logs
    queueScheduleData:
      createUser: User1
      status: Success
      modifiedUser: User1
    status:
      success: Success
      completed: COMPLETED

我的工作日志是,

29-12-2017 15:00:38.997 [http-nio-9191-exec-1] INFO osbclsupport.SimpleJobLauncher.run - 作业:[SimpleJob:[name = manageContentWorkflowStaging]]使用以下参数启动:[] 29-12-2017 15:00:39.022 [http-nio-9191-exec-1] INFO osbatch.core.job.SimpleStepHandler.handleStep - 执行步骤:[CWSmasterStep] 29-12-2017 15:00 :44.710 [http-nio-9191-exec-1] INFO osbclsupport.SimpleJobLauncher.run - 作业:[SimpleJob:[name = manageContentWorkflowStaging]]使用以下参数完成:[]并具有以下状态: [完成] .