首页 文章

Spring Batch和JobInstanceAlreadyCompleteException:

提问于
浏览
1

我有一个Spring Batch作业,它通过SFTP从远程Linux服务器检索文件 . 远程服务器上的目录是一个包含七天文件(~400个文件)的存档 . 文件的大小相对较小 .

Spring Batch知道哪些文件已被处理 .

当我启动应用程序时 . 第一次,Spring Batch tasklet检索文件,Spring Batch为它已经处理的每个文件生成一个例外:

例如 .

由以下原因引起:org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:作业实例已经存在且对parameters = 完成 .

这导致处理的巨大延迟 .

在第一次之后,在随后的sftp检索文件时,没有异常,因此没有延迟(也许Spring Batch已生成已经处理的文件的内部哈希列表) .

  • 在Transformer类中,我应该检查文件是否存在于本地,并且仅对尚未处理的新文件调用JobLaunchRequest()?

/ **

  • 将BAI文件转换为Spring Batch作业启动请求* / public class FileMessageToJobRequestTransformer {public static final Logger LOGGER = LoggerFactory.getLogger(FileMessageToJobRequestTransformer.class);私人工作;

private String fileParameterName;

public void setJob(Job job)

public void setFileParameterName(String fileParameterName){LOGGER.debug(“file parameter name:{}”,fileParameterName); this.fileParameterName = fileParameterName; }

@Transformer public JobLaunchRequest transform(Message message){LOGGER.debug(“File message:{}”,message); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

jobParametersBuilder.addString(fileParameterName,
        "file://" + message.getPayload().getAbsolutePath());

LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters());

return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());

}}

  • 有没有办法可以捕获异常?

JobInstanceAlreadyCompleteException

  • 我应该在SftpBaiParserJobBridge-context.hml中设置retry-limit = "1"和skip-limit = "1"吗?

<!-- When getting a new BAI file, transform into spring batch job request --> <int:transformer id="fileMessageToJobRequestTransformer" input-channel="inboundFileChannel" output-channel="outboundJobRequestChannel" method="transform"> <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer"> <property name="job" ref="baiParseJob"/> <property name="fileParameterName" value="input.file.url"/> </bean> <int:poller fixed-rate="10000"/> </int:transformer>

附加addDate()允许多次处理文件 . 现在数据库中有重复数据 .

jobParametersBuilder.addString(fileParameterName,
                "file://" + message.getPayload().getAbsolutePath())).addDate("rundate", new Date()).toJobParameters();

enter image description here
谢谢!

  • 我是否可以通过JobExplorer界面从'@Transformer public JobLaunchRequest transform(Message message)'查询存储库来确定文件是否已经处理,如果文件已被处理,则返回null?

例如 .

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}
  • 如何捕获异常?

由以下原因引起:org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:作业实例已存在并且已完成参数= {input.file.url = file:/// home / dlaxer / dfc-bank-integration / mbi-应用程序/白/下载/ BAI_Intraday160302070054471.txt} . 如果要再次运行此作业,请更改参数 . at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)在org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:30)的java.lang.reflect.Method.invoke(Method.java:498)的sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) )在org.springframework.transaction.TraringactionTart的工作室org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) org.springfra上的org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)中的.proceedWithInvocation(TransactionInterceptor.java:99) org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)中的mework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean $ 1 .invoke(AbstractJobRepositoryFactoryBean.java:172)位于org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)的org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)at com . sun.proxy . $ Proxy113.createJobExecution(未知来源)org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun . 在org.spring的java.lang.reflect.Method.invoke(Method.java:498)的sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)中的reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) framework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)atOrg.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)位于org.springframework.aop的org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration $ PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) . 位于org.springframework的com.sun.proxy . $ Proxy114.run(未知来源)的org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)上的framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) .batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50)org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76)... 35更多

2 回答

  • 1

    我看到你在工作启动器中只传递一个参数作为“文件名” . 一旦启动了作业启动器,它将从存储库BATCH_JOB_EXECUTION表中查询并检查上次处理的作业的状态 . 在当前作业执行输入参数与先前执行的作业相同,批处理状态和退出代码=已完成,然后您将获得JobInstanceAlreadyCompleteException . 您应该尝试在每次执行时始终传递唯一的Parmaeters . 只需将当前时间作为参数传递并尝试

    JobParameters jobparam =   new JobParametersBuilder().addString(fileParameterName, "file://" + message.getPayload().getAbsolutePath())
                            .addDate("rundate", new Date()).toJobParameters();
    
    JobExecution execution = jobLauncher.run(job, jobparam);
             catch (Exception e) {
                 if ( e instanceof JobInstanceAlreadyCompleteException){
                     System.out.println("Raj*************");
                 }
    
    • 2是的,你可以像这样处理

    catch(例外e){if(e instanceof JobInstanceAlreadyCompleteException){System.out.println(“需要处理*************”); }

  • 1

    3 - 我应该设置retry-limit = "1"和skip-limit = "1" .
    回答它取决于您的要求 . 你要忽略它的异常和次数意味着=跳过限制,并且在吸盘过程中想要重复同一步骤,特殊异常意味着重试 . 我们可以用例子更好地理解 . 如果我想读取一些平面文件,并且文件可能有错误的输入记录,这可能导致flatFileException我想忽略它并顺利处理我的文件然后我的配置看起来像这样 . 我可以在作业执行期间跳过最多20条记录,如果批次收到异常,则可以跳过任何记录,然后它应该尝试至少2次 .

    <chunk reader="flatFileItemReader" writer="itemWriter"
                 commit-interval="1" skip-limit="20" retry-limit="2">
             <skippable-exception-classes>
                <include class="org.springframework.batch.item.file.FlatFileParseException"/>
             </skippable-exception-classes>
    

相关问题