首页 文章

如何从 PySpark 中的字符串获取列表

提问于
浏览
1

在 PySpark 中是否有类似eval的功能。

我正在尝试将 Python 代码转换为 PySpark

我正在查询一个数据框,并且其中一列具有数据,如下所示,但采用字符串格式

[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]

假设“ x”是在数据框中保存此值的列。

现在,我想传递该字符串列“ x”并获取列表,以便可以将其传递给 mapPartition 函数。

我想避免迭代驱动程序上的每一行,这就是我这样想的原因。

在 Python 中使用 eval()函数(如果已使用):我得到以下输出:

x = "[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]"

list = eval(x)

for i in list:  print i

输出:(这也是我在 PySpark 中想要的)

{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}
{u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}
{u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}

如何在 PySpark 中做到这一点?

1 回答

  • 1

    您可以通过使用from_json函数将 json 字符串转换为实际 json 来受益。为此,您必须定义一个与您的 json 字符串匹配的schema。最后,像使用eval一样,使用explode函数将结构数组分隔到不同的行。

    如果您有数据

    x = "[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]"
    

    然后创建dataframe

    df = sqlContext.createDataFrame([(x,),], ["x"])
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |x                                                                                                                                                                                                              |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]|
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    root
     |-- x: string (nullable = true)
    

    使用 json

    正如我所解释的,您需要使用schemaregexp_replace函数,from_json函数和explode函数作为

    from pyspark.sql import types as T
    schema = T.ArrayType(T.StructType([T.StructField('date', T.StringType()), T.StructField('by', T.StringType()), T.StructField('value', T.StringType())]))
    
    from pyspark.sql import functions as F
    df = df.withColumn("x", F.explode(F.from_json(F.regexp_replace(df['x'], "(u')", "'"), schema=schema)))
    

    这应该给你

    +-----------------------------------+
    |x                                  |
    +-----------------------------------+
    |[2015-02-08,abc@gg.com,NA]         |
    |[2016-02-08,dfg@yaa.com,applicable]|
    |[2017-02-08,wrwe@hot.com,ufc]      |
    +-----------------------------------+
    
    root
     |-- x: struct (nullable = true)
     |    |-- date: string (nullable = true)
     |    |-- by: string (nullable = true)
     |    |-- value: string (nullable = true)
    

    如果您需要问题中提到的 json 字符串,则可以使用to_json函数作为

    df = df.withColumn("x", F.to_json(df['x']))
    

    这会给你

    +-------------------------------------------------------------+
    |x                                                            |
    +-------------------------------------------------------------+
    |{"date":"2015-02-08","by":"abc@gg.com","value":"NA"}         |
    |{"date":"2016-02-08","by":"dfg@yaa.com","value":"applicable"}|
    |{"date":"2017-02-08","by":"wrwe@hot.com","value":"ufc"}      |
    +-------------------------------------------------------------+
    

    仅使用字符串

    如果您不想遍历 json 的所有复杂性,则可以简单地使用字符串。为此,您需要嵌套regex_replacesplitexplode函数作为

    from pyspark.sql import functions as F
    df = df.withColumn("x", F.explode(F.split(F.regexp_replace(F.regexp_replace(F.regexp_replace(df['x'], "(u')", "'"), "[\\[\\]\s]", ""), "},\\{", "};&;{"), ";&;")))
    

    这应该给你

    +-------------------------------------------------------------+
    |x                                                            |
    +-------------------------------------------------------------+
    |{'date':'2015-02-08','by':'abc@gg.com','value':'NA'}         |
    |{'date':'2016-02-08','by':'dfg@yaa.com','value':'applicable'}|
    |{'date':'2017-02-08','by':'wrwe@hot.com','value':'ufc'}      |
    +-------------------------------------------------------------+
    

相关问题