首页 文章

并行执行和聚合器锁定

提问于
浏览
0

我使用spring集成来处理文件的一些目录,每个文件都通过“流程” . 我想以这样的方式设置文件的整体处理,即文件轮询器监视目录(或多个)以查找新文件 . 一旦新文件被拾取轮询,它应该被传递到流程中的后续组件,在该流程中,在不保持轮询过程的情况下处理该新文件 . 处理的第二个方面是所有新文件在聚合器基于例如聚合器聚合之前经过几个步骤 . 文件数(标准在目录中更改) . 一旦累积了足够的文件,它们就会从聚合中释放出来,然后在聚合器之后的一些耗时步骤中进行处理 . 所以整个过程看起来像这样

文件-A拿起来了
file-A从poller传递到step1
file-A从step1传递到聚合器
文件-B拿起来了
file-B从poller传递到step1
file-B从step1传递给聚合器
file-C拾起了
file-C从poller传递给step1
file-C从step1传递给聚合器

文件A,B和C从聚合器中释放
文件A,B和C由最后一步处理

总的来说有两个要求

  • 在单独的线程中处理每个文件,以最大化当前正在处理的文件的数量
    从聚合器发布的
  • 文件属于相关ID,我们只希望通过最后一步处理使用相同相关ID的一组消息

我如何尝试满足这两个要求是为了#1我只是在文件轮询器之后使用一个队列,其中新文件被删除而步骤a从队列中拾取文件 . 这将分离轮询过程,并且想法是在步骤中使用线程 Actuator - 服务激活器来处理单个线程中的每个文件

通过在与聚合器相同的线程中的聚合器之后简单地执行最后步骤来自动处理第二个要求 . 由于聚合器根据相关id放置一个锁,如果另一个组被释放用于相同的相关ID,它只是在处理相同组的前一个实例之前等待 .

我遇到的问题是#1没有实现,因为服务激活器在尝试为第二个文件创建另一个线程之前一直等到线程完成结束 . 这有点没用,因为这种方式在服务激活器上有一个线程 Actuator 是没用的 . 它似乎只在完成第一个线程后创建第二个therad . 因此,为了解决这个问题,我用调度程序通道替换了排队通道,并在调度程序通道上找到了执行程序 . 现在,每个文件都在一个单独的线程中处理,同时正在处理多个文件 .
现在第二部分,由于聚合器之后的组件是耗时的,我想从第一部分断开该进程,所以我在聚合器之后放置了一个排队的通道,但是现在使用这种方法,我以前用聚合器获得的锁定行为已经消失,因为线程在最终耗时步骤之前,已从排队通道中释放来自聚合器的消息/完成 .

对整个过程有任何想法 . 如何在并行运行时完成我的两个要求 .

谢谢

1 回答

  • 0

    你的问题根本不清楚 . 是的,对于在锁定下运行的下游流,它必须在最终的“释放”线程上运行(线程处理完成该组的最后一个入站消息),您不能使用聚合器下游的队列或 Actuator 通道 .

    但是,这对执行程序通道中的线程没有影响;其他组(具有不同的相关性)将处理 . 但是,如果您对“下一个”组使用相同的相关ID,则其线程将被阻止 .

    如果您说要组装下一个组(具有相同的correlationid)而第一个组正在处理下游,则必须使用其他一些机制来强制执行下游单线程 - 例如具有单线程的执行程序通道执行者,或使用另一个锁注册表 .

相关问题