首页 文章

消费者没有从ActiveMQ接收消息

提问于
浏览
7

我们面临着ActiveMQ及其消费者的随机问题 . 我们观察到,即使它们连接到ActiveMQ队列,也很少有消费者没有收到消息 . 但在消费者重新启动后它可以正常工作 .

我们在ActiveMQ端有一个名为testQueue的队列 . 使用者正在尝试从此队列中取消队列消息 . 我们正在使用Spring的DefaultMessageListenerContainer来实现此目的 . 消息正从ActiveMQ Broker传递到使用者节点 . 从tcpdump来看,很明显,消息正在到达消费者节点,但实际的消费者代码无法看到消息 . 换句话说,消息似乎停留在ActiveMQ使用者代码或Spring的DefaultMessageListenerContainer中 .

请参阅下图 . 为了更清楚地解决这个问题 . 消息正在到达消费者节点,但它没有到达“实际消费者类”,这意味着消息卡在AMQ消费者代码或Spring DMLC中 .

enter image description here

以下是从ActiveMQ管理员处捕获的详细信息 .

队列名称/待定消息计数/消费者计数/消息排队/消息排队testQueue / 9/1/9/0

以下是更多细节 .

Connection-ID / SessionId / Selector / Enqueues / Dequeues / Dispatched / Dispatched-Queue / Prefetch ID:bearsvir52-45176-1375519181268-3:5/1 / / 9/0/9/9/250

从第二个表中可以明显看出,消息正在传递给消费者,但消费者并未确认消息 . 因此,消息被卡在代理端的Dispatched-Queue中 .

您的通知几点:

1)b / w Broker节点和消费者节点没有时间差 .

2)在消费者端观察到tcpdump . 我们可以看到MessageDispatch(Openwire)数据包被转移到消费者节点,但是找不到MessageAck(Openwire) .

3)有时它在节点上工作,有时它在同一节点上创建问题 .

2 回答

  • 1

    找出解决方案花了很多时间 . 在AMQ故障转移的情况下, org.apache.activemq.ActiveMQConnection.java 类似乎存在一些问题 . 在这种情况下,连接对象不会在消费者端启动 .

    以下是我在ActiveMQConnection.java文件中添加的修复程序,并编译了源代码以创建 activemq-core-x.x.x.jar

    private final Object startMutex = new Object();
    

    在createSession方法中添加了一个检查

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        synchronized (startMutex) {
            if(!isStarted()) {
                start();
            }
        }
    
  • 2

    造成这种情况的一个原因可能是错误地使用 CachingConnectionFactory (带缓存的消费者)和一个动态调整消费者(最大消费者>消费者)的监听器容器 . 您最终可能只是坐在池中而没有被主动使用的缓存消费者 . 您永远不需要使用侦听器容器来缓存使用者 .

    对于这样的问题,我通常建议使用TRACE日志记录运行,您可以看到所有的消费者活动 .

相关问题