I am using Spark 2.2.1 and Parquet 1.8.1.
I want to read JSON data from Kafka, apply some transformations, and then write the data to Parquet files that are later loaded by Apache Hive. But when I writeStream to Parquet I hit the following error:
Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
at org.apache.parquet.schema.GroupType.<init>(GroupType.java:92)
at org.apache.parquet.schema.GroupType.<init>(GroupType.java:48)
at org.apache.parquet.schema.MessageType.<init>(MessageType.java:50)
at org.apache.parquet.schema.Types$MessageTypeBuilder.named(Types.java:1256)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<init>(ParquetSchemaConverter.scala:563)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<clinit>(ParquetSchemaConverter.scala)
... 22 more
I googled and found that others have run into the same issue, where the root cause was that not all fields were leaves, which Parquet does not support. But all the fields in my DataFrame are leaf fields. Why does this happen? Thanks in advance!
Here is my code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
val jsonOptions: Map[String, String] = Map("timestampFormat" -> nestTimestampFormat)

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "wikipedia-edits")
  .option("startingOffsets", "earliest")
  .option("group.id", "SparkProcessor")
  .load()
  .select(from_json(col("value").cast("string"), schema, jsonOptions) as "wikiEdit")

val parsed = df.select("wikiEdit.bot", "wikiEdit.title", "wikiEdit.user", "wikiEdit.wiki")

parsed.printSchema()
//parsed.writeStream.format("console").option("truncate", false).start().awaitTermination(30000)

parsed.writeStream.format("parquet")
  .option("path", "hdfs://localhost:9000/wiki-parquet-spark")
  .option("checkpointLocation", "hdfs://localhost:9000/checkpoint")
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .start()
  .awaitTermination()
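For completeness, the `schema` value passed to `from_json` is not shown in the snippet above. A minimal sketch consistent with the printed schema (field names and types inferred from the output, so this is an assumption, not the asker's actual definition) would be:

```scala
import org.apache.spark.sql.types._

// Hypothetical reconstruction of the schema used in from_json,
// matching the four fields that printSchema() reports below.
val schema = StructType(Seq(
  StructField("bot", BooleanType, nullable = true),
  StructField("title", StringType, nullable = true),
  StructField("user", StringType, nullable = true),
  StructField("wiki", StringType, nullable = true)
))
```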
The program can print the schema and display some data from the DataFrame:
root
|-- bot: boolean (nullable = true)
|-- title: string (nullable = true)
|-- user: string (nullable = true)
|-- wiki: string (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----------------------------+----------+-----------+
|bot |title |user |wiki |
+-----+-----------------------------+----------+-----------+
|false|Jak Roberto |WikiPedant|enwiki |
|false|File:Oostkamp01.jpg |Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|true |Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|true |Category:Self-published work |Herpoel |commonswiki|
|true |Category:Geography of Belgium|Herpoel |commonswiki|
+-----+-----------------------------+----------+-----------+
only showing top 20 rows
1 Answer
TL;DR Upgrade to Spark 2.2.0 (or even better, 2.2.1).
I think it is related to PARQUET-363 Cannot construct empty MessageType for ReadContext.requestedSchema, which does mention the error message and Spark.
Then, in the issue report, the issue is linked to pull request #263, and that pull request removes the check (the very check you are facing with streaming Datasets / Spark Structured Streaming).
With that, we learn that the Parquet version in Spark may be different from the 1.8 release branch.
That led to a discussion in the pull request, which was eventually closed in favour of another pull request that was considered part of bumping the Parquet version to 1.8.2. That is the Parquet version where we hope the error message goes away.
Since Spark uses Parquet 1.8.2 as of Spark 2.2.0, my recommendation is to upgrade to Spark 2.2.0 (or even better, 2.2.1).
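As a sketch, assuming an sbt build, the upgrade amounts to bumping the Spark dependency versions (artifact names as published on Maven Central; the Kafka source lives in a separate module):

```scala
// build.sbt — hypothetical dependency bump to Spark 2.2.1,
// which bundles Parquet 1.8.2.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.2.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.1"
)
```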