
Spark 2.1.1 error reading/writing Redshift on EMR


I am trying to read from / write to Redshift (with the data staged in S3), but I get a strange error when accessing the DataFrame. I can see that the DataFrame is created and that it can reach the data, because it prints the table's column names.

scala> :require /home/hadoop/spark-redshift_2.10-2.0.1.jar
Added '/home/hadoop/spark-redshift_2.10-2.0.1.jar' to classpath.

scala> :require /home/hadoop/RedshiftJDBC41-1.2.12.1017.jar
Added '/home/hadoop/RedshiftJDBC41-1.2.12.1017.jar' to classpath.

scala> :require /home/hadoop/spark-avro_2.11-3.2.0.jar
Added '/home/hadoop/spark-avro_2.11-3.2.0.jar' to classpath.

scala>   val read_data = (spark.read
     |     .format("com.databricks.spark.redshift")
     |     .option("url", "jdbc:redshift://redshifthost/schema?user=admin&password=password")
     |     .option("query", "SELECT * FROM schema.table LIMIT 1")
     |     .option("tempdir", tempS3Dir)
     |     .option("forward_spark_s3_credentials",true)
     |     .load())
read_data: org.apache.spark.sql.DataFrame = [aid: int, uid: int ... 3 more fields]

scala> read_data.count()

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  ... (repeated java.io.ObjectInputStream / List$SerializationProxy deserialization frames)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1493)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1492)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1492)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1720)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2420)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2419)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2419)

1 Answer


    The problem was with how I was importing the packages, or with the versions of the packages I was using (the jars added with :require mix Scala 2.10 and 2.11 artifacts, while Spark 2.1.1 is built against Scala 2.11). Installing the packages as shown below worked like a charm:

    ./bin/spark-shell --packages com.databricks:spark-avro_2.11:3.2.0,com.databricks:spark-redshift_2.11:2.0.1,com.databricks:spark-csv_2.11:1.5.0 --jars /home/hadoop/RedshiftJDBC41-1.2.12.1017.jar
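
    For completeness, a minimal sketch of the same read (and a matching write back to Redshift), assuming the shell was launched with the command above. The JDBC URL, schema.table, and tempS3Dir are the question's placeholders, and schema.table_copy is a hypothetical target table.

    // Hypothetical S3 staging directory used by spark-redshift (placeholder bucket)
    val tempS3Dir = "s3n://example-bucket/redshift-tmp/"

    // Same read as in the question; with consistent Scala 2.11 artifacts on the
    // classpath, actions such as count() no longer fail with ClassCastException.
    val read_data = spark.read
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://redshifthost/schema?user=admin&password=password")
      .option("query", "SELECT * FROM schema.table LIMIT 1")
      .option("tempdir", tempS3Dir)
      .option("forward_spark_s3_credentials", true)
      .load()

    read_data.count()

    // Writing back follows the same pattern; "dbtable" names the target table.
    read_data.write
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://redshifthost/schema?user=admin&password=password")
      .option("dbtable", "schema.table_copy")
      .option("tempdir", tempS3Dir)
      .option("forward_spark_s3_credentials", true)
      .mode("error")
      .save()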
    
