首页 文章

根据某些键值(pyspark)从RDD创建多个Spark DataFrames

提问于
浏览
2

我有一些包含JSON对象的文本文件(每行一个对象) . 例:

{"a": 1, "b": 2, "table": "foo"}
{"c": 3, "d": 4, "table": "bar"}
{"a": 5, "b": 6, "table": "foo"}
...

我想根据表名将文本文件的内容解析为Spark DataFrames . 因此,在上面的示例中,我将为“foo”提供DataFrame,为“bar”提供另一个DataFrame . 我已经做到了将JSON的行分组到RDD内的列表中,并使用以下(pyspark)代码:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*"))
tables_rdd = text_rdd.groupBy(lambda x: json.loads(x)['table'])

这将生成一个RDD,其中包含具有以下结构的元组列表:

RDD[("foo", ['{"a": 1, "b": 2, "table": "foo"}', ...],
    ("bar", ['{"c": 3, "d": 4, "table": "bar"}', ...]]

如何将此RDD分解为每个表键的DataFrame?

编辑:我试图澄清一下,单个文件中有多行包含表的信息 . 我知道我可以在我创建的“groupBy”RDD上调用.collectAsMap,但我知道这会在我的驱动程序上消耗相当大量的RAM . 我的问题是:有没有办法在不使用.collectAsMap的情况下将“groupBy”RDD分解为多个DataFrame?

2 回答

  • 0

    您可以将其有效地拆分为镶木地板分区:首先我们将其转换为数据框:

    text_rdd = sc.textFile(os.path.join("/path/to/data", "*"))
    df = spark.read.json(text_rdd)
    df.printSchema()
        root
         |-- a: long (nullable = true)
         |-- b: long (nullable = true)
         |-- c: long (nullable = true)
         |-- d: long (nullable = true)
         |-- table: string (nullable = true)
    

    现在我们可以写出来:

    df.write.partitionBy('table').parquet([output directory name])
    

    如果列出 [output directory name] 的内容,您将看到与 table 的不同值一样多的分区:

    hadoop fs -ls [output directory name]
    
        _SUCCESS
        table=bar/
        table=foo/
    

    如果您只想保留每个表的列,则可以执行此操作(假设每当表出现在文件中时都会显示完整的列列表)

    import ast
    from pyspark.sql import Row
    table_cols = spark.createDataFrame(text_rdd.map(lambda l: ast.literal_eval(l)).map(lambda l: Row(
            table = l["table"], 
            keys = sorted(l.keys())
        ))).distinct().toPandas()
    table_cols = table_cols.set_index("table")
    table_cols.to_dict()["keys"]
    
        {u'bar': [u'c', u'd', u'table'], u'foo': [u'a', u'b', u'table']}
    
  • 3

    以下是步骤:

    • 将每个文本字符串映射到json .
    jsonRdd = sc.textFile(os.path.join("/path/to/data", "*")).map (.....)
    
    • 获取驱动程序的所有不同表名 .
    tables = jsonRdd.map(<extract table name only from json object >).distinct().collect()
    
    • 遍历每个(步骤2)表并过滤主jsonRdd以创建单个表的rdd .
    tablesRDD=[]
    for table in tables:
         # categorize each main rdd record based on table name.
         # Compare each json object table element with for loop table string and on successful match return true.
        output.append(jasonRdd.filter(lambda jsonObj: jsonObj['table'] == table))
    

    我不是python开发人员所以确切的代码片段可能无法正常工作 .

相关问题