我正在研究一个现有的Spring Integration代码 .
下面的代码被击中了 . 从sql查询中提取大约20,000条记录并发送到拆分器 .
码:
<int-jdbc:outbound-gateway query="..." />
<int:splitter input-channel="..." output-channel="queueChannel"/>
<int:channel id="queueChannel">
<int:queue capacity="25" />
</int:channel>
<int:service-activator ref="..."
input-channel="queueChannel" output-channel="..." method="xxx">
<int:poller max-messages-per-poll="25" fixed-delay="100"
receive-timeout="30000" task-executor="reqExecutor"/>
</int:service-activator>
<task:executor id="reqExecutor" pool-size="25" queue-capacity="5" rejection-policy="CALLER_RUNS" />
在互联网上进行一些搜索之后,这是我对代码的理解 . 请纠正我,如果我错了:
拆分器输出通道是容量为25的队列通道,这意味着它将从查询中获取25批记录 .
现在,服务激活器中编写的代码将每100毫秒轮询一次,并从队列通道中获取25条消息 . Service Activator在具有任务执行程序的多线程环境中运行 .
任务执行程序的池大小为25.这是一次可以运行的最大线程数 . 并且队列容量为5.如果所有线程都忙,则Executor会将它们放入队列中 . 如果队列容量达到5,则执行程序将拒绝该任务 .
拒绝政策是CALLER_RUNS . 执行人将在拒绝时使用主要流程 .
应用程序性能可能会受到CALLER_RUNS拒绝策略的影响 . 但其他政策放弃或中止该线程 . 因此,改变拒绝政策不是解决方案 .
我应该更改任务执行程序的池大小或队列容量来解决问题 . 有没有偏好的 Value . 它的影响是什么?
或者,我应该改变轮询器的固定延迟?
编辑1:
在日志中,下面的行重复出现并且过程没有完成:
Received no Message during the poll, returning 'false'
编辑2:
我的问题是否与以下链接中提到的问题有关:
另外,我不清楚poller的receive-timeout属性 .
2 回答
receive-timeout
是轮询器(reqExecutor
)线程在队列通道中等待消息到达的时间 . 如果在没有消息到达后它到期,则该线程将返回到池中 .如果消息到达,则在线程上处理它,并且线程返回池中 .
如果你不能从线程转储中解决它,把它贴在某个地方(不是这里 - 可能太大) - pastebin或github gist .
在进行一些代码优化后解决了这个问题 . 代码中存在两个主要问题:
哈希 Map 的密钥的哈希码和等于方法未被覆盖 . hashmap用于缓存查询,但在覆盖的方法中,它无法正常工作 .
Spring Integration代码中有一个插入查询,它大约插入20,000次记录 . 既然,AFAIK我们不能在Spring Integration中进行批量更新 . 因此,我将插入查询提取到java类中并进行批量更新 .
但是,我想知道,为什么线程转储和内存转储没有给出任何暗示呢?
谢谢Geek和Gary帮助我 .