我在Camel中定义了一个类似于这样的路由:GET请求进来,文件在文件系统中创建 . 文件使用者选择它,从外部Web服务获取数据,并通过POST将结果消息发送到其他Web服务 .
简化代码如下:
// Update request goes on queue:
from("restlet:http://localhost:9191/update?restletMethod=post")
.routeId("Update via POST")
[...some magic that defines a directory and file name based on request headers...]
.to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")
// Update gets processed
from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
.routeId("Update main route")
.streamCaching() //otherwise stuff can't be sent to multiple endpoints
[...enrich message from some web service using http4 component...]
.multicast()
.stopOnException()
.to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
.end();
多播中的三个 endpoints 只是将生成的消息POST到其他Web服务 .
当队列(即文件目录 cameldest
)相当空时,这一切都运行良好 . 文件正在 cameldest/<subdir>
中创建,由文件使用者拾取并移动到 cameldest/<subdir>/inprogress
,并且正在向三个传出的POST发送内容没问题 .
但是,一旦传入的请求堆积到大约300,000个文件进度减慢并最终 pipeline fails due to out-of-memory errors (超出GC开销限制) .
通过增加日志记录,我可以看到文件消费者轮询基本上从不运行,因为它每次都是 appears to take responsibility for all files it sees ,等待它们完成处理,然后才开始另一轮调查 . 除此之外(我假设)造成资源瓶颈,这也会影响我的排序要求:一旦队列被数千条等待处理的消息堵塞,新的消息将被天真地排序得更高 - 如果他们甚至仍然被拿起 - 仍在等待那些已经"started"的人 .
现在,我已经尝试了 maxMessagesPerPoll
和 eagerMaxMessagesPerPoll
选项 . 他们似乎一开始就缓解了这个问题,但经过一系列的民意调查后,我仍然在"started" limbo中找到了数千个文件 .
唯一有效的方法是使瓶颈 delay
和 maxMessages...
如此窄,以至于平均处理速度比文件轮询周期快 .
显然,这不是我想要的 . 我希望我的管道尽可能快地处理文件,但速度不快 . 我希望文件消费者在路由繁忙时等待 .
我犯了一个明显的错误吗?
(我在带有XFS的Redhat 7机器上运行了一个稍微旧的Camel 2.14.0,如果这是问题的一部分 . )
3 回答
尝试将maxMessagesPerPoll设置为来自文件 endpoints 的较低值,以便每次轮询仅获取最多X个文件,这也会限制您在Camel应用程序中将拥有的机上信息总数 .
您可以在Camel文档中找到有关该选项的更多信息,以获取文件组件
除非你真的需要将数据保存为文件,否则我会提出另一种解决方案 .
从您的restlet使用者,将每个请求发送到消息排队应用程序,如activemq或rabbitmq或类似的东西 . 您将很快收到该队列上的大量消息,但这没关系 .
然后用队列使用者替换您的文件使用者 . 这需要一些时间,但每条消息应单独处理并发送到您想要的任何地方 . 我用大约500 000条消息测试了rabbitmq,并且运行正常 . 这也应该减少消费者的负担 .
简短的回答是没有答案:Camel的文件组件的
sortBy
选项对于容纳我的用例来说太缺乏内存效率:唯一性:我已经在那里了't want to put a file on queue if it' .
优先级:应首先处理标记为高优先级的文件 .
性能:拥有几十万个文件,甚至几百万个文件应该没问题 .
FIFO :(奖励)应首先拾取最早的文件(按优先级) .
问题似乎是,如果我正确读取source code和documentation,无论是使用内置语言还是自定义可插入
sorter
,所有文件详细信息都在内存中执行排序 . 文件组件总是创建一个包含所有细节的对象列表,这显然会在经常轮询许多文件时导致大量的垃圾收集开销 .I got my use case to work, mostly, without having to resort to using a database or writing a custom component, using the following steps:
从父目录
cameldest/queue
上的一个文件使用者移动,该目录以递归方式将子目录中的文件(cameldest/queue/high/
之前的cameldest/queue/low/
)排序为 two consumers ,每个目录一个,根本不进行排序 .仅从
/cameldest/queue/high/
设置使用者以通过我的实际业务逻辑处理文件 .将消费者从
/cameldest/queue/low
设置为简单地将文件从"low"提升为"high"(将它们复制过来,即.to("file://cameldest/queue/high");
)至关重要的是,为了 only promote from "low" to "high" when high is not busy ,将 throttles the other route 的路由策略附加到 throttles the other route ,即"low"如果"high"中有任何正在发送的消息
此外,我添加了一个
ThrottlingInflightRoutePolicy
到"high",以防止它一次性过多的交换 .Imagine this like at check-in at the airport, where tourist travellers are invited over into the business class lane if that is empty.
这就像一个负载下的魅力,即使成千上万的文件在“低”队列中排队,直接下降到“高”的新消息(文件)在几秒钟内得到处理 .
这个解决方案没有涵盖的唯一要求是有序性:没有保证首先拾取旧文件,而不是随机拾取它们 . 可以想象这样一种情况,即稳定的传入文件流可能导致一个特定的文件X总是不吉利而且从未被拾取 . 然而,发生这种情况的可能性非常低 .
Possible improvement: 目前,允许/暂停文件从"low"升级到"high"的阈值在"high"中设置为0消息 . 一方面,这可以保证在执行"low"的另一次升级之前处理掉入"high"的文件,另一方面它会导致一些停止启动模式,尤其是在多线程场景中 . 虽然不是一个真正的问题,但表现却令人印象深刻 .
Source:
我的路线定义:
我的
SuspendOtherRoutePolicy
,松散地构建像ThrottlingInflightRoutePolicy