I have a pipeline using the Python SDK 2.2.0 for Apache Beam.
The pipeline is almost a canonical word count: I have pairs of names in the form ("John Doe, Jane Smith", 1),
and I want to figure out how many times each pair of names appears together, like so:
p_collection
| "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
| "GroupByKey" >> beam.GroupByKey()
| "AggregateGroups" >> beam.Map(lambda (pair, ones): (pair, sum(ones)))
| "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})
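For reference, the aggregation those four steps perform can be sketched in plain Python (no Beam), assuming the input elements are tuples of names such as ("John Doe", "Jane Smith"); count_name_pairs is a hypothetical helper name, not part of the pipeline:

```python
from collections import Counter

def count_name_pairs(name_pairs):
    # Mirror "PairWithOne": join each tuple of names into a single string key.
    keys = (", ".join(pair) for pair in name_pairs)
    # Mirror "GroupByKey" + "AggregateGroups": tally occurrences per key.
    counts = Counter(keys)
    # Mirror "Format": emit one dict per distinct pair.
    return [{"pair": k, "pair_count": n} for k, n in counts.items()]
```

So for two occurrences of ("John Doe", "Jane Smith") the output contains {"pair": "John Doe, Jane Smith", "pair_count": 2}.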
When I run this code locally on a small dataset, it works just fine.
But when I deploy it to Google Cloud Dataflow, I get the following error:
An exception was raised while trying to execute work item 423109085466017585:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 69, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 233, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._value_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 518, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
    values[i], nested=nested or i + 1 < len(self._coder_impls)))
RuntimeError: KeyError: 0 [while running 'Transform/Format']
Looking at the source code where this error pops up, I thought it might be caused by some of the names containing strangely encoded characters, so in an act of desperation I tried the .encode("ascii", errors="ignore").decode() you can see in the code above, but no luck.
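For what it's worth, that call only strips non-ASCII characters from the key; it does not change the key's type or structure, which is why it would not help here:

```python
# errors="ignore" silently drops any character that cannot be
# encoded as ASCII, leaving the rest of the string intact.
name = u"Jos\u00e9 N\u00fa\u00f1ez"  # "José Núñez"
cleaned = name.encode("ascii", errors="ignore").decode()
print(cleaned)  # "Jos Nez"
```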
Any idea why this pipeline executes successfully locally but fails on the DataFlow runner?
Thanks!
2 Answers
This doesn't solve my problem, since it sidesteps it rather than fixing it, but thanks to user1093967's suggestion in the comments it got my code running.
I simply replaced the GroupByKey and AggregateGroups steps with a single
CombinePerKey(sum)
step, and the problem no longer occurs. I'd still be happy to hear why it works, though.
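The appeal of CombinePerKey(sum) is that it folds values into a running per-key total instead of first materializing each group as an iterable, which is exactly the step that was failing. A minimal sketch of that folding semantics in plain Python (not Beam's actual implementation):

```python
def combine_per_key_sum(kv_pairs):
    """Fold (key, value) pairs into per-key sums without ever
    building a per-key list the way GroupByKey would."""
    totals = {}
    for key, value in kv_pairs:
        totals[key] = totals.get(key, 0) + value
    return totals
```

On a runner, this folding can additionally happen in parallel on each worker, with the partial sums merged afterwards.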
In some cases, like mine, you need the intermediate grouped values, so
CombinePerKey
isn't ideal. In that more general case you can replace GroupByKey()
with CombineValues(ToListCombineFn()).
I'm not sure why this works while
GroupByKey
doesn't. My guess is that consuming the _UnwindowedValues
iterable returned by GroupByKey
as if it were a list fails in a parallel execution environment. I did something similar, where
foo
required the full, indexable list and wasn't easily combinable. I'm not sure why that restriction caused a problem for you, though; sum
can operate on an iterable. This solution isn't ideal, since (I believe) you lose some of the parallelization in the
ToList
transform. That said, at least it's an option if anyone faces the same problem!
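As a rough illustration of what a ToList-style combiner does (a sketch of the semantics only, not Beam's actual ToListCombineFn implementation): it accumulates values into lists and merges those lists, so downstream code receives a real, indexable list rather than a one-shot iterable.

```python
class ToListCombineFnSketch(object):
    # Start each accumulator as an empty list.
    def create_accumulator(self):
        return []

    # Add one value to a (worker-local) accumulator.
    def add_input(self, accumulator, value):
        accumulator.append(value)
        return accumulator

    # Merge accumulators produced on different workers.
    def merge_accumulators(self, accumulators):
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        return merged

    # The final per-key output is a plain, indexable list.
    def extract_output(self, accumulator):
        return accumulator
```

A downstream foo(values) can then safely do things like values[0] or len(values), which is what fails on _UnwindowedValues.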