首页 文章

具有高吞吐量的Spring集成的Redis队列是丢失消息

提问于
浏览
2

我使用redis作为队列(使用spring queue-in / outbound-channel-adapter)来分配任务(消息进入队列等)

由于吞吐量非常高,我们观察到,尽管消息被发送到redis队列,但很多消息都丢失了,并且在入站(头路由器)之后没有消息到达组件

通道配置附在下面;关键是我们虽然问题是在入站addapter之后的这个头路由器中,但是无法管理从队列读取的消息的速率,所以它们丢失了 .

我们在入站适配器和此组件(即标头路由器)之间使用了一个中间元素,并添加了一个队列来解决此问题 .

这很好,但实际上我们并不完全理解解决方案,如果这是正确的解决方案 .

关于这种配置的专家观点和意见将是好的!

谢谢


<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
     from a Redis List. -->
    <redis:queue-inbound-channel-adapter
     id="fromRedis" channel="in" queue="${name}"
        receive-timeout="1000" recovery-interval="3000" expect-message="true"
            auto-startup="true"/>

    <!-- a queue to avoid lost messages before the header router -->
    <int:channel id="in">
        <int:queue capacity="1000"/>
    </int:channel>

    <!-- a bridge to connect channels and have a poller -->
    <int:bridge input-channel="in" output-channel="out">
        <int:poller fixed-delay="500" />
    </int:bridge>

    <int:header-value-router id="router" timeout="15000"
        input-channel="out" header-name="decision"
        resolution-required="false" default-output-channel="defaultChannel" />

---于2002年2月添加

要将消息插入redis,我们有一个Web服务,但实际上就像你说的那样,只需将消息写入redis(

for... channel.send(msg)

而已

关于你的回答我现在正在考虑删除in通道及其队列,并直接使用header-value-router;但我还有更多问题:

  • 我认为正确的解决方案是header-value-router中超时的低值,所以如果我们没有可用的消费者,我会更快地获得错误通知 . 如果我不使用值作为超时,它将无限期地阻止,这是一个坏主意,不是吗?

  • 我不知道如何管理MesssageDeliveryException,因为路由器没有错误通道配置,???

  • 我认为如果我可以管理此错误并收到消息,我可以再次将其重新发送给redis . 还有其他服务器从redis获取消息,幸运的是他们可以参加它 .

我添加了我提出的解决方案,但是不完整,我们不确定错误管理,正如我上面解释的那样

<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
 from a Redis List. -->
<redis:queue-inbound-channel-adapter
 id="fromRedis" channel="in" queue="${name}"
    receive-timeout="1000" recovery-interval="3000" expect-message="true"
        auto-startup="true"/>

 <!-- a header-value-router with quite low timeout -->
   <int:header-value-router id="router" timeout="150"
    input-channel="in" header-name="decision"
    resolution-required="false" default-output-channel="defaultChannel" />

 <!-- ¿if MessageDeliveryException???? what to do??? -->

<int:channel id="someConsumerHeaderValue">
    <int:dispatcher task-executor="ConsumerExecutor" />
</int:channel>
<!-- If 5 threads are busy we queue messages up to 5; if queue is full we can increase to 5 more working threads; if no more threads we have a... ¿¿MessageDeliveryException?? -->

<task:executor id="ConsumerExecutor" pool-size="5-5"
               queue-capacity="5" />

1 回答

  • 1

    嗯,很高兴看到这样的观察 . 这可能会以某种方式改进框架 .

    所以,我想看看:

    • 从Framework透视图重现的一些测试用例 . 虽然我猜有足够的信息可以向Redis发送大量消息并使用你的配置来消费 . (如果有需要,请纠正我)

    • <int:header-value-router> 之后的下游流量 . 看,你在那里使用 timeout="15000" 这是 send-timeout 的同义词:

    指定在可能阻塞时将消息发送到目标MessageChannel时等待的最长时间(以毫秒为单位)(例如,当前已满的有界队列通道) . 默认情况下,发送将无限期阻止 . 'timeout'的同义词 - 只能提供一个 .

    从这里我可以说,如果您的下游消费者在某些情况下足够慢,那么您最终会得到:

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
    ....
    while (count.get() == capacity) {
           if (nanos <= 0)
                 return false;
           nanos = notFull.awaitNanos(nanos);
    }
    

    注意 return false; 正好表示 message lost .

    这也知道 back-pressure drop 策略 .

    如果你有不同的照片,请告诉我 .

    您可以考虑删除 timeout="15000" 以满足相同的 in 队列通道行为 .

    UPDATE

    好吧,错误处理工作方式有点不同 . "guilty"组件只抛出异常,就像使用原始Java一样,并且可以确定此组件不负责异常捕获,这取决于调用者 . 在我们的例子中,调用者是一个上游组件 - <redis:queue-inbound-channel-adapter> .

    任何入站通道适配器都有 error-channel 选项 . 通过 <poller> 如果它是 MessageSource 或直接它是 MessageProducer .

    我相信你能够处理:

    if (!sent) {
        throw new MessageDeliveryException(message,
                "failed to send message to channel '" + channel + "' within timeout: " + timeout);
    }
    

    error-channel 子流程中实现您的恢复要求 .

相关问题