I have a table with three columns: name, age, and details. For example:

| name | age | details |
|------|-----|---------|
| A | 12 | {"address": "add-1", "pincode": 110011} |
| B | 35 | {"address": …} |
| C | 36 | {"address": …} |

The table contains millions of records, and the details column holds serialized JSON strings. The details column is dynamic in nature: any key may appear, including new fields and nested JSON.

I want to create a Java RDD/DataFrame with the fields of the nested JSON promoted to first-class columns, so that df.printSchema() shows:

 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- pincode: long (nullable = true)
 |-- …: string (nullable = true)

With this schema, aggregations become straightforward.

I wrote the following code, but I cannot get the required schema above:

```java
import java.io.IOException;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

import com.fasterxml.jackson.databind.ObjectMapper;

DataFrame df = sqlContext.sql("select name, age, details");
JavaRDD<Row> modifiedRDD = df.toJavaRDD().map((Function<Row, Row>) row -> {
    Row modifiedRow = null;
    if (row != null) {
        String details = row.getString(2);
        ObjectMapper mapper = new ObjectMapper();
        try {
            UserInfo userInfo = mapper.readValue(details, UserInfo.class);
            // renamed from `details` to avoid shadowing the String above
            Map<String, Object> detailsMap = userInfo.getDetails();

            // TODO create modified RDD using default columns and exploded details map
            // such that keys in map are first level columns i.e. userInfo and keys
            // from details map
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return modifiedRow;
});
```
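The TODO above hinges on one step: promoting the keys of the (possibly nested) details map to top-level entries before building the new Row. A minimal sketch of that step in plain Java follows (no Spark or Jackson; the class name `FlattenSketch` and the sample keys are hypothetical, chosen only to mirror the table above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {
    /** Recursively promotes keys of nested maps to top-level entries. */
    static Map<String, Object> flatten(Map<String, Object> in) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : in.entrySet()) {
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) e.getValue();
                out.putAll(flatten(nested)); // nested keys become first-level columns
            } else {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("address", "add-1");
        Map<String, Object> geo = new LinkedHashMap<>();
        geo.put("state", "S1");
        geo.put("city", "C1");
        details.put("geo", geo);
        details.put("pincode", 110011L);

        System.out.println(flatten(details));
        // {address=add-1, state=S1, city=C1, pincode=110011}
    }
}
```

As an aside, Spark 1.x can also infer such a dynamic schema by itself: extracting the details column as a `JavaRDD<String>` and feeding it to `sqlContext.read().json(...)` yields a DataFrame whose columns are the union of all JSON keys, which could then be combined with the name/age columns.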

The goal is to transform the original RDD into the modified RDD, persist it in HDFS, and then run computations, so that multiple aggregations can be executed over the same modified RDD.

Any suggestions would be appreciated.