也许这是一个反复出现的问题,但我需要根据我的上下文进行一些自定义 .
我正在使用Spring Batch 3.0.1.RELEASE
我有一些简单的工作,有一些步骤 . 一步是像这样的块:
<tasklet transaction-manager="myTransactionManager">
<batch:chunk reader="myReader" processor="myProcessor" writer="myWriter" commit-interval="${commit.interval}">
</batch:chunk>
<bean id="myProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<bean class="...MyFirstProcessor">
</bean>
<bean class="...MySecondProcessor">
</bean>
</list>
</property>
-
Reader :JdbcCursorItemReader
-
Processor :CompositeProcessor与我的代表
-
Writer :CompositeWriter与我的代表
通过这种配置,我的工作完美无缺 .
现在,我想将其转换为多线程作业 . 在documentation to basic multi-thread jobs之后,我在tasklet中包含了一个SympleAsyncTaskExecutor,但它失败了 .
我已经readed JdbcCursorItemReader与多线程执行无法正常工作(是不是?) . 我已经将读者更改为JdbcPagingItemReader,这已成为一场噩梦:作业不会失败,编写过程没问题,但数据在线程之间混合,客户数据不正确且连贯(客户有服务,添加,来自其他人) .
那么,为什么会这样呢?我怎样才能更改为多线程作业?
-
复合处理器和编写器是否适用于多线程?
-
如何制作自定义线程安全的复合处理器?
-
也许它可能是JDBC读者:对于多线程,是否有任何线程安全的JDBC读取器?
我非常关心和困惑,所以任何帮助都会非常感激 . 非常感谢 .
[EDIT - SOLVED]
好吧,我的问题的正确和合适的解决方案是从开始设计多线程和线程安全执行的作业 . 习惯性地首先使用单线程步执行来理解和了解Spring Batch概念;但如果你认为你要离开这个阶段,那么必须提出诸如不可变对象,线程安全列表, Map 等的考虑因素 .
目前我的问题的当前状态是我后面描述的下一个 . 在测试了马丁的建议并考虑到迈克尔的指导方针之后,我终于解决了我的问题 . 接下来的步骤不是很好的做法,但我无法从开始重建我的工作:
-
将itemState更改为JdbcPagingItemReader,并将setState更改为false .
-
通过CopyOnWriteArrayList更改列表 .
-
通过ConcurrentHashMap更改HashMap .
-
在每个委托处理器中,通过传递上下文(实现ApplicationContextAware)并获取bean的唯一实例(将每个注入的bean配置为scope = "prototype"),获取每个bean属性的新实例(幸运的是,只有一个注入的bean) .
所以,如果委托的bean是:
<bean class="...MyProcessor">
<property name="otherBean" ref="otherBeanID" />
改成:
<bean class="...MyProcessor">
<property name="otherBean" value="otherBeanID" />
并且,在MyProcessor中,从上下文获取otherBeanID的单个实例;必须使用scope = "protoype"配置otherBeanID .
正如我之前所说,它们并不是好的风格,但它是我最好的选择,我可以断言每个线程都有自己的不同的项目实例和其他bean实例 .
它证明某些类没有很好地设计用于正确的多线程执行 .
马丁,迈克尔,感谢您的支持 .
我希望它对任何人都有帮助 .
1 回答
您在问题中提出了很多问题(将来,请将此类问题分解为多个更具体的问题) . 但是,逐项:
Is JdbcCursorItemReader thread-safe?
作为documentation states,它不是 . 原因是
JdbcCursorItemReader
包装了一个非线程安全的ResultSet
.Are the composite processor and writer right for multithread?
Spring Batch提供的
CompositeItemProcessor
被认为是线程安全的,只要委托ItemProcessor
实现也是线程安全的 . 您不提供与您的实现或其配置相关的代码,因此我无法验证其线程安全性 . 但是,鉴于您所描述的症状,我的预感是您的代码中存在某种形式的线程安全问题 .您也没有确定正在使用的
ItemWriter
实现或其配置,因此也可能存在与线程相关的问题 .如果您使用有关实施和配置的更多信息更新问题,我们可以提供更多信息 .
How could I make a custom thread-safe composite processor?
实施任何_1392948时,有两件事需要考虑:
Make it thread safe: 遵循基本的线程安全规则(阅读关于该主题的圣经的书籍Java Concurrency In Practice)将允许您通过添加任务 Actuator 来扩展组件 .
Make it idempotent: 在跳过/重试处理期间,可能会重新处理项目 . 通过使
ItemProcessor
实现具有幂等性,这将防止副作用这通过处理器多次跳闸 .Maybe could it be the JDBC reader: Is there any thread-safe JDBC reader for multi-thread?
如您所知,
JdbcPaginingItemReader
是线程安全的,并在documentation中注明 . 当使用多个线程时,每个块都在其中执行's own thread. If you' ve配置页面大小以匹配commit-interval,这意味着每个页面在同一个线程中处理 .Other options for scaling a single step
当您沿着实施单个多线程步骤的道路前进时,可能会有更好的选择 . Spring Batch提供5种核心扩展选项:
多线程步骤 - 正如您现在正在尝试的那样 .
并行步骤 - 使用Spring批处理's split functionality you can execute multiple steps in parallel. Given that you'在同一步骤中使用复合
ItemProcessor
和复合ItemWriter
s,这可能需要探索(将当前复合方案分解为多个并行步骤) .Async
ItemProcessor
/ItemWriters
- 此选项允许您在不同的线程中执行处理器逻辑 . 处理器将线程关闭并返回Future
到AsyncItemWriter
,它将阻塞,直到Future
返回写入 .分区 - 这是将数据划分为称为分区的块,这些块由子步骤并行处理 . 每个分区都由一个实际的独立步骤处理,因此使用步骤范围的组件可以防止线程安全问题(每个步骤都获得它自己的实例) . 分区处理可以通过线程本地执行,也可以跨多个JVM远程执行 .
Remote Chunking - 此选项将处理器逻辑存储到其他JVM进程 . 它真的应该只用于
ItemProcessor
逻辑是流程中的瓶颈 .您可以在这里阅读Spring Batch文档中的所有这些选项:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html
线程安全是一个复杂的问题 . 只需将多个线程添加到曾经在单线程环境中工作的代码,通常就会发现代码中的问题 .