首页 文章

Pyspark使用ArrayWritable

提问于
浏览
1

我尝试在pyspark上保存键值RDD . RDD的每个单元都有类型,用伪代码编写:

((str,str),(int,[(int,int),...]))`

我想将它保存在hadoop文件系统上 . 为此,我将列表转换为元组并调用 .saveAsSequenceFile . 但是,使用 sc.sequenceFile 重新加载rdd失败, java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.io.ArrayWritable.<init>() .

这是一个尝试保存 (int,int) 的RDD的最小示例 .

import pyspark as spark, math

scConf = spark.SparkConf().setAppName('minimal_example')
sc = spark.SparkContext( conf = scConf )

def divs( x ):
    for n in xrange(1, int(math.sqrt(x))+1 ):
        if x % n == 0: yield n
def constructor( i ):
    return ( i, tuple(divs(i)) )

rdd = sc.parallelize(map(constructor,range(2,61)))
rdd.saveAsSequenceFile("min.seq")

当我使用sc.sequenceFile使用交互式 pyspark 加载它时,它也会失败 . 怎么了?为什么在python中尝试保存数组而我实际上有元组 . 还有如何在pyspark中扩展ArrayWritable以获得默认构造函数?

1 回答

  • 0

    你真的需要序列文件吗?您可以保存AsTextFile,加载文本文件和映射以恢复值 .

    rdd.saveAsTextFile('test') 
    
    sc.textFile('test').map(lambda row: ast.literal_eval(row))
    

相关问题