首页 文章

使用强制架构的Pyspark RDD到DataFrame:值错误

提问于
浏览
3

我正在使用pyspark,其模式与本文末尾显示的模式相当(注意嵌套列表,无序字段),最初从Parquet导入为DataFrame . 从根本上说,我遇到的问题是无法将此数据作为RDD处理,然后转换回DataFrame . (我已经回顾了几个相关的帖子,但我仍然不知道我哪里出错了 . )

通常,下面的代码工作正常(正如人们所期望的那样):

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)

当我需要映射RDD时,事情不起作用(例如,添加字段的情况) .

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
    rowDict = row.asDict()
    return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)

上面的代码给出了以下异常,其中XXX是整数的替代,它从一次运行变为运行(例如,我见过1,16,23等):

File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in 
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`

鉴于此信息, is there a clear error in the second block of code ? (我注意到tripRDD属于rdd.RDD类,而tripRDDNew属于rdd.PipelinedRDD类,但我不知道为什么这会出现问题 . )

架构:

root
 |-- foo: struct (nullable = true)
 |    |-- bar_1: integer (nullable = true)
 |    |-- bar_2: integer (nullable = true)
 |    |-- bar_3: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |    |-- bar_4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |-- qux: integer (nullable = true)
 |-- corge: integer (nullable = true)
 |-- uier: integer (nullable = true)`

1 回答

  • 2

    如帖子中所述,原始模式的字段不按字母顺序排列 . 这就是问题所在 . 在映射函数中使用.asDict()命令生成的RDD的字段 . tripRDDNew的字段顺序与调用createDataFrame时的模式冲突 . ValueError是尝试将一个整数字段(即示例中的qux,corge或uier)解析为StructType的结果 .

    (顺便说一句:有点令人惊讶的是,createDataFrame要求架构字段与RDD字段具有相同的顺序 . 您应该需要一致的字段名称或一致的字段排序,但要求两者看起来都有点矫枉过正 . )

    (作为第二种情况:DataFrame中非字母字段的存在有些异常 . 例如,sc.parallelize()将在分发数据结构时按字母顺序自动排序字段 . 看起来这些字段应该在导入时排序来自镶木地板文件的DataFrame . 调查为什么不是这种情况可能会很有趣 . )

相关问题