首页 文章

Camel:文件消费者组件“咬掉的东西超过它可以咀嚼”,管道因内存不足而死亡

提问于
浏览
0

我在Camel中定义了一个类似于这样的路由:GET请求进来,文件在文件系统中创建 . 文件使用者选择它,从外部Web服务获取数据,并通过POST将结果消息发送到其他Web服务 .

简化代码如下:

// Update request goes on queue:
    from("restlet:http://localhost:9191/update?restletMethod=post")
    .routeId("Update via POST")
    [...some magic that defines a directory and file name based on request headers...]
    .to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")

    // Update gets processed
    from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
    .routeId("Update main route")
    .streamCaching() //otherwise stuff can't be sent to multiple endpoints
    [...enrich message from some web service using http4 component...]
    .multicast()
        .stopOnException()
        .to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
    .end();

多播中的三个 endpoints 只是将生成的消息POST到其他Web服务 .

当队列(即文件目录 cameldest )相当空时,这一切都运行良好 . 文件正在 cameldest/<subdir> 中创建,由文件使用者拾取并移动到 cameldest/<subdir>/inprogress ,并且正在向三个传出的POST发送内容没问题 .

但是,一旦传入的请求堆积到大约300,000个文件进度减慢并最终 pipeline fails due to out-of-memory errors (超出GC开销限制) .

通过增加日志记录,我可以看到文件消费者轮询基本上从不运行,因为它每次都是 appears to take responsibility for all files it sees ,等待它们完成处理,然后才开始另一轮调查 . 除此之外(我假设)造成资源瓶颈,这也会影响我的排序要求:一旦队列被数千条等待处理的消息堵塞,新的消息将被天真地排序得更高 - 如果他们甚至仍然被拿起 - 仍在等待那些已经"started"的人 .

现在,我已经尝试了 maxMessagesPerPolleagerMaxMessagesPerPoll 选项 . 他们似乎一开始就缓解了这个问题,但经过一系列的民意调查后,我仍然在"started" limbo中找到了数千个文件 .

唯一有效的方法是使瓶颈 delaymaxMessages... 如此窄,以至于平均处理速度比文件轮询周期快 .

显然,这不是我想要的 . 我希望我的管道尽可能快地处理文件,但速度不快 . 我希望文件消费者在路由繁忙时等待 .

我犯了一个明显的错误吗?

(我在带有XFS的Redhat 7机器上运行了一个稍微旧的Camel 2.14.0,如果这是问题的一部分 . )

3 回答

  • 0

    尝试将maxMessagesPerPoll设置为来自文件 endpoints 的较低值,以便每次轮询仅获取最多X个文件,这也会限制您在Camel应用程序中将拥有的机上信息总数 .

    您可以在Camel文档中找到有关该选项的更多信息,以获取文件组件

  • 1

    除非你真的需要将数据保存为文件,否则我会提出另一种解决方案 .

    从您的restlet使用者,将每个请求发送到消息排队应用程序,如activemq或rabbitmq或类似的东西 . 您将很快收到该队列上的大量消息,但这没关系 .

    然后用队列使用者替换您的文件使用者 . 这需要一些时间,但每条消息应单独处理并发送到您想要的任何地方 . 我用大约500 000条消息测试了rabbitmq,并且运行正常 . 这也应该减少消费者的负担 .

  • 0

    简短的回答是没有答案:Camel的文件组件的 sortBy 选项对于容纳我的用例来说太缺乏内存效率:

    • 唯一性:我已经在那里了't want to put a file on queue if it' .

    • 优先级:应首先处理标记为高优先级的文件 .

    • 性能:拥有几十万个文件,甚至几百万个文件应该没问题 .

    • FIFO :(奖励)应首先拾取最早的文件(按优先级) .

    问题似乎是,如果我正确读取source codedocumentation,无论是使用内置语言还是自定义可插入 sorter ,所有文件详细信息都在内存中执行排序 . 文件组件总是创建一个包含所有细节的对象列表,这显然会在经常轮询许多文件时导致大量的垃圾收集开销 .

    I got my use case to work, mostly, without having to resort to using a database or writing a custom component, using the following steps:

    • 从父目录 cameldest/queue 上的一个文件使用者移动,该目录以递归方式将子目录中的文件( cameldest/queue/high/ 之前的 cameldest/queue/low/ )排序为 two consumers ,每个目录一个,根本不进行排序 .

    • 仅从 /cameldest/queue/high/ 设置使用者以通过我的实际业务逻辑处理文件 .

    • 将消费者从 /cameldest/queue/low 设置为简单地将文件从"low"提升为"high"(将它们复制过来,即 .to("file://cameldest/queue/high");

    • 至关重要的是,为了 only promote from "low" to "high" when high is not busy ,将 throttles the other route 的路由策略附加到 throttles the other route ,即"low"如果"high"中有任何正在发送的消息

    • 此外,我添加了一个 ThrottlingInflightRoutePolicy 到"high",以防止它一次性过多的交换 .

    Imagine this like at check-in at the airport, where tourist travellers are invited over into the business class lane if that is empty.

    这就像一个负载下的魅力,即使成千上万的文件在“低”队列中排队,直接下降到“高”的新消息(文件)在几秒钟内得到处理 .

    这个解决方案没有涵盖的唯一要求是有序性:没有保证首先拾取旧文件,而不是随机拾取它们 . 可以想象这样一种情况,即稳定的传入文件流可能导致一个特定的文件X总是不吉利而且从未被拾取 . 然而,发生这种情况的可能性非常低 .

    Possible improvement: 目前,允许/暂停文件从"low"升级到"high"的阈值在"high"中设置为0消息 . 一方面,这可以保证在执行"low"的另一次升级之前处理掉入"high"的文件,另一方面它会导致一些停止启动模式,尤其是在多线程场景中 . 虽然不是一个真正的问题,但表现却令人印象深刻 .


    Source:

    我的路线定义:

    ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
        trp.setMaxInflightExchanges(50);
    
        SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");
    
        from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
        .routeId("lowPriority")
        .log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
        .to("file://cameldest/queue/high");
    
        from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
        .routeId("highPriority")
        .routePolicy(trp)
        .routePolicy(sorp)
        .threads(20)
        .log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
        .delay(2000) // This is where business logic would happen
        .log("After: ${in.headers."+Exchange.FILE_PATH+"}")
        .stop();
    

    我的 SuspendOtherRoutePolicy ,松散地构建像 ThrottlingInflightRoutePolicy

    public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {
    
        private CamelContext camelContext;
        private final Lock lock = new ReentrantLock();
        private String otherRouteId;
    
        public SuspendOtherRoutePolicy(String otherRouteId) {
            super();
            this.otherRouteId = otherRouteId;
        }
    
        @Override
        public CamelContext getCamelContext() {
            return camelContext;
        }
    
        @Override
        public void onStart(Route route) {
            super.onStart(route);
            if (camelContext.getRoute(otherRouteId) == null) {
                throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
            }
        }
    
        @Override
        public void setCamelContext(CamelContext context) {
            camelContext = context;
        }
    
        @Override
        public void onExchangeDone(Route route, Exchange exchange) {
            //log.info("Exchange done on route " + route);
            Route otherRoute = camelContext.getRoute(otherRouteId);
            //log.info("Other route: " + otherRoute);
            throttle(route, otherRoute, exchange);
        }
    
        protected void throttle(Route route, Route otherRoute, Exchange exchange) {
            // this works the best when this logic is executed when the exchange is done
            Consumer consumer = otherRoute.getConsumer();
    
            int size = getSize(route, exchange);
            boolean stop = size > 0;
            if (stop) {
                try {
                    lock.lock();
                    stopConsumer(size, consumer);
                } catch (Exception e) {
                    handleException(e);
                } finally {
                    lock.unlock();
                }
            }
    
            // reload size in case a race condition with too many at once being invoked
            // so we need to ensure that we read the most current size and start the consumer if we are already to low
            size = getSize(route, exchange);
            boolean start = size == 0;
            if (start) {
                try {
                    lock.lock();
                    startConsumer(size, consumer);
                } catch (Exception e) {
                    handleException(e);
                } finally {
                    lock.unlock();
                }
            }
        }
    
        private int getSize(Route route, Exchange exchange) {
            return exchange.getContext().getInflightRepository().size(route.getId());
        }
    
        private void startConsumer(int size, Consumer consumer) throws Exception {
            boolean started = super.startConsumer(consumer);
            if (started) {
                log.info("Resuming the other consumer " + consumer);
            }
        }
    
        private void stopConsumer(int size, Consumer consumer) throws Exception {
            boolean stopped = super.stopConsumer(consumer);
            if (stopped) {
                log.info("Suspending the other consumer " + consumer);
            }
        }
    }
    

相关问题