Spring批处理使用或如何在作业中启动作业

TL;DR :如何使用Spring Batch Job创建Spring Batch Jobs?交易边界似乎是问题所在 . 这似乎是一个经典的问题,但在这里它再次出现:

我有以下用例:我需要轮询一个FTP服务器并将找到的XML文件存储为数据库中的blob . XML有0 ... N个感兴趣的条目我需要发送到外部Web服务并存储响应 . 响应可以是不可重试的或可重试的,我需要存储每个请求及其响应以用于审计目的 .

域/ JPA模型如下:批处理(包含XML blob)包含0-N BatchRow对象 . BatchRow包含要发送到Web服务的数据,它还包含1 ... N个BatchRowHistory对象,其中包含有关Web服务调用的状态信息 .

我被要求使用Spring Batch实现这一点(从这种集成的情况来看,Spring Integration可能是其他可能性) . 现在我一直在努力采用不同的方法,我发现这个任务要复杂得多,因此恕我直言 .

我已将任务拆分为以下工作:

Job1

  • 步骤11:获取文件并以blob形式存储到数据库 .

  • 步骤12:将XML拆分为条目并将这些条目存储到db .

  • Step13 :创建Job2并为Step12中存储的每个条目启动它 . Mark Job2在域模型数据库中为条目创建了标记 .

Job2

  • 步骤21:为每个条目调用Web服务并将结果存储到db . 重试并跳过逻辑停留在这里 . Job2类型可能需要手动重启等 .

这种结构背后的逻辑是Job1定期运行(每分钟一次) . 只要存在这些作业,Job2就会运行,并且它们已经成功或者它们的重试限制已经达到并且它们已经失败 . 域模型基本上只存储结果,Spring Batch负责运行show . 手动重新启动等可以通过Spring Batch Admin处理(至少我希望如此) . 此外,Job2在JobParameters映射中具有BatchRow的id,因此可以在Spring Batch Admin中查看它 .

Question 1 :这种工作结构有意义吗?即为db中的每一行创建新的Spring Batch Jobs,它似乎打败了目的并在某种程度上重新发明轮子?

Question 2 :如何在Step13中创建这些Job2条目?

我遇到了事务和JobRepository的第一个问题,但是成功启动了一些具有以下设置的作业:

<batch:step id="Step13" parent="stepParent">
 <batch:tasklet>
   <batch:transaction-attributes propagation="NEVER"/>
   <batch:chunk reader="rowsWithoutJobReader" processor="batchJobCreator" writer="itemWriter"
                commit-interval="10" />
 </batch:tasklet>
</batch:step>

<bean id="stepParent" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean" abstract="true"/>

请注意,commit-interval =“10”表示当前最多可以创建10个作业,这就是它...因为batchJobCreator调用了JobLauncher.run方法并且它可以游动但是itemWriter无法将BatchRows写回数据库并提供更新的信息( boolean jobCreated标志切换为) . 明显的原因是在transaction-attributes中传播.NEVER,但没有它我无法用jobLauncher创建作业 .

由于更新未传递到数据库,因此我再次获得相同的BatchRows,它们使日志混乱:

org.springframework.batch.retry.RetryException: Non-skippable exception in recoverer while processing; nested exception is org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobInstance: id=1, version=0, JobParameters=[{batchRowId=71}], Job=[foo.bar]
        at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$2.recover(FaultTolerantChunkProcessor.java:278)
        at org.springframework.batch.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:420)
        at org.springframework.batch.retry.support.RetryTemplate.doExecute(RetryTemplate.java:289)
        at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:187)
        at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:215)
        at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:287)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:190)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:74)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
        at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
        at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:293)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
        at java.lang.Thread.run(Thread.java:680)

这意味着已经在Spring Batch中创建了作业,并且它尝试在以后执行Step13时再次创建这些文件 . 我可以在Job2 / Step21中规避将jobCreated标志设置为true,但这对我来说感觉很糟糕 .

Question 3 :我有更多的域对象驱动方法;我有Spring Batch Jobs使用相当精细的JPQL查询和JPAItemReaders扫描域表 . 这种方法的问题在于它不使用Spring Batch的更精细的功能 . 历史和重试逻辑是问题所在 . 我需要直接将重试逻辑编码到JPQL查询中(例如,如果BatchRow具有多于3个BatchRowHistory元素,则它已经失败并需要手动重新检查) . 我应该咬紧牙关并继续这种方法而不是尝试为每个Web服务调用创建单独的Spring Batch Job吗?

软件信息,如果需要:Spring Batch 2.1.9,Hibernate 4.1.2,Spring 3.1.2,Java 6 .

提前谢谢你,感谢抱歉,蒂莫

Edit 1: 我认为我需要产生新工作的原因是:

  • 循环读取器返回null或抛出异常

  • 交易开始

  • reader - 处理器 - 整个N行的编写器循环

  • 批次大小为N的交易结束

每个失败的条目都是问题;我想手动重启执行(作业是唯一可以在Spring Batch Admin中重启的,对吗?),对于批处理中的每一行,以便我可以使用Spring Batch Admin查看失败的作业(其作业参数包含来自域db的行ID)并重新启动那些等 . 如何在不生成作业并将历史记录存储到域db的情况下完成此类行为?

回答(1)

2 years ago

好吧,我讨厌回答问题......但我需要知道些什么?

1)如果您的输入文件是XML,为什么不在它们上使用StaxEventItemReader并只是在步骤1中保留您的输入?

2)从一个步骤开始第二个工作!我甚至不知道它是否应该有效......但IMO ......它闻起来;-)

为什么不定义另一个使用JdbcCursorItemReader读取条目并在ItemProcessor中调用Web服务的步骤,然后将结果写入数据库?

也许我不明白你为每次调用Web服务创建不同的工作的要求!

我做了类似于您的用例的事情,并使用此方案完成:

作业1:步骤1:读取xml,处理pojo-> domain obj,在DB中写入域obj

作业2:步骤1:从db读取obj,process =调用WS,在DB中写入响应

这很简单,效果很好(包括可重启和跳过功能)

希望它会有所帮助

问候