Python Dask - dataframe.map_partitions()返回值

所以 dask.dataframe.map_partitions() 采用 func 参数和 meta kwarg . 它究竟是如何确定其返回类型的?举个例子:

很多csv在...... \ some_folder中 .

ddf = dd.read_csv(r"...\some_folder\*", usecols=['ColA', 'ColB'], 
                                        blocksize=None, 
                                        dtype={'ColA': np.float32, 'ColB': np.float32})
example_func = lambda x: x.iloc[-1] / len(x)
metaResult = pd.Series({'ColA': .1234, 'ColB': .1234})
result = ddf.map_partitions(example_func, meta=metaResult).compute()

我对“分布式”计算很新,但我会直观地期望这会返回一个Series对象的集合(列表或dict,很可能),但结果是一个Series对象,可以认为是结果的串联每个分区上的example_func . 如果该系列具有指示分区标签的MultiIndex,那么这本身就足够了 .

this questiondocsthe source code itself可以看出,这是因为 ddf.divisions 会因读取csv而返回 (None, None, ..., None) ?是否有一种以dask-native方式执行此操作,或者我是否需要手动进入并断开返回的Series(每个分区上由 example_func 返回的Series的串联)?

另外,我可以随意纠正我的假设/做法,因为我是dask的新手 .

回答(1)

2 years ago

所以dask.dataframe.map_partitions()接受一个func参数和meta kwarg . 它究竟是如何确定其返回类型的?

map_partition 尝试以'intelligent'方式将 func 返回的结果连接到dask DataFrame或dask Series对象 . 此决定基于 func 的返回值:

  • 如果 func 返回标量, map_partitions 将返回一个dask Series对象 .

  • 如果 func 返回pd.Series对象, map_partition 将返回一个dask Series对象,其中连接 func 返回的所有pd.Series对象 .

  • 如果 func 返回pd.DataFrame,则map_partitions返回一个dask Dataframe对象,其中这些pd.DataFrame对象沿第一个轴连接 .

如果您对特殊分区的结果感兴趣,可以使用get_partition() . 如果分区标签通常是您的重要信息,我会考虑在读取csv中的数据后直接分配ddf的单独列,其中包含您需要的所有信息 . 之后,您可以以某种方式构造 func ,它返回一个pd.DataFrame,其中包含您在一列中计算的结果以及在另一列中标识结果所需的信息 .