I'm using AWS for the first time and have my file stored there. This is what I have tried so far to read the file:

artist_data = sc.textFile('hdfs:///<aws_server>:<port>/home/ubuntu/artist_stuff/_artist_data')

I have also tried:

artist_data = sc.textFile('hdfs:////home/ubuntu/artist_stuff/_artist_data')
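For reference, my understanding is that a fully qualified HDFS URI takes the form hdfs://<host>:<port>/<path>, with exactly two slashes before the host; with three slashes the authority is empty and the host ends up being treated as part of the path. A sketch of what I believe the call should look like, where namenode-host and 8020 are placeholders for the cluster's actual NameNode address:

# Placeholder NameNode host/port; hdfs://host:port/path keeps the host in the
# authority part of the URI instead of folding it into the path.
artist_data = sc.textFile('hdfs://namenode-host:8020/home/ubuntu/artist_stuff/_artist_data')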

Then I build my RDD:

artist_data = artist_data.map(lambda line:line.encode("ascii", "ignore").strip().split()).filter(lambda line: len(line) > 1)
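To illustrate what that transformation is meant to do per line, here is the same logic applied to a single made-up sample string (under Python 3, encode() returns bytes, so the split tokens come back as bytes objects):

# Made-up sample line; the real file's format may differ.
sample = '1134999\t06Crazy Life\n'
tokens = sample.encode("ascii", "ignore").strip().split()
print(tokens)           # [b'1134999', b'06Crazy', b'Life']
print(len(tokens) > 1)  # True, so this line would survive the filter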

Every time I run artist_data.collect() I get the error below.

When I simply tried sc.textFile("file:///home/ubuntu/artist_stuff/_artist_data") I got a different error: InvalidInputException: Input path does not exist: file:/home/ubuntu/Assignment_2/_artist_data. I'm guessing that error comes from partitioning or something like that, which is why I opted to write it as hdfs:/// instead.
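In case it matters, my understanding of the file:/// scheme is that the path is resolved locally on whichever machine reads it, so the file has to exist at that exact path on the driver and on every worker node (or the job has to run in local mode). A small sanity check I could run on the driver first, where local_path is just the path from above:

import os

local_path = '/home/ubuntu/artist_stuff/_artist_data'
# file:// URIs are opened on each node that reads a split, so the file must
# be present at this path everywhere, not only on the driver.
print(os.path.exists(local_path))
if os.path.exists(local_path):
    artist_data = sc.textFile('file://' + local_path)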

Here is the full error log:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-...> in <module>()

----> 1 artist_data.collect()

/home/ubuntu/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)

774         """
775         with SCCallSiteSync(self.context) as css:
--> 776             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    777         return list(_load_from_socket(port, self._jrdd_deserializer))
    778

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in __call__(self, *args)

1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)

    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: Incomplete HDFS URI, no host: hdfs:/home/ubuntu/artist_stuff/_artist_data
  at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:143)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
  at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
  at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:280)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:211)
  at java.lang.Thread.run(Thread.java:745)