I have a table with three columns: name, age, and details. Example:

| name | age | details |
| A | 12 | {"address": "add-1", "pincode": 110011} |
| B | 35 | {"address": "…", …} |
| C | 36 | {"address": "…", …} |
The table contains millions of records, and the details column is a serialized JSON string. The details column is dynamic in nature: any key can be present, including new fields and nested JSON.

I want to create a Java RDD / DataFrame with the fields from the nested JSON as first-class columns, so that df.printSchema() shows:
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- pincode: long (nullable = true)
 |-- …: string (nullable = true)
This schema would make aggregations straightforward.
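To illustrate what that schema implies, here is a minimal, Spark-free sketch in plain Java (the class `FlattenDemo` and the sample keys are illustrative, not part of my actual code): the keys of any nested map inside details are lifted to the top level, which is exactly the flattening the schema above expresses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenDemo {
    // Recursively copy nested map entries into a single flat map,
    // so nested JSON keys become first-level "columns".
    static Map<String, Object> flatten(Map<String, Object> in) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : in.entrySet()) {
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) e.getValue();
                out.putAll(flatten(nested));
            } else {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // details = {"address": {"state": "S1", "city": "C1"}, "pincode": 110011}
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("state", "S1");
        address.put("city", "C1");
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("address", address);
        details.put("pincode", 110011L);

        System.out.println(flatten(details)); // {state=S1, city=C1, pincode=110011}
    }
}
```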
I wrote the following code, but could not get the required schema above:
DataFrame df = sqlContext.sql("select name, age, details from user"); // table name assumed
JavaRDD<Row> modifiedRDD = df.toJavaRDD().map((Function<Row, Row>) row -> {
    Row modifiedRow = null;
    if (row != null) {
        String details = row.getString(2);
        ObjectMapper mapper = new ObjectMapper();
        try {
            UserInfo userInfo = mapper.readValue(details, UserInfo.class);
            Map<String, Object> detailsMap = userInfo.getDetails();
            // TODO create the modified RDD from the default columns plus the exploded
            // details map, so that the keys in the map become first-level columns
            // alongside name and age
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return modifiedRow;
});
The goal is to transform the original RDD into the modified RDD, persist it to HDFS, and then run computations on it, so that multiple aggregations can be executed over the same modified RDD.
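One way to sidestep the per-row TODO above: extract the details column as a JavaRDD&lt;String&gt; and let Spark infer a unified schema across all records. The inference step itself (take the union of keys over all records, then fill missing keys with null) can be sketched in plain Java without Spark; `SchemaUnionDemo` and its method names are illustrative, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SchemaUnionDemo {
    // Pass 1: collect the union of keys across all records (the "inferred schema").
    static Set<String> inferColumns(List<Map<String, Object>> records) {
        Set<String> columns = new LinkedHashSet<>();
        for (Map<String, Object> r : records) {
            columns.addAll(r.keySet());
        }
        return columns;
    }

    // Pass 2: project every record onto the full column set, null for missing keys.
    static List<Map<String, Object>> normalize(List<Map<String, Object>> records,
                                               Set<String> columns) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map<String, Object> r : records) {
            Map<String, Object> row = new LinkedHashMap<>();
            for (String c : columns) {
                row.put(c, r.get(c)); // null when the record lacks the key
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Object> a = new LinkedHashMap<>();
        a.put("address", "add-1");
        a.put("pincode", 110011L);
        Map<String, Object> b = new LinkedHashMap<>();
        b.put("address", "add-2");
        b.put("city", "C1");

        List<Map<String, Object>> records = Arrays.asList(a, b);
        Set<String> columns = inferColumns(records);
        System.out.println(columns); // [address, pincode, city]
        System.out.println(normalize(records, columns));
    }
}
```

In Spark 1.4+ this two-pass inference is what `sqlContext.read().json(javaRddOfDetailsStrings)` does for you, producing a DataFrame whose schema is the union of all keys; the resulting columns could then be combined with name and age.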
Please suggest.