所以 dask.dataframe.map_partitions()
采用 func
参数和 meta
kwarg . 它究竟是如何确定其返回类型的?举个例子:
很多csv在...... \ some_folder中 .
ddf = dd.read_csv(r"...\some_folder\*", usecols=['ColA', 'ColB'],
blocksize=None,
dtype={'ColA': np.float32, 'ColB': np.float32})
example_func = lambda x: x.iloc[-1] / len(x)
metaResult = pd.Series({'ColA': .1234, 'ColB': .1234})
result = ddf.map_partitions(example_func, meta=metaResult).compute()
我对“分布式”计算很新,但我会直观地期望这会返回一个Series对象的集合(列表或dict,很可能),但结果是一个Series对象,可以认为是结果的串联每个分区上的example_func . 如果该系列具有指示分区标签的MultiIndex,那么这本身就足够了 .
从this question,docs和the source code itself可以看出,这是因为 ddf.divisions
会因读取csv而返回 (None, None, ..., None)
?是否有一种以dask-native方式执行此操作,或者我是否需要手动进入并断开返回的Series(每个分区上由 example_func
返回的Series的串联)?
另外,我可以随意纠正我的假设/做法,因为我是dask的新手 .
1 回答
map_partition
尝试以'intelligent'方式将func
返回的结果连接到dask DataFrame或dask Series对象 . 此决定基于func
的返回值:如果
func
返回标量,map_partitions
将返回一个dask Series对象 .如果
func
返回pd.Series对象,map_partition
将返回一个dask Series对象,其中连接func
返回的所有pd.Series对象 .如果
func
返回pd.DataFrame,则map_partitions返回一个dask Dataframe对象,其中这些pd.DataFrame对象沿第一个轴连接 .如果您对特殊分区的结果感兴趣,可以使用get_partition() . 如果分区标签通常是您的重要信息,我会考虑在读取csv中的数据后直接分配ddf的单独列,其中包含您需要的所有信息 . 之后,您可以以某种方式构造
func
,它返回一个pd.DataFrame,其中包含您在一列中计算的结果以及在另一列中标识结果所需的信息 .