首页 文章

Apache Beam GroupByKey()在Python上使用Google DataFlow运行时失败

提问于
浏览
1

我有一个使用Python SDK 2.2.0 for Apache Beam的管道 .

这个管道几乎是一个典型的字数:我有 ("John Doe, Jane Smith", 1) 格式的名字对,我想弄清楚每对名字出现在一起的次数,如下所示:

p_collection
            | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
            | "GroupByKey" >> beam.GroupByKey()
            | "AggregateGroups" >> beam.Map(lambda (pair, ones): (pair, sum(ones)))
            | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})

当我在本地运行此代码时,使用一个小数据集,它可以很好地工作 .

但是当我将其部署到Google Cloud DataFlow时,我收到以下错误:

尝试执行工作项时出现异常423109085466017585:Traceback(最近一次调用最后一次):do_work中的文件“/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py”,第582行work_executor.execute()文件“/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py”,第167行,执行op.start()文件“dataflow_worker / shuffle_operations.py”,第49行,在dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start def start(self):文件“dataflow_worker / shuffle_operations.py”,第50行,在dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start中,带有self.scoped_start_state:文件“dataflow_worker / shuffle_operations.py”,行65,在dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start中,self.shuffle_source.reader()作为阅读器:文件“dataflow_worker / shuffle_operations.py”,第69行,在dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start中自我输出(windowed_value)文件“apache_beam /亚军s / worker / operations.py“,第154行,在apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver,self.receivers [output_index]) . receive(windowed_value)文件”apache_beam / runners / worker / operations.py“,第86行,在apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation,consumer).process(windowed_value)文件”dataflow_worker / shuffle_operations.py“,第233行,在dataflow_worker.shuffle_operations中 . BatchGroupAlsoByWindowsOperation.process self.output(wvalue.with_value((k,wvalue.value)))文件“apache_beam / runners / worker / operations.py”,第154行,位于apache_beam.runners.worker.operations.Operation.output cython中 . cast(Receiver,self.receivers [output_index]) . receive(windowed_value)文件“apache_beam / runners / worker / operations.py”,第86行,在apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation,消费者).process(windowed_value)文件“apache_beam / runners / worker / operations.py”,第339行,在apache_beam.runners.worker.operations.D中oOperation.process with self.scoped_process_state:文件“apache_beam / runners / worker / operations.py”,第340行,在apache_beam.runners.worker.operations.DoOperation.process self.dofn_receiver.receive(o)文件“apache_beam / runners / common.py“,第382行,在apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value)文件”apache_beam / runners / common.py“,第390行,在apache_beam.runners.common.DoFnRunner.process self中 . _reraise_augmented(exn)文件“apache_beam / runners / common.py”,第415行,在apache_beam.runners.common.DoFnRunner._reraise_augmented中,在apache_beam.runners.common中提取文件“apache_beam / runners / common.py”,第388行 . DoFnRunner.process self.do_fn_invoker.invoke_process(windowed_value)文件“apache_beam / runners / common.py”,第189行,位于apache_beam.runners.common.SimpleInvoker.invoke_process self.output_processor.process_outputs(文件“apache_beam / runners / common.py” “,第480行,在apache_beam.runners.common._OutputProcessor.process_outputs self.main_receivers.receive(w indowed_value)文件“apache_beam / runners / worker / operations.py”,第86行,在apache_beam.runners.worker.operations.ConsumerSet.receive中cython.cast(操作,消费者).process(windowed_value)文件“apache_beam / runners / worker /operations.py“,第339行,在apache_beam.runners.worker.operations.DoOperation.process中使用self.scoped_process_state:文件”apache_beam / runners / worker / operations.py“,第340行,在apache_beam.runners.worker.operations中.DoOperation.process self.dofn_receiver.receive(o)文件“apache_beam / runners / common.py”,第382行,位于apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value)文件“apache_beam / runners / common . py“,第390行,在apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn)文件”apache_beam / runners / common.py“,第431行,在apache_beam.runners.common.DoFnRunner._reraise_augmented中提升new_exn,无,original_traceback文件“apache_beam / runners / common.py”,第388行,位于apache_beam.runners.common.DoFnRunner.processself.do_fn_invoker.invoke_process(windowed_value)文件“apache_beam / runners / common.py”,第189行,位于apache_beam.runners.common.SimpleInvoker.invoke_process self.output_processor.process_outputs(文件“apache_beam / runners / common.py”,line 480,在apache_beam.runners.common._OutputProcessor.process_outputs self.main_receivers.receive(windowed_value)文件“apache_beam / runners / worker / operations.py”,第84行,在apache_beam.runners.worker.operations.ConsumerSet.receive self中 . update_counters_start(windowed_value)文件“apache_beam / runners / worker / operations.py”,第90行,位于apache_beam.runners.worker.operations.ConsumerSet.update_counters_start self.opcounter.update_from(windowed_value)文件“apache_beam / runners / worker / opcounters . py“,第63行,在apache_beam.runners.worker.opcounters.OperationCounters.update_from self.do_sample(windowed_value)文件”apache_beam / runners / worker / opcounters.py“,第81行,在apache_beam.runners.worker.opcounters.OperationCounters中.do_sample self.coder_impl.get_esti mated_size_and_observables(windowed_value))文件“apache_beam / coders / coder_impl.py”,第730行,在apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables中def get_estimated_size_and_observables(self,value,nested = False):文件“apache_beam / coders / coder_impl.py “,第739行,在apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables self._value_coder.get_estimated_size_and_observables(文件”apache_beam / coders / coder_impl.py“,第518行,在apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables values [i]中, nested = nested或i 1 <len(self._coder_impls)))RuntimeError:KeyError:0 [运行'Transform / Format'时]

看看这个错误弹出的地方at the source code,我认为可能是因为某些名字包含一些奇怪的编码字符,所以在绝望的行为中我尝试使用你在代码上看到的 .encode("ascii", errors="ignore").decode() ,但没有运气 .

关于为什么这个管道在本地成功执行但在DataFlow运行器上失败的任何想法?

谢谢!

2 回答

  • 1

    这不是解决我的问题,因为它首先避免了问题,但是由于评论中的user1093967的建议,它使我的代码运行 .

    我刚刚用 CombinePerKey(sum) 步骤替换了 GroupByKeyAggregateGroups ,问题不再发生了 .

    p_collection
            | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
            | "GroupAndSum" >> beam.CombinePerKey(sum)
            | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})
    

    不过,我很高兴听到它为什么会奏效 .

  • 2

    在某些情况下,与我自己一样,您需要中间分组值,因此 CombinePerKey 并不理想 . 在这个更一般的情况下,您可以将 GroupByKey() 替换为 CombineValues(ToListCombineFn()) .

    我不确定为什么这个有效,而 GroupByKey 没有 . 我的猜测是消耗 GroupByKey 返回的 _UnwindowedValues iterable像列表一样在并行执行环境中失败 . 我做了类似的事情:

    ... | beam.GroupByKey()
        | beam.Map(lambda k_v: (k_v[0], foo(list(k_v[1]))))
        | ...
    

    其中 foo 需要完整的可索引列表,并且不易组合 . 不过,我不确定为什么这种限制会给你造成问题; sum 可以在可迭代上运行 .

    这个解决方案并不理想(我相信)你会失去一些 ToList 转换的并行化 . 话虽这么说,至少它是一个选项,如果有人面临同样的问题!

相关问题