我使用Spring Batch的处理使用MultiResourcePartitioner和所有的itemreader和作家多个文件都在一步一步scope.Each运行单个文件,并在1000区间提交给数据库时,有电流处理过程中的任何错误,以前所有的需求提交要滚动支持,步骤将失败 . 因此,文件内容不会添加到数据库中 .
这是其中最好的方法:
-
将交易传播用作NESTED .
-
使用Integer.MAXVALUE在块中设置提交间隔,这不起作用,因为文件具有大项并且堆空间失败 .
-
在步骤级别进行交易的任何其他方式 .
我有如下所示的示例xml文件:
<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="classpath:${filepath}" />
</bean>
<bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
<property name="resource" value="#{stepExecutionContext[fileName]}" />
</bean>
<step id="step1" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="ratransactionManager" >
<chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="800" retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
<listeners>
<listener ref="customStepExecutionListener">
</listener>
</listeners>
</tasklet>
<fail on="FAILED"/>
</step>
更新:
似乎主表(直接插入发生的地方)由其他表和物化视图引用 . 如果我删除此表中的数据以使用已处理的列指示符删除过时记录,则使用MV假脱机的数据将显示旧数据 . 我认为我的要求需要临时表 .
为此要求实现登台数据表
-
创建另一个并行步骤以轮询数据库并写入处理列值为Y的数据 .
-
使用步骤侦听器(afterStep方法)在每次成功完成文件结束时传输数据 .
或任何其他建议 .
3 回答
总的来说,我同意@MichaelLange的方法 . 但也许单独的表太多了......您可以在导入表中添加其他列
completed
,如果设置为"false"则该记录属于正在处理的文件(或处理失败) . 在've processed the file you issue a simple update for this table (should not fail as you don' t对此列有任何约束之后):update import_table set completed = true where file_name = "file001_chunk1.txt"
在处理文件之前,您应该删除“陈旧”记录:
delete from import_table where file_name = "file001_chunk1.txt"
这个解决方案比嵌套事务更快更容易实现 . 也许使用这种方法,您将面临表锁,但通过适当选择隔离级别,可以将其最小化 . (可选)您可能希望在此表上创建一个视图以过滤掉未完成的记录(在
completed
列上启用索引):create view import_view as select a, b, c from import_table where completed = true
一般来说,我认为嵌套事务在这种情况下是不可能的,因为块可以在并行线程中处理,每个线程都拥有它自己的事务上下文 . 即使您以某种方式设法在“顶部”作业线程中创建“主要事务”,事务管理器也无法在新线程中启动嵌套事务 .
另一种方法是继续使用“临时表” . 导入过程应该做的是创建导入表并根据例如命名来命名它们 . 日期:
以及加入所有这些表格的“超级视角”:
导入成功后,应重新创建“超级视图” .
使用这种方法,您将难以使用导入表的外键 .
另一种方法是使用单独的DB进行导入,然后将导入的数据从导入DB提供给main(例如传输二进制数据) .
你不能尝试补偿策略吗?
一些例子
为数据使用临时表或额外表,只有在作业成功时才将数据移动到业务表
使用conditional flow在出现问题时调用"deleteAllWrittenItems"步骤
在oVirt开源项目中,Mike Kolesnik,Eli Mesika和我自己实施了一个完整的补偿机制 .
您可以克隆项目并查看与CompensationContext相关的类 .
我在最后几天尝试了 spring 批次,这是我生命中的第一次,对于批处理操作看起来像是全部相同的CRUD操作类型 - 例如,批量插入,有辅助列可以救命 .
我想要了解的是,我们是否可以以某种方式拦截作业ID,并将其存储在包含插入数据的表中(IE - 通过拥有一列job_id),或者例如存储一对job_id和entity_id单独的表,然后在作业失败的情况下的补偿将擦除每个作业的所有条目 .