ActiveMQ / Spring专家,

我在组合使用ActiveMQ与 DefaultMessageListenerContainer / SimpleMessageListenerContainer 时遇到了一个非常奇怪的问题。我们有一个使用Spring(4.x版本)构建的Web应用程序。其中一项业务操作是批量上传并处理文件,每个文件包含多行,每一行都会成为一条待处理的消息。

发布者将每一行作为一条消息发布到持久性队列。通过ActiveMQ控制台检查时,我们可以看到队列中的消息。对于同一队列,我们使用DefaultMessageListenerContainer(DMLC)或 SimpleMessageListenerContainer(SMLC)(两者都试过)配置了一组侦听器来消费这些消息。

当发布者发布100条消息时,有时只有99或98条被投递给侦听器,其余消息则一直滞留在队列中。配置如下:

  • ActiveMQ代理以独立模式运行,既未组成代理网络(network of brokers),也未嵌入WildFly中。

  • 在Spring应用程序中,我们尝试了DMLC和SMLC,两者都遇到了这个问题。我们还分别尝试了 simpleMQConnectionFactory 和 PooledConnectionFactory,两种情况下都遇到同样的问题。

  • 尝试在 PooledConnectionFactory 上将预取限制(prefetch limit)设置为"1",仍然遇到同样的问题。设置为"0"时,Spring SMLC会抛出异常。

  • 最大并发使用者数设置为50

  • 当消息被卡住时,如果我们重新启动WildFly,队列中剩余的消息就会被投递给消费者。

  • 我们没有使用事务性会话(transacted session),而是设置 sessionAcknowledgeModeName = "CLIENT_ACKNOWLEDGE"

  • 我们使用spring bean初始化队列,并使用它来初始化SMLC或DMLC

我已经没有其他办法可以尝试了。如果您能分享这方面的经验,我们将非常感谢。此应用程序已在生产环境中运行,该问题几乎每隔一天就会发生一次,有时一天内会发生多次。

/**
 * Wraps one DMR key in a {@code DMRImportMessage} tagged with
 * {@code Command.INITIALIZE_DMR_FORM} and hands it to {@code sender}.
 *
 * <p>Any exception raised while building or sending the message is reported on the
 * console and not rethrown, so a failed publish does not interrupt the caller.
 *
 * @param p_dmrDTO DMR key data carried by the message; its {@code toString()} is stored as the DMR key
 * @param jobID    identifier of the import job the message belongs to
 * @param numDMRs  number of DMR forms recorded on the message
 */
private void publishDMRMessage(DmrDTO p_dmrDTO, long jobID, int numDMRs) {
    // Create a DMR message for this unique key and publish it through the sender.
    try {
        DMRImportMessage message = new DMRImportMessage();
        message.setDmrDTO(p_dmrDTO);
        message.setDmrKey(p_dmrDTO.toString());
        // Long.valueOf replaces the deprecated Long(long) boxing constructor.
        message.setDmrImportJobID(Long.valueOf(jobID));
        message.setTask(Command.INITIALIZE_DMR_FORM);
        message.setNumDMRForms(Long.valueOf(numDMRs));
        sender.sendMessage(message);
    } catch (Exception e) {
        // NOTE(review): the failure is only printed, so the caller cannot tell that this
        // message was never queued. Confirm this best-effort behaviour is intended, and
        // route it through the application logger once one is available in this class.
        System.out.println(" JMS Exception = " + e.getMessage());
        e.printStackTrace();
    }
}

public class DMRMessageListener implements MessageListener {

    /** Class-wide logger. */
    private static final Logger log = Logger.getLogger(DMRMessageListener.class);

    /** Asked for a job status update after each message has been processed. */
    private DMRImportManager manager;

    /** Sends the control message that starts the next processing phase. */
    private JMSMessageSender sender;

    /** Invoked for form-initialisation messages. */
    private DmrFormInitService formService;

    /** Invoked for post-processing (validation) messages. */
    private ProcessDMRValidationMessage validateService;

    /** Invoked for parameter-line messages and for parameter post-processing. */
    private ImportDmrService dmrService;

    /** @return the import manager currently set on this listener */
    public DMRImportManager getManager() {
        return this.manager;
    }

    /** @param manager the import manager to use */
    public void setManager(DMRImportManager manager) {
        this.manager = manager;
    }

    /** @return the sender currently set on this listener */
    public JMSMessageSender getSender() {
        return this.sender;
    }

    /** @param sender the sender to use */
    public void setSender(JMSMessageSender sender) {
        this.sender = sender;
    }

    /** @return the form-initialisation service currently set on this listener */
    public DmrFormInitService getFormService() {
        return this.formService;
    }

    /** @param formService the form-initialisation service to use */
    public void setFormService(DmrFormInitService formService) {
        this.formService = formService;
    }

    /** @return the validation service currently set on this listener */
    public ProcessDMRValidationMessage getValidateService() {
        return this.validateService;
    }

    /** @param validateService the validation service to use */
    public void setValidateService(ProcessDMRValidationMessage validateService) {
        this.validateService = validateService;
    }

    /** @return the DMR import service currently set on this listener */
    public ImportDmrService getDmrService() {
        return this.dmrService;
    }

    /** @param dmrService the DMR import service to use */
    public void setDmrService(ImportDmrService dmrService) {
        this.dmrService = dmrService;
    }


    /**
     * Handles one JMS message delivered to this listener.
     *
     * <p>The message must be an {@code ObjectMessage} whose payload is a
     * {@code DMRImportMessage}. The payload is processed, the job status is updated through
     * the manager, and when the returned status reports {@code isStatus() == true} a control
     * message for the next processing step is published.
     *
     * @param message the delivered message
     * @throws IllegalArgumentException if {@code message} is not an {@code ObjectMessage}
     * @throws RuntimeException wrapping any exception raised while handling the message
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof ObjectMessage)) {
            log.error(" *****  Received an invalid message -- NOT AN Object message so cannot be processed and will result in stuck jobs **** ");
            throw new IllegalArgumentException("Message must be of type ObjectMessage");
        }

        try {
            ObjectMessage objectMessage = (ObjectMessage) message;
            DMRImportMessage dmrMessage = (DMRImportMessage) objectMessage.getObject();

            log.info("============= Message Received =========================");
            log.info("Message Type = " + dmrMessage.getTask() + " for JOB ID = " + dmrMessage.getDmrImportJobID());
            log.info("Message Received === DMR ID = " + dmrMessage.getDmrID());
            log.info("Message Received === DMR key = " + dmrMessage.getDmrKey());
            log.info("============= Message Received =========================");

            // Process the message
            processDMRMessage(dmrMessage);

            DMRProcessingStatus status = manager.updateStatus(dmrMessage);

            if (status.isStatus()) {
                log.info(" One stage is complete, the next stage should start for JOB ID = " + dmrMessage.getDmrImportJobID());
                // Send the next message in the chain
                publishMessageForNextStepOfProcessing(dmrMessage, status);
            }
        } catch (Exception ex) {
            // Record the failure, with its stack trace, in the application log rather than
            // on stderr, then propagate it to whoever invoked the listener.
            log.error("Failed to process DMR message", ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Examines the task carried by the message and invokes the service that handles it.
     *
     * <ul>
     *   <li>{@code INITIALIZE_DMR_FORM}: calls the form-init service and flags the message
     *       when the result marks the DMR key as not being in ICIS.</li>
     *   <li>{@code IMPORT_DMR_PARAMETER}: processes one parameter line and copies the
     *       resulting status, DMR ID and comment onto the message; on failure the message is
     *       marked unsuccessful with DMR ID 0.</li>
     *   <li>{@code END_DMR_PARAMETER_IMPORT}: marks the message successful with DMR ID 0.</li>
     *   <li>{@code POST_PROCESS_DMR}: runs DMR validation; a failure is logged and not
     *       propagated.</li>
     * </ul>
     *
     * @param dmrMessage DMRImportMessage
     */
    private void processDMRMessage(DMRImportMessage dmrMessage) {

        if (dmrMessage.getTask() == Command.INITIALIZE_DMR_FORM) {

            Map<String, String> dmrInitResults = formService.initDmrForm(dmrMessage.getDmrDTO());
            // Indicate in message that this DMR Key is not in ICIS
            if (dmrInitResults != null) {
                // NOTE(review): both flags are compared against the literal "truee". If the
                // form service reports "true", neither branch can ever match - confirm the
                // value it actually returns before changing the literal.
                if (StringUtils.equalsIgnoreCase(dmrInitResults.get("wsUnscheduleDmrError"), "truee")) {
                    log.info("DMR Key is not in ICIS: " + dmrMessage.getDmrDTO().toString());
                    dmrMessage.setDmrKeyInICIS(false);
                } else if (StringUtils.equalsIgnoreCase(dmrInitResults.get("wsDBDown"), "truee")) {
                    log.error("Web Service call failed for DMR Key: " + dmrMessage.getDmrDTO().toString());
                }
            }

        }

        try {
            if (dmrMessage.getTask() == Command.IMPORT_DMR_PARAMETER) {
                // Process the Parameter line
                ParameterProcessingStatus status = dmrService.processLine(dmrMessage.getDmrImportJobID(), dmrMessage.getDmrParameterSubmission(), Integer.valueOf(dmrMessage.getLineNumber()), dmrMessage.getDmrKeysNotInICIS());
                log.info("LINE = " + dmrMessage.getLineNumber() + "  Status = " + status.isStatus());
                dmrMessage.setProcessingStatus(status.isStatus());
                dmrMessage.setDmrID(status.getDmrID());
                dmrMessage.setDmrComment(status.getDmrComment());
                return;
            }
        } catch (Exception e) {
            // Passing the exception to the logger keeps the stack trace in the application log.
            log.error("An exception occurred during processing of line " + dmrMessage.getLineNumber() + " in job " + dmrMessage.getDmrImportJobID(), e);
            dmrMessage.setProcessingStatus(false);
            dmrMessage.setDmrID(0L);
        }

        try {
            if (dmrMessage.getTask() == Command.END_DMR_PARAMETER_IMPORT) {
                // End-of-import marker: nothing to process, record it as successful.
                dmrMessage.setProcessingStatus(true);
                dmrMessage.setDmrID(0L);
                return;
            }
        } catch (Exception e) {
            log.error("An exception occurred while handling the end of parameter import in job " + dmrMessage.getDmrImportJobID(), e);
            dmrMessage.setProcessingStatus(false);
            dmrMessage.setDmrID(0L);
        }

        try {
            if (dmrMessage.getTask() == Command.POST_PROCESS_DMR) {
                // Validate DMRs
                validateService.validateDMR(dmrMessage);
            }
        } catch (Exception e) {
            // Validation failures are deliberately not propagated; they are only logged.
            log.error("An exception occurred during DMR validation in job " + dmrMessage.getDmrImportJobID(), e);
        }
    }

    /**
     * Publishes the control message that starts the processing phase following the task of
     * the given message.
     *
     * <ul>
     *   <li>{@code INITIALIZE_DMR_FORM} starts {@code START_PARAMETER_PROCESSING}, passing
     *       along the DMR keys not in ICIS taken from {@code status}.</li>
     *   <li>{@code IMPORT_DMR_PARAMETER} / {@code END_DMR_PARAMETER_IMPORT} post-process the
     *       parameters, then start {@code START_DMR_VALIDATION}.</li>
     *   <li>{@code POST_PROCESS_DMR} starts {@code START_DMR_FORM_INIT}.</li>
     * </ul>
     *
     * <p>The closing log line is written after every call that completes normally. It
     * previously appeared only when no task matched, because each branch returned early.
     *
     * @param dmrMessage the message whose task has just been processed
     * @param status     the job status returned by the import manager
     * @throws JMSException declared for failures while sending the control message
     */
    private void publishMessageForNextStepOfProcessing(DMRImportMessage dmrMessage, DMRProcessingStatus status) throws JMSException {
        log.info(" =========== Publish a message for next step of processing for JOB ID = " + dmrMessage.getDmrImportJobID());

        if (dmrMessage.getTask() == Command.INITIALIZE_DMR_FORM) {
            // Start the DMR Parameter Processing
            sender.sendDMRControlMessage(this.createControlMessage(ProcessPhase.START_PARAMETER_PROCESSING, dmrMessage.getDmrImportJobID(), status.getDmrKeysNotInICIS()));
        } else if ((dmrMessage.getTask() == Command.IMPORT_DMR_PARAMETER)
                || (dmrMessage.getTask() == Command.END_DMR_PARAMETER_IMPORT)) {
            // Start the DMR Validation Process
            dmrService.postProcessParameters(dmrMessage.getDmrImportJobID(), status.getSuccessfulLines(), status.getErroredLines());
            DMRImportControlMessage message = this.createControlMessage(ProcessPhase.START_DMR_VALIDATION, dmrMessage.getDmrImportJobID());
            message.setDmrIDsWithComments(status.getDmrIDsWithComments());
            sender.sendDMRControlMessage(message);
        } else if (dmrMessage.getTask() == Command.POST_PROCESS_DMR) {
            // Start the next DMR import process
            sender.sendDMRControlMessage(this.createControlMessage(ProcessPhase.START_DMR_FORM_INIT, dmrMessage.getDmrImportJobID()));
        } else {
            // Nothing is published for any other task; make that visible in the log.
            log.warn("No next-step control message is defined for task " + dmrMessage.getTask() + " in JOB ID = " + dmrMessage.getDmrImportJobID());
        }

        log.info(" =========== End Publish a message for next step of processing for JOB ID = " + dmrMessage.getDmrImportJobID());
    }

    /**
     * Builds a control message for the given phase and job that carries no DMR key set.
     *
     * @param phase the processing phase the control message announces
     * @param jobID the import job the control message belongs to
     * @return the populated control message
     */
    private DMRImportControlMessage createControlMessage(DMRImportControlMessage.ProcessPhase phase, Long jobID) {
        // Delegate to the three-argument overload, passing null for the key set.
        return this.createControlMessage(phase, jobID, null);
    }

    /**
     * Builds a control message for the given phase and job, attaching the supplied DMR keys
     * when a set is provided.
     *
     * @param p_phase   the processing phase the control message announces
     * @param p_jobID   the import job the control message belongs to
     * @param p_dmrDTOs DMR keys not in ICIS to attach, or {@code null} to attach none
     * @return the populated control message
     */
    private DMRImportControlMessage createControlMessage(DMRImportControlMessage.ProcessPhase p_phase, Long p_jobID, Set<DmrDTO> p_dmrDTOs) {
        final DMRImportControlMessage controlMessage = new DMRImportControlMessage();
        controlMessage.setDmrImportJobID(p_jobID);
        controlMessage.setPhase(p_phase);

        // Without a key set there is nothing more to attach.
        if (p_dmrDTOs == null) {
            return controlMessage;
        }

        controlMessage.setDmrKeysNotInICIS(p_dmrDTOs);
        return controlMessage;
    }
//Bean Configs.

<!-- Prefetch policy with a queue prefetch of 0; referenced only by jmsFactoryReceive. -->
<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="0"/>
</bean>

<!-- Connection factory behind jmsQueueTemplate. -->
<bean id="jmsFactoryPub" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg index="0" value="tcp://localhost:61616"/>
</bean>

<!-- Connection factory used by the listener container; the only one carrying the prefetch policy. -->
<bean id="jmsFactoryReceive" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg index="0" value="tcp://localhost:61616"/>
    <property name="prefetchPolicy" ref="prefetchPolicy"/>
</bean>

<!-- Connection factory behind jmsQueueTemplateControlMsg. -->
<bean id="jmsFactoryControlMsg" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- Queue "DMRQueue": target of messageSender and the destination the listener container reads. -->
<bean id="dmrQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="DMRQueue"/>
</bean>

<!-- Queue "DMRControlQueue": target of messagePublisher. -->
<bean id="dmrControlQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="DMRControlQueue"/>
</bean>

<!-- Template used by messageSender; connects through jmsFactoryPub. -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactoryPub"/>
</bean>

<!-- Template used by messagePublisher; connects through jmsFactoryControlMsg. -->
<bean id="jmsQueueTemplateControlMsg" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactoryControlMsg"/>
</bean>

<!-- Message creator shared by both senders below. -->
<bean id="messageCreator" class="net.exchangenetwork.netdmr.service.DMRMessageCreator"/>

<!-- Listener attached to the listener container; its "sender" is messagePublisher. -->
<bean id="dmrMessageListener" class="net.exchangenetwork.netdmr.service.DMRMessageListener">
    <property name="manager" ref="dmrImportManager"/>
    <property name="sender" ref="messagePublisher"/>
    <property name="formService" ref="dmrFormInit"/>
    <property name="validateService" ref="dmrValidator"/>
    <property name="dmrService" ref="importDmrService"/>
</bean>

<!-- Sender targeting dmrQueue through jmsQueueTemplate. -->
<bean id="messageSender" class="net.exchangenetwork.netdmr.service.JMSMessageSender">
    <property name="jmsTemplate" ref="jmsQueueTemplate"/>
    <property name="sendQueue" ref="dmrQueue"/>
    <property name="creator" ref="messageCreator"/>
</bean>

<!-- Sender targeting dmrControlQueue through jmsQueueTemplateControlMsg. -->
<bean id="messagePublisher" class="net.exchangenetwork.netdmr.service.JMSMessageSender">
    <property name="jmsTemplate" ref="jmsQueueTemplateControlMsg"/>
    <property name="sendQueue" ref="dmrControlQueue"/>
    <property name="creator" ref="messageCreator"/>
</bean>
<!-- Listener container that feeds messages from dmrQueue to dmrMessageListener. -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsFactoryReceive"/>
    <!-- this is the queue we will listen on -->
    <property name="destination" ref="dmrQueue"/>
    <property name="messageListener" ref="dmrMessageListener"/>
    <property name="concurrentConsumers" value="60"/>
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
    <property name="errorHandler" ref="jmsErrorHandler"/>
    <property name="exceptionListener" ref="jmsExceptionHandler"/>
    <!--
        Was 0, which makes every receive call block with no timeout. Spring's documentation
        discourages that because such a container cannot shut down cleanly. 1000 ms is the
        container's documented default: each consumer re-issues its receive once per second
        instead of waiting forever on a single call.
        NOTE(review): whether this also resolves the stuck-message symptom needs to be
        confirmed against the broker; it is the documented safe setting either way.
    -->
    <property name="receiveTimeout" value="1000"/>
</bean>