首页 文章

Apache Camel Splitter,Threadpool和JMS

提问于
浏览
1

我在spring xml中定义了以下路由,以在文本文件中拆分行,并将每行发送到JMS队列

<bean id="myPool" class="java.util.concurrent.Executors" factory-method="newCachedThreadPool"/>

<camelContext id="concurrent-route-context" xmlns="http://camel.apache.org/schema/spring" trace="true">
    <route id="inbox-threadpool-split-route">
        <from uri="{{inbox.uri}}" />
            <log message="Starting to process file: ${header.CamelFileName}" />
            <split streaming="true" executorServiceRef="myPool">
                <tokenize token="\n" />
                <to uri="{{inventory.queue.uri}}" />    
            </split>
            <log message="Done processing file: ${header.CamelFileName}" />
    </route>
</camelContext>

inbox.uri是一个文件组件uri,用于监听目录中的文件,而inventory.queue.uri是一个JmsComponent uri,它连接到JMS服务器中的队列(Tibco EMS 6.X版本) . JmsComponent uri很简单,比如“JmsComponent:queue:?username =&password =”

上述路由可以无错误地运行,但是从文件中分离的行不会作为JMS消息发送到队列(即程序运行后队列仍为空)

如果我从拆分器定义中删除executorServiceRef =“myPool”(其余定义如下),则拆分的消息可以逐个传递到JMS队列 .

如果我用“直接” endpoints 替换“to”uri,那么无论是否在分割器中使用线程池,都可以传递分裂的消息

JmsComponent中是否需要任何特殊设置才能使其与Splitter线程池一起使用?或者我错过的任何其他配置?

=======编辑20150731 =======

使用1000行的Big CSV文件进行测试时,我遇到了上述问题 . 如果我使用一个小文件(例如,仅10行)进行测试,我可以看到消息被传递到inventory.queue,但是从日志中看起来需要10秒才能完成拆分并将消息传递到队列中 . 下面抓了日志:

2015-07-31 11:02:07,210 [main           ] INFO  SpringCamelContext             - Apache Camel 2.15.0 (CamelContext: concurrent-route-context) started in 1.301 seconds
2015-07-31 11:02:07,220 [main           ] INFO  MainSupport                    - Apache Camel 2.15.0 starting
2015-07-31 11:02:17,250 [://target/inbox] INFO  inbox-threadpool-split-route   - Done processing file: smallfile.csv

看到路线从11:02:07开始,并在11:02:17显示“完成处理...”语句,即10秒

如果我再次使用5行CSV进行测试,则需要5秒......看起来每行需要1秒才能分割并传送到JMS队列...这很慢

如果我将“to uri”更改为“direct”而不是“JMS”,则可以在一秒钟内快速完成拆分

此外,从JMS侦听器日志中,它能够在同一秒内接收所有10条消息 . 似乎Splitter将读取并拆分整个文件,为所有十行“准备”10条JMS消息,然后将所有消息传递到队列,但不是“拆分1行并立即传送1条JMS消息” . .

是否有任何选项或配置可以更改拆分器行为并提高拆分性能?

2 回答

  • 0

    使用具有标记化的分割器处理14G文件时,我遇到了类似的问题 . 我能够通过使用Aggregator来克服性能驼峰,正如Claus的帖子所指出的Parsing Large Files with Apache Camel

    在聚合批处理消息之后,我使用 生产环境 者模板将这些消息路由到消息传递系统 . 希望有所帮助 .

  • 1

    感谢@Aayush Tuladhar分享的参考链接,我更新了我的路线如下:

    <camelContext id="concurrent-route-context" xmlns="http://camel.apache.org/schema/spring" trace="false" >
        <route id="inbox-threadpool-split-route">
            <from uri="{{inbox.uri}}" />
                <log message="Starting to process file: ${header.CamelFileName}" />
                <split streaming="true" executorServiceRef="myPool">
                    <tokenize token="\n" />
                    <log message="split index - $simple{property.CamelSplitIndex}, row content=$simple{body}" />
                    <aggregate strategyRef="stringBodyAggregator"  completionInterval="750"  >
                        <correlationExpression>
                            <simple>property.CamelSplitIndex</simple>
                        </correlationExpression>
                        <to uri="{{inventory.queue.uri}}" />
                    </aggregate>
                </split>
                <log message="Done processing file: ${header.CamelFileName}" />
        </route>
    </camelContext>
    

    这里的诀窍是在分离器中添加了一个聚合器,它使用了

    property.CamelSplitIndex
    

    作为correlationExpression . CamelSplitIndex为每个拆分行保持递增,因此聚合器实际上并没有“聚合”任何内容,而是结束“聚合”并立即将JMS消息排入JMS队列 . aggregationStrategy只是加入了oldExchange和newExchange,但这里并不重要,因为它只是用于为聚合EIP实现必需的属性“strategyRef”

    需要注意的一点是,在使用此技巧后,性能瓶颈转移到JMS消息生成器,每秒传递1条消息...我通过利用CachingConnectionFactory在Spring中定义JMS连接来解决此问题 .

相关问题