首页 文章

PySpark如何将CSV读入Dataframe并进行操作

提问于
浏览
5

我是pyspark的新手,我正在尝试用它来处理一个保存为csv文件的大型数据集 . 我想将CSV文件读入spark数据帧,删除一些列,然后添加新列 . 我该怎么做?

我无法将此数据转换为数据帧 . 这是我到目前为止的精简版:

def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDateFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)

    ...

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
                .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()

这会在reduce步骤中产生错误 TypeError: 'JavaPackage' object is not callable .

是否有可能做到这一点?减少到数据帧的想法是能够将结果数据写入数据库(Redshift,使用spark-redshift包) .

我也尝试使用 unionAll()map()partial() 但无法使其工作 .

我在Amazon的EMR上使用 spark-redshift_2.10:2.0.0 和Amazon的JDBC驱动程序 RedshiftJDBC41-1.1.17.1017.jar 运行此程序 .

1 回答

  • 8

    更新 - 在评论中回答您的问题:

    Read data from CSV to dataframe: 您似乎只尝试将CSV文件读入spark数据帧 .

    如果是这样 - 我的答案在这里:https://stackoverflow.com/a/37640154/5088142覆盖这个 .

    以下代码应将CSV读入spark-data-frame

    import pyspark
    sc = pyspark.SparkContext()
    sql = SQLContext(sc)
    
    df = (sql.read
             .format("com.databricks.spark.csv")
             .option("header", "true")
             .load("/path/to_csv.csv"))
    
    // these lines are equivalent in Spark 2.0 - using [SparkSession][1]
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
    spark.read.option("header", "true").csv("/path/to_csv.csv")
    

    drop column

    你可以使用"drop(col)" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html删除列

    降(COL)

    Returns a new DataFrame that drops the specified column.
    Parameters: col – a string name of the column to drop, or a Column to drop.
    
    >>> df.drop('age').collect()
    [Row(name=u'Alice'), Row(name=u'Bob')]
    
    >>> df.drop(df.age).collect()
    [Row(name=u'Alice'), Row(name=u'Bob')]
    
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
    [Row(age=5, height=85, name=u'Bob')]
    
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
    [Row(age=5, name=u'Bob', height=85)]
    

    add column 你可以使用"withColumn" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

    withColumn(colName,col)

    Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
    Parameters: 
    
        colName – string, name of the new column.
        col – a Column expression for the new column.
    
    >>> df.withColumn('age2', df.age + 2).collect()
    [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
    

    注意:spark有很多其他功能可以使用(例如你可以使用“select”而不是“drop”)

相关问题