首页 文章

尽管producerFlowControl为false,但activemq缓慢消费者阻止 生产环境 者

提问于
浏览
4

我在我的系统中使用activemq,我看到的是以下消息:TopicSubscription:consumer = ...:Pending message cursor [org.apache.activemq.broker.region.cursors.VMPendingMessageCursor@1684f89c]已满,临时使用(达到0%)或内存使用率(100%)限制,阻止消息add()等待资源释放 .

这是因为如果我理解正确,我的消费者很慢,而我的制作人很快 . 结果是最终我的 生产环境 者被阻止,直到消费者读取消息并释放一些内存 . 我想知道的是我的制作人没有被封锁,而且当内存已满时,旧的消息也会被发现 .

鉴于我对我所读到的内容的理解,下面的配置应该做的伎俩(messageEvictionStrategy,pendingMessageLimitStrategy),但它不适合我,我无法弄清楚为什么 .

我已经指定了低内存限制低(35Mb)以使问题更快出于测试原因,但情况是我最终需要它,当问题出现activemq只是丢弃旧消息 .

我找到了一个非常令人满意的解决方案,即在ActiveMQConnectionFactory中设置useAsyncSend = true并指定sendTimeout . 这使得 生产环境 者没有被阻止,但是这样最新的消息被丢弃而不是olderst消息 .

最后,我正在谈论非持久性主题 .

任何帮助的人都会完美 . 下面我有activemq配置

<destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" memoryLimit="35 Mb">
               <pendingSubscriberPolicy>
                   <vmCursor />
                </pendingSubscriberPolicy>
                    <messageEvictionStrategy>
                        <oldestMessageEvictionStrategy/>
                    </messageEvictionStrategy>
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="10"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>

    <systemUsage>
        <systemUsage sendFailIfNoSpace="true">
            <memoryUsage>
                <memoryUsage limit="35 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="1 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="5000 mb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

activemq版本5.7.0

我使用spring模板发送消息:

<bean class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/>
    <property name="timeToLive" value="100"/>
</bean>

我传输的javax.jms.ObjectMessage相对较小 .

我在客户前提中发现了问题我在我的应用程序中有很多toppics,但设法重现它从1个线程,不间断的消息始终发送到同一个主题 . 消息发送只是一个小字符串 .

我只有一个 生产环境 者,当我有一个(或更多)慢的消费者时似乎出现问题 - 但是一个慢的消费者就足够了 . 如果不存在缓慢的消费者,则不会出现问题 .

我不认为它有任何区别,但我使用

<transportConnectors>
       <transportConnector name="openwire" uri="nio://0.0.0.0:33029?wireFormat.maxInactivityDuration=60000&amp;wireFormat.maxInactivityDurationInitalDelay=60000"/>
    </transportConnectors>

2 回答

  • 2

    我该如何重新创建呢?这个主题有多少 生产环境 者/消费者?这只是一个话题吗?

    您的设置看起来没问题,但您不需要在策略上设置memoryLimit = 35mb . 将它设置为与整个系统使用相同是没有意义的 . 这个想法是所有主题的内存限制组合将等于系统内存限制 . 例如,如果您有两个主题,每个主题将使用35MB(2 * 35 == 70MB),这将超过整个系统内存设置 . 我不认为这是你所看到的具体原因,但要记住一些事情 .

    这个版本的ActiveMQ是什么?如果您已经编写了一些可以产生此功能的测试,请告诉我们 .

  • 1

    事实证明,当使用JmsTemplate以便发送异步然后发送无法传递的消息时,我们需要启用explicitQosEnabled并设置deliveryMode = 1(非持久性) . 同样在客户端,消费者需要具有较小的预取限制

    服务器

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory"/>
        <property name="explicitQosEnabled" value="true"/>
        <property name="deliveryMode" value="1"/>
    </bean>
    

    客户

    <jms:listener-container .. prefetch="1000">
    ...
    </jms:listener-container>
    

    不要问我为什么......但这似乎解决了我的问题 . 基本上非100%需要,但如果有人可以向我解释这将是完美的

相关问题