首页 文章

Mule Quartz调度程序:处理多个消息

提问于
浏览
2

这个问题是在4个月前提出来的 .

https://stackoverflow.com/posts/16241300/edit

任何人?

“我已经在mule流中编写了一个石英代码,每隔5分钟就会消耗队列中的所有消息 .

<quartz:inbound-endpoint jobName="abc" cronExpression="0 0/1 * * * ?" doc:name="Quartz">
               <quartz:endpoint-polling-job>
                     <quartz:job-endpoint ref="jmsEndPoint" />
               </quartz:endpoint-polling-job>
        </quartz:inbound-endpoint>

但是,即使队列中有5条消息,上面的代码一次只消耗一条消息 .

我的要求是每5分钟运行一次作业并使用队列中的所有消息 .

另一个要求是使用消息有效负载内的唯一标识符过滤掉重复的消息 .

任何帮助将不胜感激 . “

编辑:JMS endpoints

<jms:endpoint name="jmsEndPoint" queue="MyQueue" connector-ref="connector"/>

3 回答

  • 0

    队列是基于事件的,旨在返回一条消息(先进先出) . 为了在Mule流中使用来自队列的所有消息,一种方法是创建一个自定义组件,该组件将以编程方式使用队列中的jms消息,直到没有更多消息为止 .

    为了过滤重复的消息,请考虑使用Mule的幂等路由器:

    http://www.mulesoft.org/documentation/display/current/Routing+Message+Processors#RoutingMessageProcessors-IdempotentMessageFilter

    HTH

  • 1

    看看你的代码看起来你需要像这样阅读它:

    muleEventContext.requestEvent("MyQueue", -1);
    

    如果要对id进行过滤,可以执行以下操作:

    <idempotent-message-filter idExpression="#[message:id]-#[header:foo]">
        <simple-text-file-store directory="./idempotent"/>
     </idempotent-message-filter>
    
  • 2

    在你的Mule-config xml中:

    <quartz:connector name="quartzConnector">
        <receiver-threading-profile
            maxThreadsActive="1" />
      </quartz:connector>
    
      <flow name="DelayedMessageProcessing">
        <quartz:inbound-endpoint name="qEP6"
                                 cronExpression="${some.cron.expression}"
                                 jobName="DelayedProcessing"
                                 connector-ref="quartzConnector">
          <jms:transaction action="ALWAYS_BEGIN" />
          <quartz:event-generator-job />
        </quartz:inbound-endpoint
    
        <component class="com.something.myComponent" />
      </flow>
    

    ..和Java组件:

    package com.something;
    
    import org.mule.api.MuleEventContext;
    import org.mule.api.MuleException;
    import org.mule.api.MuleMessage;
    import org.mule.api.lifecycle.Callable;
    
    public class MyComponent implements Callable {
    
        public Object onCall(final MuleEventContext muleEventContext) throws Exception {
            MuleMessage delayedMessage = fetchMessage(muleEventContext);
    
            while (delayedMessage != null) {
                //You might have to copy properties from inbound to outbound scope here..
                muleEventContext.dispatchEvent(delayedMessage, "some.jms.endpoint");
                delayedMessage = fetchMessage(muleEventContext);
            }
    
            return null;
        }
    
        private MuleMessage fetchMessage(final MuleEventContext muleEventContext) throws MuleException {
            return muleEventContext.requestEvent("some.delayed.jms.endpoint", 3000);
        }
    }
    

相关问题