现在我以前的使用场景如下:
-
使用FlatFileItemReader逐行读取带有.txt文件的输入流
-
使用ItemProcessor进程每行数据调用http的远程服务
-
使用FlatFileItemWriter将每个请求的结果写入文件
我想使用ItemProcessor in step 2 处理带有 multi thread 的远程调用
主要流程代码如下(带 spring 启动):
//read data
FlatFileItemReader<ItemProcessing> reader = read(batchReqRun);
//process data
ItemProcessor<ItemProcessing, ItemProcessing> processor = process(batchReqDef);
//write data
File localOutputDir = new File(localStoragePath+"/batch-results");
File localOutputFile = new File(localOutputDir, batchReqExec.getDestFile());
FlatFileItemWriter<ItemProcessing> writer = write(localOutputDir,localOutputFile);
StepExecutionListener stepExecListener = new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
logger.info("Job {} step start {}",stepExecution.getJobExecutionId(), stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.info("Job {} step end {}",stepExecution.getJobExecutionId(), stepExecution.getStepName());
//.......ingore some code
return finalStatus;
}
};
Tasklet resultFileTasklet = new BatchFileResultTasklet(localOutputFile, httpClientService);
TaskletStep resultFileStep = stepBuilder.get("result")
.tasklet(resultFileTasklet)
.listener(stepExecListener)
.build();
//create step
Step mainStep = stepBuilder.get("run")
.<ItemProcessing, ItemProcessing>chunk(5)
.faultTolerant()
.skip(IOException.class).skip(SocketTimeoutException.class)//skip IOException here
.skipLimit(2000)
.reader(reader)
.processor(processor)
.writer(writer)
.listener(stepExecListener)
.listener(new ItemProcessorListener()) //add process listener
.listener(skipExceptionListener) //add skip exception listner
.build();
//create job
Job job = jobBuilder.get(batchReqExec.getId())
.start(mainStep)
.next(resultFileStep)
.build();
JobParametersBuilder jobParamBuilder = new JobParametersBuilder();
//run job
JobExecution execution = jobLauncher.run(job, jobParamBuilder.toJobParameters());
读取如下数据:
private FlatFileItemReader<ItemProcessing> read(BatchRequestsRun batchReqRun) throws Exception {
//prepare input file
File localInputDir = new File(localStoragePath+"/batch-requests");
if(!localInputDir.exists() || localInputDir.isFile()) {
localInputDir.mkdir();
}
File localFile = new File(localInputDir, batchReqRun.getFileRef()+"-"+batchReqRun.getFile());
if(!localFile.exists()) {
httpClientService.getFileFromStorage(batchReqRun.getFileRef(), localFile);
}
FlatFileItemReader<ItemProcessing> reader = new FlatFileItemReader<ItemProcessing>();
reader.setResource(new FileSystemResource(localFile));
reader.setLineMapper(new DefaultLineMapper<ItemProcessing>() {
{
setLineTokenizer(new DelimitedLineTokenizer());
setFieldSetMapper(new FieldSetMapper<ItemProcessing>() {
@Override
public ItemProcessing mapFieldSet(FieldSet fieldSet) throws BindException {
ItemProcessing item = new ItemProcessing();
item.setFieldSet(fieldSet);
return item;
}
});
}
});
return reader;
}
过程数据如下:
private ItemProcessor<ItemProcessing, ItemProcessing> process(BatchRequestsDef batchReqDef) {
ItemProcessor<ItemProcessing, ItemProcessing> processor = (input) -> {
VelocityContext context = new VelocityContext();
//.....ingore velocity code
String responseBody = null;
//send http invoking
input.setResponseBody(httpClientService.process(batchReqDef, input));
responseBody = input.getResponseBody();
logger.info(responseBody);
// using Groovy to parse response
Binding binding = new Binding();
try {
binding.setVariable("response", responseBody);
GroovyShell shell = new GroovyShell(binding);
Object result = shell.evaluate(batchReqDef.getConfig().getResponseHandler());
input.setResult(result.toString());
} catch(Exception e) {
logger.error("parse groovy script found exception:{},{}",e.getMessage(),e);
}
return input;
};
return processor;
}
在这里忽略写文件方法 .
谁能帮我用多线程实现流程方法?我猜 Spring 季批量读取一行数据,然后处理一行(执行ItemProcessor直接调用远程服务)正如我们所知,读取一行数据的速度远远超过调用http服务一次 . 所以我想用单线程将所有数据(或某些部分数据)读入内存(List),然后在步骤2中使用多线程调用远程调用 .
(使用java线程池非常容易,但我不知道使用spring批处理实现)
请给我一些代码,非常感谢!