I am getting an "org.bson.BsonInvalidOperationException: Invalid state INITIAL" error when reading a (Py)Spark DataFrame from MongoDB using the DataFrame connector:

df = (spark.read.format("com.mongodb.spark.sql")
      .option("uri", config.uri)
      .option("database", config.DB)
      .option("collection", config.collection)
      .load())
df.count()
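
For completeness, spark and config come from a setup roughly like the following (a sketch; the URI, database, and collection names are placeholders for my real ones):

from pyspark.sql import SparkSession

class config:                             # stand-in for my real settings object
    uri = "mongodb://127.0.0.1:27017"     # hypothetical connection URI
    DB = "mydb"                           # hypothetical database name
    collection = "mycollection"           # hypothetical collection name

spark = SparkSession.builder.appName("mongo-read-test").getOrCreate()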

Specs:

Spark version: 2.2.1 (Scala 2.11)
bson-3.6.0.jar
mongo-spark-connector_2.11-2.2.1.jar
mongo-java-driver-3.4.2.jar
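
These jars end up on the classpath roughly like this (a sketch; the paths and script name are placeholders):

spark-submit \
  --jars /path/to/bson-3.6.0.jar,/path/to/mongo-java-driver-3.4.2.jar,/path/to/mongo-spark-connector_2.11-2.2.1.jar \
  my_job.py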

When I call the count action (or any other action) on df, I get the following error:

Py4JJavaError: An error occurred while calling o60.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.bson.BsonInvalidOperationException: Invalid state INITIAL
    at org.bson.json.StrictCharacterStreamJsonWriter.checkPreconditions(StrictCharacterStreamJsonWriter.java:352)
    at org.bson.json.StrictCharacterStreamJsonWriter.writeNull(StrictCharacterStreamJsonWriter.java:183)
    at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:25)
    at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:22)
    at org.bson.json.JsonWriter.doWriteNull(JsonWriter.java:204)
    at org.bson.AbstractBsonWriter.writeNull(AbstractBsonWriter.java:556)
    at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:38)
    at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:28)
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
    at org.bson.codecs.BsonValueCodec.encode(BsonValueCodec.java:62)
    at com.mongodb.spark.sql.BsonValueToJson$.apply(BsonValueToJson.scala:29)
    at com.mongodb.spark.sql.MapFunctions$.bsonValueToString(MapFunctions.scala:103)
    at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:78)
    at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
    at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:37)
    at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
    at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:371)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2435)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2434)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2434)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.bson.BsonInvalidOperationException: Invalid state INITIAL
    at org.bson.json.StrictCharacterStreamJsonWriter.checkPreconditions(StrictCharacterStreamJsonWriter.java:352)
    at org.bson.json.StrictCharacterStreamJsonWriter.writeNull(StrictCharacterStreamJsonWriter.java:183)
    at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:25)
    at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:22)
    at org.bson.json.JsonWriter.doWriteNull(JsonWriter.java:204)
    at org.bson.AbstractBsonWriter.writeNull(AbstractBsonWriter.java:556)
    at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:38)
    at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:28)
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
    at org.bson.codecs.BsonValueCodec.encode(BsonValueCodec.java:62)
    at com.mongodb.spark.sql.BsonValueToJson$.apply(BsonValueToJson.scala:29)
    at com.mongodb.spark.sql.MapFunctions$.bsonValueToString(MapFunctions.scala:103)
    at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:78)
    at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
    at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:37)
    at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
    at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:371)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
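
Two things I can add that may help narrow this down: load() itself succeeds, so schema inference over the sampled documents works, and the failure only appears once an action forces the connector to convert real documents row by row, which matches the trace bottoming out in MapFunctions.bsonValueToString while encoding a BsonNull. A minimal illustration of what I believe is happening (printSchema runs no job, so it should be unaffected; the second line is where it blows up):

df.printSchema()   # should be fine: no job runs, the schema was already inferred during load()
df.count()         # fails: documentToRow converts every document, null values included

So presumably some documents contain null (or type-conflicting) values in a field that the sampled schema mapped to a concrete type.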