
Exporting nested fields containing invalid characters from Spark 2 to Parquet [duplicate]


This question already has an answer here:

I am trying to convert a JSON file to Parquet using Spark 2.0.2.

  • The JSON file comes from an external source, so the schema cannot be changed before it arrives.

  • The file contains a map of attributes. The attribute names are unknown until a file is received.

  • The attribute names contain characters that cannot be used in Parquet.

{
    "id" : 1,
    "name" : "test",
    "attributes" : {
        "name=attribute" : 10,
        "name=attribute with space" : 100,
        "name=something else" : 10
    }
}

Neither the space nor the equals character can be used in Parquet, and I receive the following error:

org.apache.spark.sql.AnalysisException: Attribute name "name=attribute" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
java.lang.StackOverflowError 

at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65) 
at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:258) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1563) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1576) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 
at scala.collection.immutable.List.foreach(List.scala:381) 
... (the frames above repeat until the stack overflows)

I would like to do one of the following:

  • Strip the invalid characters from the field names as the data is loaded into Spark

  • Change the column names in the schema without causing a stack overflow

  • Somehow change the schema so that the original data is loaded, but internally use the following (a sketch follows this example):

{
    "id" : 1,
    "name" : "test",
    "attributes" : [
        {"key":"name=attribute", "value" : 10},
        {"key":"name=attribute with space", "value"  : 100},
        {"key":"name=something else", "value" : 10}
    ]
}
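
For what it's worth, a minimal Scala sketch of that third shape, assuming the attributes have already been loaded as a MapType column (the first answer below shows one way to do that); "mapped" and the Entry case class are hypothetical names used only for illustration:

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical shape for one map entry.
case class Entry(key: String, value: Long)

// Turn the map into an array of key/value structs, so the former field
// names become ordinary string data instead of schema.
val toEntries = udf((m: Map[String, Long]) =>
  if (m == null) null else m.toSeq.map { case (k, v) => Entry(k, v) })

val keyValueData = mapped.withColumn("attributes", toEntries(col("attributes")))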

3 Answers

  • 0

    The only solution I have found so far is to reload the data with a modified schema. The new schema loads the attributes into a Map.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.*;

    // `sql` is the SparkSession. Read once to infer the schema, rewrite the
    // `attributes` struct into a map type, then re-read with the fixed schema.
    Dataset<Row> newData = sql.read().json(path);
    StructType newSchema = (StructType) toMapType(newData.schema(), null, "attributes");
    newData = sql.read().schema(newSchema).json(path);


    // Walk the struct tree, replacing the target column's type with a map.
    private DataType toMapType(DataType dataType, String fullColName, String col) {
        if (dataType instanceof StructType) {
            StructType structType = (StructType) dataType;

            List<StructField> renamed = Arrays.stream(structType.fields()).map(
                f -> toMapType(f, fullColName == null ? f.name() : fullColName + "." + f.name(), col)).collect(Collectors.toList());
            return new StructType(renamed.toArray(new StructField[renamed.size()]));
        }
        return dataType;
    }

    // When the fully-qualified name matches, swap in Map<String, Long>;
    // otherwise recurse only into ancestors of the target column.
    private StructField toMapType(StructField structField, String fullColName, String col) {
        if (fullColName.equals(col)) {
            return new StructField(col, new MapType(DataTypes.StringType, DataTypes.LongType, true), true, Metadata.empty());
        } else if (col.startsWith(fullColName)) {
            return new StructField(structField.name(), toMapType(structField.dataType(), fullColName, col), structField.nullable(), structField.metadata());
        }
        return structField;
    }
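
    With the map schema in place, the problematic names become map keys, i.e. data rather than schema, so Parquet accepts them. A minimal usage sketch in Scala, assuming the newData Dataset from above and a hypothetical output path:

    import org.apache.spark.sql.functions.col

    // Map keys are data, not column names, so the Parquet write now succeeds.
    newData.write.parquet("/tmp/attributes.parquet")

    // Individual attributes can still be looked up by their original key.
    newData.select(col("attributes").getItem("name=attribute with space")).show()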
    
  • 0

    I had the same problem as @:.

    In our case, we solved it by flattening the DataFrame.

    import scala.util.matching.Regex

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions.col

    val ALIAS_RE: Regex = "[_.:@]+".r
    val FIRST_AT_RE: Regex = "^_".r

    // Collapse runs of invalid characters to "_" and drop a leading "_".
    def getFieldAlias(field_name: String): String = {
      FIRST_AT_RE.replaceAllIn(ALIAS_RE.replaceAllIn(field_name, "_"), "")
    }

    // Select the given (possibly nested) fields, aliasing each to a safe name.
    def selectFields(df: DataFrame, fields: List[String]): DataFrame = {
      var fields_to_select = List[Column]()
      for (field <- fields) {
        val alias = getFieldAlias(field)
        fields_to_select +:= col(field).alias(alias)
      }

      df.select(fields_to_select: _*)
    }
    

    So the following JSON:

    { 
      object: 'blabla',
      schema: {
        @type: 'blabla',
        name@id: 'blabla'
      }
    }
    

    will be transformed into [object, schema.@type, schema.name@id]. The @ and the dots (in your case, the =) will create problems for SparkSQL.

    So after our selectFields you end up with [object, schema_type, schema_name_id]: a flattened DataFrame.
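
    A hypothetical invocation against that JSON, assuming df was read from it (note that +:= prepends, so the selected columns come out in reverse order):

    val flat = selectFields(df, List("object", "schema.@type", "schema.name@id"))
    flat.printSchema()
    // root
    //  |-- schema_name_id: string (nullable = true)
    //  |-- schema_type: string (nullable = true)
    //  |-- object: string (nullable = true)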

  • 0

    I solved the problem like this:

    // Rename every top-level column, replacing runs of forbidden characters with "_".
    df.toDF(df
        .schema
        .fieldNames
        .map(name => "[ ,;{}()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)
    

    I replaced all of the invalid symbols with "_".
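
    Note that toDF renames only top-level columns, so on the question's JSON it would fix a bad top-level name but not the name=... fields nested inside attributes; for those, the map-schema or flattening approaches above still apply. A hypothetical reusable wrapper around the same rename:

    import org.apache.spark.sql.DataFrame

    // Hypothetical helper wrapping the rename above; top-level columns only.
    def sanitizeTopLevel(df: DataFrame): DataFrame =
      df.toDF(df.schema.fieldNames
        .map(name => "[ ,;{}()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)

    val cleaned = sanitizeTopLevel(df)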
