我是pyspark的新手,我正在尝试用它来处理一个保存为csv文件的大型数据集 . 我想将CSV文件读入spark数据帧,删除一些列,然后添加新列 . 我该怎么做?
我无法将此数据转换为数据帧 . 这是我到目前为止的精简版:
def make_dataframe(data_portion, schema, sql):
fields = data_portion.split(",")
return sql.createDateFrame([(fields[0], fields[1])], schema=schema)
if __name__ == "__main__":
sc = SparkContext(appName="Test")
sql = SQLContext(sc)
...
big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
.reduce(lambda a, b: a.union(b))
big_frame.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<...>") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("append") \
.save()
sc.stop()
这会在reduce步骤中产生错误 TypeError: 'JavaPackage' object is not callable
.
是否有可能做到这一点?减少到数据帧的想法是能够将结果数据写入数据库(Redshift,使用spark-redshift包) .
我也尝试使用 unionAll()
和 map()
与 partial()
但无法使其工作 .
我在Amazon的EMR上使用 spark-redshift_2.10:2.0.0
和Amazon的JDBC驱动程序 RedshiftJDBC41-1.1.17.1017.jar
运行此程序 .
1 回答
更新 - 在评论中回答您的问题:
Read data from CSV to dataframe: 您似乎只尝试将CSV文件读入spark数据帧 .
如果是这样 - 我的答案在这里:https://stackoverflow.com/a/37640154/5088142覆盖这个 .
以下代码应将CSV读入spark-data-frame
drop column
你可以使用"drop(col)" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html删除列
降(COL)
add column 你可以使用"withColumn" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
withColumn(colName,col)
注意:spark有很多其他功能可以使用(例如你可以使用“select”而不是“drop”)