我在Spark上使用python,并希望将csv放入数据帧 .
Spark SQL的documentation奇怪地没有提供CSV作为源的解释 .
我找到了Spark-CSV,但是我对文档的两个部分有问题:
-
"This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3"
每次启动pyspark或spark-submit时,我真的需要添加这个参数吗?它似乎非常不优雅 . 有没有办法在python中导入它而不是每次重新加载它? -
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
即使我这样做,这也行不通 . "source"参数在这行代码中代表什么?我如何简单地在linux上加载本地文件,比如"/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?
8 回答
将csv文件读入RDD,然后从原始RDD生成RowRDD .
创建由与步骤1中创建的RDD中的行结构匹配的StructType表示的模式 .
通过SQLContext提供的createDataFrame方法将模式应用于行的RDD .
来源:SPARK PROGRAMMING GUIDE
随着更新版本的Spark(我相信,1.4),这变得更加容易 . 表达式
sqlContext.read
为您提供DataFrameReader实例,其中包含.csv()
方法:请注意,您还可以通过将关键字参数
header=True
添加到.csv()
调用来指示csv文件具有标头 . 还有一些其他选项可供使用,并在上面的链接中进行了描述 .如果您不介意额外的包依赖项,可以使用Pandas来解析CSV文件 . 它处理内部逗号就好了 .
依赖关系:
立即将整个文件读入Spark DataFrame:
或者,更有数据意识的是,您可以将数据块化为Spark RDD然后DF:
遵循Spark 2.0,建议使用Spark会话:
for Pyspark, assuming that the first row of the csv file contains a header
我遇到了类似的问题 . 解决方案是添加一个名为“PYSPARK_SUBMIT_ARGS”的环境变量,并将其值设置为“--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell” . 这适用于Spark的Python交互式shell .
确保将spark-csv的版本与安装的Scala版本相匹配 . 使用Scala 2.11,它是spark-csv_2.11,使用Scala 2.10或2.10.5,它是spark-csv_2.10 .
希望它有效 .
基于Aravind的答案,但更短,例如: