My current use case is as follows:

  • Use FlatFileItemReader to read the input stream of a .txt file line by line

  • Use an ItemProcessor to call a remote HTTP service for each line of data

  • Use FlatFileItemWriter to write the result of each request to a file

I want the ItemProcessor in step 2 to make the remote calls with multiple threads.

The main flow looks like this (with Spring Boot):

    //read data
    FlatFileItemReader<ItemProcessing> reader = read(batchReqRun);

    //process data
    ItemProcessor<ItemProcessing, ItemProcessing> processor = process(batchReqDef);

    //write data
    File localOutputDir = new File(localStoragePath+"/batch-results");
    File localOutputFile = new File(localOutputDir, batchReqExec.getDestFile());
    FlatFileItemWriter<ItemProcessing> writer = write(localOutputDir,localOutputFile);

    StepExecutionListener stepExecListener = new StepExecutionListener() {
        @Override
        public void beforeStep(StepExecution stepExecution) {
            logger.info("Job {} step start {}",stepExecution.getJobExecutionId(), stepExecution.getStepName());
        }
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.info("Job {} step end {}",stepExecution.getJobExecutionId(), stepExecution.getStepName());
            //....... some code omitted
            return finalStatus;
        }
    };

    Tasklet resultFileTasklet = new BatchFileResultTasklet(localOutputFile, httpClientService);
    TaskletStep resultFileStep = stepBuilder.get("result")
            .tasklet(resultFileTasklet)
            .listener(stepExecListener)
            .build();

    //create step
    Step mainStep = stepBuilder.get("run")
            .<ItemProcessing, ItemProcessing>chunk(5)
            .faultTolerant()
            .skip(IOException.class).skip(SocketTimeoutException.class)//skip IOException here
            .skipLimit(2000)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .listener(stepExecListener)
            .listener(new ItemProcessorListener()) //add process listener
            .listener(skipExceptionListener) //add skip exception listener
            .build();


    //create job
    Job job = jobBuilder.get(batchReqExec.getId())
            .start(mainStep)
            .next(resultFileStep)
            .build();

    JobParametersBuilder jobParamBuilder = new JobParametersBuilder();

    //run job
    JobExecution execution = jobLauncher.run(job, jobParamBuilder.toJobParameters());
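For reference, a chunk-oriented step such as the one built with `.chunk(5)` above roughly works like this: it reads items until it has a chunk of 5 (or the input ends), runs each of them through the processor, and then passes the processed chunk to the writer in a single call. The plain-Java sketch below is a simplified, single-threaded model of that loop, assuming no transactions, skips or retries; `ChunkLoopModel` and `runStep` are hypothetical names, and this is not Spring Batch's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of one chunk-oriented step (no Spring Batch involved):
// read up to chunkSize items, process them one by one, then write them with a single call.
public class ChunkLoopModel {

    static <I, O> void runStep(Iterator<I> reader, Function<I, O> processor,
                               Consumer<List<O>> writer, int chunkSize) {
        while (reader.hasNext()) {
            // read phase: collect up to chunkSize items (fewer for the last chunk)
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // process phase: each item goes through the processor on the same thread
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) { // a null result filters the item out
                    outputs.add(out);
                }
            }
            // write phase: the whole chunk is handed to the writer at once
            writer.accept(outputs);
        }
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "b", "c", "d", "e", "f", "g");
        List<List<String>> writes = new ArrayList<>();
        runStep(lines.iterator(), String::toUpperCase, writes::add, 5);
        System.out.println(writes); // [[A, B, C, D, E], [F, G]]
    }
}
```

In this model all the slow work happens in the process phase, one item after another, which is why the processor is the part worth parallelizing.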

The data is read like this:

private FlatFileItemReader<ItemProcessing> read(BatchRequestsRun batchReqRun) throws Exception {
    //prepare input file
    File localInputDir = new File(localStoragePath+"/batch-requests");
    if(!localInputDir.isDirectory()) {
        localInputDir.mkdirs(); // mkdirs() also creates missing parent directories
    }
    File localFile = new File(localInputDir, batchReqRun.getFileRef()+"-"+batchReqRun.getFile());
    if(!localFile.exists()) {
        httpClientService.getFileFromStorage(batchReqRun.getFileRef(), localFile);          
    }

    FlatFileItemReader<ItemProcessing> reader = new FlatFileItemReader<ItemProcessing>();
    reader.setResource(new FileSystemResource(localFile));

    reader.setLineMapper(new DefaultLineMapper<ItemProcessing>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer());
            setFieldSetMapper(new FieldSetMapper<ItemProcessing>() {
                @Override
                public ItemProcessing mapFieldSet(FieldSet fieldSet) throws BindException {
                    ItemProcessing item = new ItemProcessing();
                    item.setFieldSet(fieldSet);
                    return item;
                }
            });
        }
    });

    return reader;
}
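One common way to parallelize the whole step is to give it a `TaskExecutor` (a multi-threaded step), in which case several threads call `read()` on the same reader. Most Spring Batch readers, including `FlatFileItemReader`, are not designed for concurrent use, so the reads have to be serialized; Spring Batch provides `SynchronizedItemStreamReader` as a decorator for that. The plain-Java sketch below shows only the decorator idea; `LineReader`, `IteratorReader` and `SynchronizedLineReader` are hypothetical stand-ins, not Spring Batch types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedReaderSketch {

    // Hypothetical stand-in for ItemReader: read() returns null once the input is exhausted.
    interface LineReader<T> {
        T read();
    }

    // Not thread-safe: two threads can race between hasNext() and next(), like a shared file cursor.
    static class IteratorReader<T> implements LineReader<T> {
        private final Iterator<T> it;
        IteratorReader(Iterator<T> it) { this.it = it; }
        public T read() { return it.hasNext() ? it.next() : null; }
    }

    // Decorator: the synchronized read() lets only one thread use the delegate at a time.
    static class SynchronizedLineReader<T> implements LineReader<T> {
        private final LineReader<T> delegate;
        SynchronizedLineReader(LineReader<T> delegate) { this.delegate = delegate; }
        public synchronized T read() { return delegate.read(); }
    }

    // Several worker threads pull from the same reader until it returns null.
    static <T> List<T> drainConcurrently(LineReader<T> reader, int threads) {
        List<T> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                T item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            input.add(i);
        }
        LineReader<Integer> reader = new SynchronizedLineReader<>(new IteratorReader<>(input.iterator()));
        System.out.println(drainConcurrently(reader, 8).size()); // 10000
    }
}
```

With a multi-threaded step the writer receives chunks from several threads, so the order of lines in the output file is no longer guaranteed to match the input.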

The data is processed like this:

private ItemProcessor<ItemProcessing, ItemProcessing> process(BatchRequestsDef batchReqDef) {
    ItemProcessor<ItemProcessing, ItemProcessing> processor = (input) -> {
        VelocityContext context = new VelocityContext();
        //..... velocity code omitted

        String responseBody = null;
        //send http invoking
        input.setResponseBody(httpClientService.process(batchReqDef, input));
        responseBody = input.getResponseBody();
        logger.info(responseBody);

        // using Groovy to parse response
        Binding binding = new Binding();
        try {
            binding.setVariable("response", responseBody);
            GroovyShell shell = new GroovyShell(binding);
            Object result = shell.evaluate(batchReqDef.getConfig().getResponseHandler());
            input.setResult(result.toString());
        } catch(Exception e) {
            logger.error("parse groovy script found exception: {}", e.getMessage(), e);
        }

        return input;
    }; 

    return processor;
}
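Since the cost here is the HTTP call rather than reading, another option is to keep the single-threaded reader and make only the processor asynchronous. The `spring-batch-integration` module provides `AsyncItemProcessor` (runs a delegate processor on a `TaskExecutor` and returns a `Future`) and `AsyncItemWriter` (unwraps the futures and passes the results to a delegate writer). The sketch below only imitates that pattern with a plain `ExecutorService`; `AsyncProcessSketch`, `processAsync`, `writeChunk` and `callRemote` are hypothetical, and the remote call is simulated with `Thread.sleep`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class AsyncProcessSketch {

    // Hypothetical stand-in for httpClientService.process(...): sleeps to mimic network latency.
    static String callRemote(String line) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + line;
    }

    // "Async processor": hand the item to the pool and return a Future immediately.
    static <I, O> Future<O> processAsync(ExecutorService pool, Function<I, O> delegate, I item) {
        Callable<O> task = () -> delegate.apply(item);
        return pool.submit(task);
    }

    // "Async writer": wait for each Future in input order and return the unwrapped results
    // (a real writer would write them to the file here).
    static <O> List<O> writeChunk(List<Future<O>> futures) {
        List<O> written = new ArrayList<>();
        for (Future<O> f : futures) {
            try {
                written.add(f.get()); // blocks until this item's call has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for an item", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("processing failed", e.getCause());
            }
        }
        return written;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<Future<String>> futures = new ArrayList<>();
        long start = System.nanoTime();
        for (String line : List.of("a", "b", "c", "d", "e")) {
            futures.add(processAsync(pool, AsyncProcessSketch::callRemote, line));
        }
        List<String> results = writeChunk(futures);
        long millis = (System.nanoTime() - start) / 1_000_000;
        pool.shutdown();
        // The five 100 ms calls overlap, so this usually takes roughly 100 ms instead of 500 ms.
        System.out.println(results + " in " + millis + " ms");
    }
}
```

Because the futures are kept in input order, the results come out in input order even though the calls may finish in any order. If this were applied per chunk, the chunk size and the pool size together would bound how many calls run concurrently.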

The write method is omitted here.

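Can anyone help me implement the process method with multiple threads? My guess is that Spring Batch reads one line of data and then processes that line (the ItemProcessor calls the remote service directly). As we know, reading a line is far faster than making one HTTP call, so I want to read all of the data (or part of it) into memory (a List) with a single thread, and then make the remote calls with multiple threads in step 2.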

(This is very easy with a plain Java thread pool, but I don't know how to do it with Spring Batch.)
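The plan described in the question (read everything into a `List` on one thread, then make the remote calls on a thread pool) might look like this in plain Java, without Spring Batch. It is a minimal sketch assuming the whole file fits in memory; `ReadThenProcess`, `readAll`, `processAll` and the `remoteCall` function are hypothetical, and a `StringReader` stands in for the input file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ReadThenProcess {

    // Step 1: read every line into memory on the calling thread.
    static List<String> readAll(Reader source) {
        try (BufferedReader in = new BufferedReader(source)) {
            return in.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 2: run remoteCall for every line on a fixed-size pool; results come back in input order.
    static List<String> processAll(List<String> lines, Function<String, String> remoteCall, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String line : lines) {
                tasks.add(() -> remoteCall.apply(line));
            }
            List<String> results = new ArrayList<>();
            // invokeAll waits for every task and returns the futures in the same order as the tasks
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> lines = readAll(new StringReader("1\n2\n3\n4"));
        // Hypothetical stand-in for the real HTTP call
        Function<String, String> remoteCall = line -> "resp:" + line;
        List<String> results = processAll(lines, remoteCall, 4);
        System.out.println(results); // [resp:1, resp:2, resp:3, resp:4]
        // Step 3 (not shown): write the results to the output file in this order.
    }
}
```

The trade-off is memory: the entire input is held at once, whereas a chunk-oriented step typically keeps only the current chunk in memory.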

Please give me some code. Thanks a lot!