首页 文章

如何遍历pyspark中的每一行dataFrame

提问于
浏览
22

例如

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

上面的语句在终端上打印整个表,但我想使用 for or while 访问该表中的每一行以执行进一步的计算 .

5 回答

  • 20

    你根本做不到 . DataFrames ,与其他分布式数据结构相同,不是iterable,只能使用专用的高阶函数和/或SQL方法进行访问 .

    你当然可以 collect

    for row in df.rdd.collect():
        do_something(row)
    

    或转换 toLocalIterator

    for row in df.rdd.toLocalIterator():
        do_something(row)
    

    并如上所示在本地迭代,但它胜过使用Spark的所有目的 .

  • 2

    如果要对DataFrame对象中的每一行执行某些操作,请使用 map . 这将允许您对每一行执行进一步的计算 . 它相当于从 0len(dataset)-1 循环整个数据集 .

    请注意,这将返回PipelinedRDD,而不是DataFrame .

  • 5

    以上

    tupleList = [{name:x["name"], age:x["age"], city:x["city"]}
    

    应该

    tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]}
    

    对于 nameagecity 不是变量,而只是字典的键 .

  • 29

    使用python中的列表推导,您可以使用两行将整列值收集到列表中:

    df = sqlContext.sql("show tables in default")
    tableList = [x["tableName"] for x in df.rdd.collect()]
    

    在上面的示例中,我们返回数据库'default'中的表列表,但可以通过替换sql()中使用的查询来调整相同的表 .

    或者更简略:

    tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]
    

    对于三列的示例,我们可以创建一个字典列表,然后在for循环中迭代它们 .

    sql_text = "select name, age, city from user"
    tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
                 for x in sqlContext.sql(sql_text).rdd.collect()]
    for row in tupleList:
        print("{} is a {} year old from {}".format(
            row["name"],
            row["age"],
            row["city"]))
    
  • 1

    您将定义自定义函数并使用映射 .

    def customFunction(row):
    
       return (row.name, row.age, row.city)
    
    sample2 = sample.rdd.map(customFunction)
    

    要么

    sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))
    

    然后,自定义函数将应用于数据帧的每一行 . 请注意,sample2将是 RDD ,而不是数据帧 .

    如果要执行更复杂的计算,则需要映射 . 如果您只需要添加派生列,则可以使用 withColumn ,并返回数据帧 .

    sample3 = sample.withColumn('age2', sample.age + 2)
    

相关问题