我在一个 Iterable
Java集合上调用此函数,由 GroupByKey
函数产生:
static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";
@ProcessElement
public void processElement(ProcessContext c) {
Iterable<Order> orders = c.element().getValue();
boolean complete = false;
do {
try {
Order order = orders.iterator().next();
if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.iterator().hasNext());
}
}
该函数迭代 Orders
列表并输出与指定的 eventName
属性匹配的第一个实例 . 如果找到 Order
,或者整个集合已经迭代,则循环结束 .
随机 Order
实例在上游生成,并以2 /秒的速率发布到 Pub/Sub 实例,其中它们被调用此函数的 DataFlow 实例使用 . 约 . 进入操作15分钟后,警告开始出现:
处理卡在步骤查找订单至少15m00s而不输出或完成
警告是由于 iterator().hasNext()
或 iterator().next()
中的偶发故障而发出的 . 最终结果是整个管道停滞不前 . 关联的管道阶段永远不会发出输出 .
用标准for循环替换循环解决了这个问题 . 但是,这样做意味着迭代整个集合;我希望在找到合适的元素时结束循环,因此do-while循环 .
我很想知道为什么 iterator
操作导致管道失速 . FAIA Iterable
集合是不可变的,并且未被其他进程修改 .
我在 Windows 上运行 Java 8 和 Apache Beam 2.6 .
1 回答
每次调用
orders.iterator()
时,都会从第一个订单开始创建一个新的迭代器 . 这意味着您在循环中反复处理相同的顺序 . 如果有多个订单,您对hasNext()
的调用将始终为真 . 因此,如果您有多个订单或者您的第一个订单未设置complete
,则循环将永远运行,这就是您达到超时的原因 .相反,你应该调用
iterator()
一次并存储迭代器而不是迭代,使用它来循环: