首页 文章

ActiveMQ消息出列但未消耗

提问于
浏览
1

我有一个JBoss Web应用程序,目前正在使用嵌入式HornetQ for JMS . 我们想切换到ActiveMQ HA集群,但我遇到了一些奇怪的问题 . 我的一个队列(periodicDerivationQueue)与HornetQ的行为不同 . AMQ控制台显示消息已入队并出列,但它们并未将其发送给我的消费者 . 起初,我认为消息由于某种原因而出列到DLQ中,但事实似乎并非如此 . 据我了解,除非有必要,否则AMQ不会创建DLQ . 当我看到经纪人时,没有DLQ . 我怎样才能弄明白我的消息在哪里?

由于反射,我也无法从堆栈的应用程序端进行调试 . 我想在AMQ方面设置一个断点来查看我的消息发生了什么,但我不确定在哪里放它 . 这里有什么想法?

这可能是序列化问题吗?我听说有时JMS代理之间序列化的差异会导致奇怪的行为 .

我真的被困在这里,任何帮助将不胜感激 . 请参阅下面的配置信息

Wildfly 8.2

AMQ 5.13

Consumer (Messages not making it here)

public class PeriodicDerivationExecutionHandlerImpl implements PeriodicDerivationExecutionHandler {

protected DerivationService derivationService;
protected DerivationModelService derivationModelService;

protected Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void executeDerivation(PeriodicDerivation params) throws Exception{

    JbpmHibernateUtil.openSession();
    Derivation derivation = null;
    try{            

        if (params.isGroup()){
            derivation = new GroupDerivation();

            GroupQueryParameters qp = new GroupQueryParameters();
            qp.setGroupName(params.getItemName());
            derivation.setDerivedItem(derivationModelService.getGroup(qp));

        }else{
            derivation = new DeterminantDerivation();

            DeterminantQueryParameters qp = new DeterminantQueryParameters();
            qp.setDeterminantName(params.getItemName());            
            derivation.setDerivedItem(derivationModelService.getDeterminant(qp));

        }

        logger.info("Executing periodic derivation [" + derivation + "]");

        derivation.setModelEffectiveDate(new DateTime());
        derivation.setPeriod(params.getPeriod());
        derivation.getProcessParameters().add(new DerivationProcessParameter(PeriodicDerivation.PERIODIC_PROCESS_VAR, true));
        derivation.setExecutionMode(DerivationExecutionMode.SYNCHRONOUS_LOCAL);
        derivationService.executeDerivation(derivation);

        JbpmHibernateUtil.closeSession(true);
    }catch(Exception e){
        logger.error("Periodic derivation execution failed for [" + derivation + "]",e);
        JbpmHibernateUtil.closeSession(false);
        throw new Exception("Periodic derivation execution failed for [" + derivation + "]",e);
    }
}

public DerivationService getDerivationService() {
    return derivationService;
}

public void setDerivationService(DerivationService derivationService) {
    this.derivationService = derivationService;
}

public DerivationModelService getDerivationModelService() {
    return derivationModelService;
}

public void setDerivationModelService(DerivationModelService derivationModelService) {
    this.derivationModelService = derivationModelService;
}

}

Consumer XML config

<int:gateway id="periodicDerivationExecutionGateway"
        service-interface="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandler">
        <int:method name="executeDerivation" request-channel="periodicDerivationChannel" />
    </int:gateway>

    <bean id="periodicDerivationExecutor"
        class="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandlerImpl">
        <property name="derivationService" ref="derivationService" />
        <property name="derivationModelService" ref="derivationModelService" />
    </bean>

    <int:service-activator input-channel="periodicDerivationChannel"
        ref="periodicDerivationExecutor" method="executeDerivation" />

    <int-jms:channel id="periodicDerivationChannel"
        queue-name="${jms.destination.name.periodicderivation}" concurrency="${integration.listener.threads.maximum}"
        task-executor="periodicDerivationTaskExecutor" />

ActiveMQ Standalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
        <resource-adapters>
            <resource-adapter id="activemq-rar.rar">
                <archive>
                    activemq-rar.rar
                </archive>
                <transaction-support>XATransaction</transaction-support>
                <config-property name="ServerUrl">
                    tcp://127.0.0.1:61616?jms.rmIdFromConnectionId=true
                </config-property>
                <config-property name="UserName">
                    admin
                </config-property>
                <config-property name="Password">
                    admin
                </config-property>
                <connection-definitions>

                    <connection-definition 
                       class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" 
                       jndi-name="java:/ConnectionFactory"  
           enabled="true" 
           pool-name="ConnectionFactory"> 
                        <xa-pool>
                            <min-pool-size>1</min-pool-size>
                            <max-pool-size>20</max-pool-size>
                            <prefill>false</prefill>
                            <is-same-rm-override>false</is-same-rm-override>
                        </xa-pool>
                        <recovery>
                            <recover-credential>
                                <user-name>admin</user-name>
                                <password>admin</password>
                            </recover-credential>
                            <recover-plugin class-name="org.jboss.jca.core.recovery.ConfigurableRecoveryPlugin">
                                <config-property name="EnableIsValid">
                                    false
                                </config-property>
                                <config-property name="IsValidOverride">
                                    true
                                </config-property>
                                <config-property name="EnableClose">
                                    true
                                </config-property>
                            </recover-plugin>
                        </recovery>
                    </connection-definition>

                </connection-definitions>

Queues/Topics

<admin-objects>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
                                jndi-name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue" 
                                use-java-context="true" 
                                pool-name="deferredBpmCommandQueue">
                        <config-property name="PhysicalName">
                            deferredBpmCommandQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
                                jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue" 
                                use-java-context="true" 
                                pool-name="ActiveMQQueue.asyncActionRequestQueue">
                        <config-property name="PhysicalName">
                            asyncActionRequestQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
                                jndi-name="java:jboss/exported/jms/queue/bpm/DLQ" 
                                use-java-context="true" 
                                pool-name="DLQ">
                        <config-property name="PhysicalName">
                            DLQ
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue" use-java-context="true" pool-name="ActiveMQQueue.cacheUpdateReplicationQueue">
                        <config-property name="PhysicalName">
                            cacheUpdateReplicationQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue" use-java-context="true" pool-name="ActiveMQQueue.periodicDerivationQueue">
                        <config-property name="PhysicalName">
                            periodicDerivationQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncServiceSignalQueue">
                        <config-property name="PhysicalName">
                            asyncServiceSignalQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/processEventTopic" use-java-context="true" pool-name="ActiveMQTopic.processEventTopic">
                        <config-property name="PhysicalName">
                            processEventTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionReplyQueue">
                        <config-property name="PhysicalName">
                            asyncActionReplyQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/ExpiryQueue" use-java-context="true" pool-name="ActiveMQQueue.ExpiryQueue">
                        <config-property name="PhysicalName">
                            ExpiryQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusRequestTopic">
                        <config-property name="PhysicalName">
                            asyncActionServiceStatusRequestTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityRequestTopic">
                        <config-property name="PhysicalName">
                            asyncActionAffinityRequestTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue" use-java-context="true" pool-name="ActiveMQQueue.jbpmJobQueue">
                        <config-property name="PhysicalName">
                            jbpmJobQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityReplyTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityReplyTopic">
                        <config-property name="PhysicalName">
                            asyncActionAffinityReplyTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationEventTopic" use-java-context="true" pool-name="ActiveMQTopic.cacheUpdateReplicationEventTopic">
                        <config-property name="PhysicalName">
                            cacheUpdateReplicationEventTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusTopic">
                        <config-property name="PhysicalName">
                            asyncActionServiceStatusTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionServiceLogRecordQueue">
                        <config-property name="PhysicalName">
                            asyncActionServiceLogRecordQueue
                        </config-property>
                    </admin-object>
                </admin-objects>

Broker config

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>


<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}" persistent="true"> 

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >

              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>


    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>


    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter>

      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>


    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>

</broker>
<import resource="jetty.xml"/>

HornetQ Standalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:messaging:2.0">
        <hornetq-server>
            <persistence-enabled>false</persistence-enabled>
            <jmx-management-enabled>true</jmx-management-enabled>
            <shared-store>true</shared-store>
            <journal-type>ASYNCIO</journal-type>
            <journal-file-size>102400</journal-file-size>
            <journal-min-files>2</journal-min-files>

            <connectors>
                <netty-connector name="netty" socket-binding="messaging"/>
                <netty-connector name="netty-throughput" socket-binding="messaging-throughput">
                    <param key="batch-delay" value="50"/>
                </netty-connector>
                <in-vm-connector name="in-vm" server-id="0"/>
            </connectors>

            <acceptors>
                <netty-acceptor name="netty" socket-binding="messaging"/>
                <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
                    <param key="batch-delay" value="50"/>
                    <param key="direct-deliver" value="false"/>
                </netty-acceptor>
                <in-vm-acceptor name="in-vm" server-id="0"/>
            </acceptors>

            <security-settings>
                <security-setting match="#">
                    <permission type="send" roles="guest"/>
                    <permission type="consume" roles="guest"/>
                    <permission type="createNonDurableQueue" roles="guest"/>
                    <permission type="deleteNonDurableQueue" roles="guest"/>
                </security-setting>
            </security-settings>

            <address-settings>
                <address-setting match="#">
                    <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                    <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                    <redelivery-delay>0</redelivery-delay>
                    <max-size-bytes>104857600</max-size-bytes>
                    <page-size-bytes>10485760</page-size-bytes>
                    <page-max-cache-size>10</page-max-cache-size>
                    <address-full-policy>PAGE</address-full-policy>
                    <message-counter-history-day-limit>10</message-counter-history-day-limit>
                </address-setting>
            </address-settings>

            <jms-connection-factories>
                <connection-factory name="InVmConnectionFactory">
                    <connectors>
                        <connector-ref connector-name="in-vm"/>
                    </connectors>
                    <entries>
                        <entry name="java:/ConnectionFactory"/>
                    </entries>
                </connection-factory>
                <connection-factory name="RemoteConnectionFactory">
                    <connectors>
                        <connector-ref connector-name="netty"/>
                    </connectors>
                    <entries>
                        <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                    </entries>
                    <client-failure-check-period>30000</client-failure-check-period>
                    <connection-ttl>300000</connection-ttl>
                    <retry-interval>2000</retry-interval>
                    <retry-interval-multiplier>1</retry-interval-multiplier>
                    <max-retry-interval>2000</max-retry-interval>
                    <reconnect-attempts>100</reconnect-attempts>
                </connection-factory>
            </jms-connection-factories>

Queues/Topics

<jms-destinations>
                <jms-queue name="asyncActionRequestQueue">
                    <entry name="queue/bpm/asyncActionRequestQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"/>
                </jms-queue>
                <jms-queue name="asyncActionReplyQueue">
                    <entry name="queue/bpm/asyncActionReplyQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue"/>
                </jms-queue>
                <jms-queue name="asyncServiceSignalQueue">
                    <entry name="queue/bpm/asyncServiceSignalQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue"/>
                </jms-queue>
                <jms-queue name="asyncActionServiceLogRecordQueue">
                    <entry name="queue/bpm/asyncActionServiceLogRecordQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue"/>
                </jms-queue>
                <jms-queue name="deferredBpmCommandQueue">
                    <entry name="queue/bpm/deferredBpmCommandQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"/>
                </jms-queue>
                <jms-queue name="jbpmJobQueue">
                    <entry name="queue/bpm/jbpmJobQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue"/>
                </jms-queue>
                <jms-queue name="DLQ">
                    <entry name="queue/DLQ"/>
                    <entry name="java:jboss/exported/jms/queue/DLQ"/>
                </jms-queue>
                <jms-queue name="ExpiryQueue">
                    <entry name="queue/ExpiryQueue"/>
                    <entry name="java:jboss/exported/jms/queue/ExpiryQueue"/>
                </jms-queue>
                <jms-queue name="periodicDerivationQueue">
                    <entry name="queue/bpm/periodicDerivationQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue"/>
                </jms-queue>
                <jms-queue name="cacheUpdateReplicationQueue">
                    <entry name="queue/bpm/cacheUpdateReplicationQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue"/>
                </jms-queue>
                <jms-topic name="asyncActionServiceStatusTopic">
                    <entry name="topic/bpm/asyncActionServiceStatusTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusTopic"/>
                </jms-topic>
                <jms-topic name="asyncActionServiceStatusRequestTopic">
                    <entry name="topic/bpm/asyncActionServiceStatusRequestTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusRequestTopic"/>
                </jms-topic>
                <jms-topic name="asyncActionAffinityRequestTopic">
                    <entry name="topic/bpm/asyncActionAffinityRequestTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityRequestTopic"/>
                </jms-topic>
                <jms-topic name="asyncActionAffinityReplyTopic">
                    <entry name="topic/bpm/asyncActionAffinityReplyTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityReplyTopic"/>
                </jms-topic>
                <jms-topic name="processEventTopic">
                    <entry name="topic/bpm/processEventTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/processEventTopic"/>
                </jms-topic>
                <jms-topic name="cacheUpdateReplicationEventTopic">
                    <entry name="topic/bpm/cacheUpdateReplicationEventTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/cacheUpdateReplicationEventTopic"/>
                </jms-topic>
            </jms-destinations>

1 回答

  • 2

    ObjectMessage序列化安全性是个问题 .

    ObjectMessage对象依赖于marshal / unmarshal对象有效负载的Java序列化 . 由于恶意有效负载可以利用主机系统,因此通常认为此过程不安全 . 这就是从版本5.12.2和5.13.0开始的原因,ActiveMQ强制用户明确将可以使用ObjectMessages交换的包列入白名单 .

    我几天前看到这个,并添加了一个白名单,但它没有解决问题 . 我也试过针对AMQ 5.11.3运行它并没有用 . 显然他们也将安全功能添加到5.11.3 . 无论如何,我将此(-Dorg.apache.activemq.SERIALIZABLE_PACKAGES =“*”)添加到客户端和AMQ vm参数,现在一切正常 .

    请记住,我使用的命令行选项是我在代理中明确打开的安全漏洞,可以允许恶意用户在我的系统上执行代码 . 使用该标志的正确方法是显式列出允许反序列化的类,或者最多使用包通配符以避免在可信父包中显式列出单个类和子包 .

相关问题