首页 文章

Spring Integration - Task Executor卡住了

提问于
浏览
1

我正在研究一个现有的Spring Integration代码 .

下面的代码被击中了 . 从sql查询中提取大约20,000条记录并发送到拆分器 .

码:

<int-jdbc:outbound-gateway query="..." />

<int:splitter input-channel="..." output-channel="queueChannel"/>

<int:channel id="queueChannel">
    <int:queue capacity="25" />
</int:channel>

<int:service-activator ref="..."
    input-channel="queueChannel" output-channel="..." method="xxx">
    <int:poller max-messages-per-poll="25" fixed-delay="100"
    receive-timeout="30000"  task-executor="reqExecutor"/>
</int:service-activator>

<task:executor id="reqExecutor" pool-size="25" queue-capacity="5" rejection-policy="CALLER_RUNS" />

在互联网上进行一些搜索之后,这是我对代码的理解 . 请纠正我,如果我错了:

拆分器输出通道是容量为25的队列通道,这意味着它将从查询中获取25批记录 .

现在,服务激活器中编写的代码将每100毫秒轮询一次,并从队列通道中获取25条消息 . Service Activator在具有任务执行程序的多线程环境中运行 .

任务执行程序的池大小为25.这是一次可以运行的最大线程数 . 并且队列容量为5.如果所有线程都忙,则Executor会将它们放入队列中 . 如果队列容量达到5,则执行程序将拒绝该任务 .

拒绝政策是CALLER_RUNS . 执行人将在拒绝时使用主要流程 .

应用程序性能可能会受到CALLER_RUNS拒绝策略的影响 . 但其他政策放弃或中止该线程 . 因此,改变拒绝政策不是解决方案 .

我应该更改任务执行程序的池大小或队列容量来解决问题 . 有没有偏好的 Value . 它的影响是什么?

或者,我应该改变轮询器的固定延迟?

编辑1:

在日志中,下面的行重复出现并且过程没有完成:

Received no Message during the poll, returning 'false'

编辑2:

我的问题是否与以下链接中提到的问题有关:

http://docs.spring.io/autorepo/docs/spring-integration/3.0.x/reference/html/messaging-endpoints-chapter.html#async-polling

另外,我不清楚poller的receive-timeout属性 .

2 回答

  • 1

    receive-timeout 是轮询器( reqExecutor )线程在队列通道中等待消息到达的时间 . 如果在没有消息到达后它到期,则该线程将返回到池中 .

    如果消息到达,则在线程上处理它,并且线程返回池中 .

    如果你不能从线程转储中解决它,把它贴在某个地方(不是这里 - 可能太大) - pastebin或github gist .

  • 2

    在进行一些代码优化后解决了这个问题 . 代码中存在两个主要问题:

    • 哈希 Map 的密钥的哈希码和等于方法未被覆盖 . hashmap用于缓存查询,但在覆盖的方法中,它无法正常工作 .

    • Spring Integration代码中有一个插入查询,它大约插入20,000次记录 . 既然,AFAIK我们不能在Spring Integration中进行批量更新 . 因此,我将插入查询提取到java类中并进行批量更新 .

    但是,我想知道,为什么线程转储和内存转储没有给出任何暗示呢?

    谢谢Geek和Gary帮助我 .

相关问题