首页 文章

计算每个pyspark RDD分区中的元素数

提问于
浏览
3

我正在寻找相当于这个问题的Pyspark:How to get the number of elements in partition? .

具体来说,我想以编程方式计算pyspark RDD或dataframe的每个分区中的元素数量(我知道这些信息在Spark Web UI中可用) .

此尝试导致“AttributeError:'NoneType'对象没有属性'_jvm'”:

df.foreachPartition(lambda iter: sum(1 for _ in iter))

我不想将迭代器的内容收集到内存中 .

1 回答

  • 6

    如果你问:我们可以在没有迭代的情况下获得迭代器中的元素数量吗?答案是No .

    但是我们不必将它存储在内存中,就像你提到的帖子一样:

    def count_in_a_partition(idx, iterator):
      count = 0
      for _ in iterator:
        count += 1
      return idx, count
    
    data = sc.parallelize([
        1, 2, 3, 4
    ], 4)
    
    data.mapPartitionsWithIndex(count_in_a_partition).collect()
    

    EDIT

    请注意,您的代码非常接近解决方案,只需 mapPartitions 需要返回迭代器:

    def count_in_a_partition(iterator):
      yield sum(1 for _ in iterator)
    
    data.mapPartitions(count_in_a_partition).collect()
    

相关问题