首页 文章

Spring 季批处理作业执行的资源列表

提问于
浏览
1

我需要通过spring批处理作业执行来处理flatfile . 我从文件夹中获取需要处理的文件列表 . 请让我知道如何做到这一点 .

例如:
我在一个文件夹中有 player.csvplayer1.csvplayer2.csv . 我在列表中有这些文件名 . 如果我可以为Jobparameters输入所有这些文件名,请告诉我,以便我可以执行一个Job?

JobParameters jobParameters = new JobParametersBuilder().addString("input.file", "file:player.csv").toJobParameters();
JobExecution execution = jobLauncher.run(job,jobParameters);
assertEquals(ExitStatus.COMPLETED, execution.getExitStatus());

1 回答

  • 2

    有几种方法可以做到这一点; 1.创建一个作业来一次处理一个文件并让你的作业发起者迭代文件并触发作业,一次只传入一个文件2.在你的工作中创建一个'循环'来检查值和处理每个文件

    这是后一种选择的一个例子;

    首先是作业配置 - 它使用决策程序为要处理的文件构建执行上下文值 . 如果没有更多文件(来自jobParameter),那么它就完成了

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:batch="http://www.springframework.org/schema/batch"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
    
        <batch:job id="multipleFileProcess">
            <batch:decision decider="decider" id="multipleFileProcess.decider">
                <batch:next on="UNKNOWN" to="multipleFileProcess.step1"/>
                <batch:end on="COMPLETED"/>
            </batch:decision>
            <batch:step id="multipleFileProcess.step1" next="multipleFileProcess.decider">
                <batch:tasklet>
                    <batch:chunk reader="fileReader" writer="outWriter" commit-interval="10"/>
                </batch:tasklet>
            </batch:step>
            <batch:validator>
                <bean class="org.springframework.batch.core.job.DefaultJobParametersValidator">
                    <property name="requiredKeys" value="input.files"/>
                </bean>         
            </batch:validator>
        </batch:job>
    
    
        <bean id="fileReader"
            class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
            <property name="lineMapper" ref="lineMapper"/>
            <property name="resource" value="file:#{jobExecutionContext['input.file']}"/>
        </bean>
    
        <bean id="lineMapper"
            class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
    
    
        <bean id="transactionManager"
            class="org.springframework.batch.support.transaction.ResourcelessTransactionManager">
        </bean>
    
        <bean id="decider" class="de.incompleteco.spring.batch.decider.FileDecision"/>
    
        <bean id="outWriter" class="de.incompleteco.spring.batch.item.writer.SystemOutItemWriter"/>
    
    </beans>
    

    这里是决策程序代码(使用内部队列 - 将这个'队列'维护为执行上下文中的字符串可能会更安全,您可以慢慢缩短(手动'弹出'字符串的条目 - 但是为了快速使用队列)

    package de.incompleteco.spring.batch.decider;
    
    import java.util.Arrays;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    
    public class FileDecision implements JobExecutionDecider {
    
        public static final String INPUT_FILE = "input.file";
        public static final String INPUT_FILES = "input.files";
        public static final String DELIMITER = ",";
    
        private Queue<String> inputFiles;
    
        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution,StepExecution stepExecution) {
            //check if the jobExecution has the input.file in it's context
            if (!jobExecution.getExecutionContext().containsKey(INPUT_FILE)) {
                //build the queue
                inputFiles = new LinkedBlockingQueue<String>(Arrays.asList(jobExecution.getJobParameters().getString(INPUT_FILES).split(DELIMITER)));
            }//end if
            //pop and add
            String file = inputFiles.poll();
            if (file != null) {
                jobExecution.getExecutionContext().put(INPUT_FILE, file);
                return FlowExecutionStatus.UNKNOWN;
            }//end if
            //return 'done'
            return FlowExecutionStatus.COMPLETED;
        }
    
    }
    

    这里是虚拟的“作家”,将文件显示为“书面”

    package de.incompleteco.spring.batch.item.writer;
    
    import java.util.List;
    
    import org.springframework.batch.item.ItemWriter;
    
    public class SystemOutItemWriter implements ItemWriter<String> {
    
        @Override
        public void write(List<? extends String> items) throws Exception {
            for (String item : items) {
                System.out.println("this is what was received:" + item);
            }//end for
        }
    
    }
    

    最后单元测试看看是否运行

    package de.incompleteco.spring.batch;
    
    import static org.junit.Assert.assertFalse;
    
    import java.io.File;
    import java.io.FileWriter;
    
    import javax.annotation.Resource;
    
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;
    import org.junit.runner.RunWith;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.explore.JobExplorer;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import de.incompleteco.spring.batch.decider.FileDecision;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("classpath:/META-INF/spring/*-context.xml")
    public class MultipleFileProcessIntegrationTest {
    
        @Rule
        public TemporaryFolder folder = new TemporaryFolder();
    
        @Resource
        private Job job;
    
        @Resource
        private JobLauncher jobLauncher;
    
        @Resource
        private JobExplorer jobExplorer;
    
        @Test
        public void test() throws Exception {
            //somewhere to hold the filenames
            StringBuilder builder = new StringBuilder();
            //create 3 files
            for (int i=0;i<3;i++) {
                File file = folder.newFile("testfile" + i + ".txt");
                //write some content
                FileWriter writer = new FileWriter(file);
                writer.write("test content: " + i);
                writer.flush();
                writer.close();
                //add the filename
                if (i > 0) {
                    builder.append(FileDecision.DELIMITER);
                }//end if
                builder.append(file.getAbsolutePath());
                //show it
                System.out.println(file.getAbsolutePath());
            }//end loop
    
    
            //now build the job parameters
            JobParameters parameters = new JobParametersBuilder().addString(FileDecision.INPUT_FILES,builder.toString()).toJobParameters();
            //execution
            JobExecution execution = jobLauncher.run(job,parameters);
            //check
            while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
                Thread.sleep(100);
            }//end while
            //load
            execution = jobExplorer.getJobExecution(execution.getId());
            //check
            assertFalse(execution.getStatus().isUnsuccessful());
        }
    
    }
    

相关问题