我使用redis作为队列(使用spring queue-in / outbound-channel-adapter)来分配任务(消息进入队列等)
由于吞吐量非常高,我们观察到,尽管消息被发送到redis队列,但很多消息都丢失了,并且在入站(头路由器)之后没有消息到达组件
通道配置附在下面;关键是我们虽然问题是在入站addapter之后的这个头路由器中,但是无法管理从队列读取的消息的速率,所以它们丢失了 .
我们在入站适配器和此组件(即标头路由器)之间使用了一个中间元素,并添加了一个队列来解决此问题 .
这很好,但实际上我们并不完全理解解决方案,如果这是正确的解决方案 .
关于这种配置的专家观点和意见将是好的!
谢谢
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="in" queue="${name}"
receive-timeout="1000" recovery-interval="3000" expect-message="true"
auto-startup="true"/>
<!-- a queue to avoid lost messages before the header router -->
<int:channel id="in">
<int:queue capacity="1000"/>
</int:channel>
<!-- a bridge to connect channels and have a poller -->
<int:bridge input-channel="in" output-channel="out">
<int:poller fixed-delay="500" />
</int:bridge>
<int:header-value-router id="router" timeout="15000"
input-channel="out" header-name="decision"
resolution-required="false" default-output-channel="defaultChannel" />
---于2002年2月添加
要将消息插入redis,我们有一个Web服务,但实际上就像你说的那样,只需将消息写入redis(
for... channel.send(msg)
而已
关于你的回答我现在正在考虑删除in通道及其队列,并直接使用header-value-router;但我还有更多问题:
-
我认为正确的解决方案是header-value-router中超时的低值,所以如果我们没有可用的消费者,我会更快地获得错误通知 . 如果我不使用值作为超时,它将无限期地阻止,这是一个坏主意,不是吗?
-
我不知道如何管理MesssageDeliveryException,因为路由器没有错误通道配置,???
-
我认为如果我可以管理此错误并收到消息,我可以再次将其重新发送给redis . 还有其他服务器从redis获取消息,幸运的是他们可以参加它 .
我添加了我提出的解决方案,但是不完整,我们不确定错误管理,正如我上面解释的那样
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="in" queue="${name}"
receive-timeout="1000" recovery-interval="3000" expect-message="true"
auto-startup="true"/>
<!-- a header-value-router with quite low timeout -->
<int:header-value-router id="router" timeout="150"
input-channel="in" header-name="decision"
resolution-required="false" default-output-channel="defaultChannel" />
<!-- ¿if MessageDeliveryException???? what to do??? -->
<int:channel id="someConsumerHeaderValue">
<int:dispatcher task-executor="ConsumerExecutor" />
</int:channel>
<!-- If 5 threads are busy we queue messages up to 5; if queue is full we can increase to 5 more working threads; if no more threads we have a... ¿¿MessageDeliveryException?? -->
<task:executor id="ConsumerExecutor" pool-size="5-5"
queue-capacity="5" />
1 回答
嗯,很高兴看到这样的观察 . 这可能会以某种方式改进框架 .
所以,我想看看:
从Framework透视图重现的一些测试用例 . 虽然我猜有足够的信息可以向Redis发送大量消息并使用你的配置来消费 . (如果有需要,请纠正我)
<int:header-value-router>
之后的下游流量 . 看,你在那里使用timeout="15000"
这是send-timeout
的同义词:从这里我可以说,如果您的下游消费者在某些情况下足够慢,那么您最终会得到:
注意
return false;
正好表示message lost
.这也知道
back-pressure drop
策略 .如果你有不同的照片,请告诉我 .
您可以考虑删除
timeout="15000"
以满足相同的in
队列通道行为 .UPDATE
好吧,错误处理工作方式有点不同 . "guilty"组件只抛出异常,就像使用原始Java一样,并且可以确定此组件不负责异常捕获,这取决于调用者 . 在我们的例子中,调用者是一个上游组件 -
<redis:queue-inbound-channel-adapter>
.任何入站通道适配器都有
error-channel
选项 . 通过<poller>
如果它是MessageSource
或直接它是MessageProducer
.我相信你能够处理:
在
error-channel
子流程中实现您的恢复要求 .