首页 文章

为什么可迭代操作会在Apache Beam函数中抛出错误?

提问于
浏览
0

我在一个 Iterable Java集合上调用此函数,由 GroupByKey 函数产生:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
    String COMPLETE_EVENT_NAME = "COMPLETE";

    @ProcessElement
    public void processElement(ProcessContext c) {
        Iterable<Order> orders = c.element().getValue();
        boolean complete = false;

        do {
            try {
                Order order = orders.iterator().next();

                if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
                    complete = true;
                    order.setComplete(complete);
                    c.output(order);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
            }
        } while (complete == false && orders.iterator().hasNext());
    }
}

该函数迭代 Orders 列表并输出与指定的 eventName 属性匹配的第一个实例 . 如果找到 Order ,或者整个集合已经迭代,则循环结束 .

随机 Order 实例在上游生成,并以2 /秒的速率发布到 Pub/Sub 实例,其中它们被调用此函数的 DataFlow 实例使用 . 约 . 进入操作15分钟后,警告开始出现:

处理卡在步骤查找订单至少15m00s而不输出或完成

警告是由于 iterator().hasNext()iterator().next() 中的偶发故障而发出的 . 最终结果是整个管道停滞不前 . 关联的管道阶段永远不会发出输出 .

用标准for循环替换循环解决了这个问题 . 但是,这样做意味着迭代整个集合;我希望在找到合适的元素时结束循环,因此do-while循环 .

我很想知道为什么 iterator 操作导致管道失速 . FAIA Iterable 集合是不可变的,并且未被其他进程修改 .

我在 Windows 上运行 Java 8Apache Beam 2.6 .

1 回答

  • 1

    每次调用 orders.iterator() 时,都会从第一个订单开始创建一个新的迭代器 . 这意味着您在循环中反复处理相同的顺序 . 如果有多个订单,您对 hasNext() 的调用将始终为真 . 因此,如果您有多个订单或者您的第一个订单未设置 complete ,则循环将永远运行,这就是您达到超时的原因 .

    相反,你应该调用 iterator() 一次并存储迭代器而不是迭代,使用它来循环:

    static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
        String COMPLETE_EVENT_NAME = "COMPLETE";
    
        @ProcessElement
        public void processElement(ProcessContext c) {
            Iterator<Order> orders = c.element().getValue().iterator();
            boolean complete = false;
    
            do {
                try {
                    Order order = orders.next();
    
                    if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
                        complete = true;
                        order.setComplete(complete);
                        c.output(order);
                    }
                } catch (Exception e) {
                    LOG.error(e.getMessage());
                }
            } while (complete == false && orders.hasNext());
        }
    }
    

相关问题