首页 文章

获取CSV到Spark数据帧

提问于
浏览 573
16

我在Spark上使用python,并希望将csv放入数据帧 .

Spark SQL的documentation奇怪地没有提供CSV作为源的解释 .

我找到了Spark-CSV,但是我对文档的两个部分有问题:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" 每次启动pyspark或spark-submit时,我真的需要添加这个参数吗?它似乎非常不优雅 . 有没有办法在python中导入它而不是每次重新加载它?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") 即使我这样做,这也行不通 . "source"参数在这行代码中代表什么?我如何简单地在linux上加载本地文件,比如"/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?

8 回答

  • 21
    from pyspark.sql.types import StringType
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)
    
    Employee_rdd = sc.textFile("\..\Employee.csv")
                   .map(lambda line: line.split(","))
    
    Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])
    
    Employee_df.show()
    
  • 16

    如果您不介意额外的包依赖项,可以使用Pandas来解析CSV文件 . 它处理内部逗号就好了 .

    依赖关系:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    import pandas as pd
    

    立即将整个文件读入Spark DataFrame:

    sc = SparkContext('local','example')  # if using locally
    sql_sc = SQLContext(sc)
    
    pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
    # If no header:
    # pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
    s_df = sql_sc.createDataFrame(pandas_df)
    

    或者,更有数据意识的是,您可以将数据块化为Spark RDD然后DF:

    chunk_100k = pd.read_csv('file.csv', chunksize=100000)
    
    for chunky in chunk_100k:
        Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
        try:
            Spark_full_rdd += Spark_temp_rdd
        except NameError:
            Spark_full_rdd = Spark_temp_rdd
        del Spark_temp_rdd
    
    Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
    
  • 12

    将csv文件读入RDD,然后从原始RDD生成RowRDD .

    创建由与步骤1中创建的RDD中的行结构匹配的StructType表示的模式 .

    通过SQLContext提供的createDataFrame方法将模式应用于行的RDD .

    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))
    
    # The schema is encoded in a string.
    schemaString = "name age"
    
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    
    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)
    

    来源:SPARK PROGRAMMING GUIDE

  • 10

    随着更新版本的Spark(我相信,1.4),这变得更加容易 . 表达式 sqlContext.read 为您提供DataFrameReader实例,其中包含 .csv() 方法:

    df = sqlContext.read.csv("/path/to/your.csv")
    

    请注意,您还可以通过将关键字参数 header=True 添加到 .csv() 调用来指示csv文件具有标头 . 还有一些其他选项可供使用,并在上面的链接中进行了描述 .

  • 6

    for Pyspark, assuming that the first row of the csv file contains a header

    spark = SparkSession.builder.appName('chosenName').getOrCreate()
    df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)
    
  • 3

    我遇到了类似的问题 . 解决方案是添加一个名为“PYSPARK_SUBMIT_ARGS”的环境变量,并将其值设置为“--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell” . 这适用于Spark的Python交互式shell .

    确保将spark-csv的版本与安装的Scala版本相匹配 . 使用Scala 2.11,它是spark-csv_2.11,使用Scala 2.10或2.10.5,它是spark-csv_2.10 .

    希望它有效 .

  • 0

    遵循Spark 2.0,建议使用Spark会话:

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    
    # Create a SparkSession
    spark = SparkSession \
        .builder \
        .appName("basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    def mapper(line):
        fields = line.split(',')
        return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))
    
    lines = spark.sparkContext.textFile("file.csv")
    df = lines.map(mapper)
    
    # Infer the schema, and register the DataFrame as a table.
    schemaDf = spark.createDataFrame(df).cache()
    schemaDf.createOrReplaceTempView("tablename")
    
  • 0

    基于Aravind的答案,但更短,例如:

    lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
    df = lines.toDF(["year", "month", "day", "count"])
    

相关问题