PySpark on Windows with PyCharm: java.net.SocketException

Since yesterday my PySpark code has been behaving strangely. I am working on Windows with PyCharm and Spark 1.5.

I ran the following code successfully in an IPython notebook (same Python version, but on a cluster). However, when I launch it from my Windows environment with PyCharm, I get the error below:

from pprint import pprint

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# SQL / Spark context:
conf = (SparkConf().setMaster("local").setAppName("analysis"))  # .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Input CSV file:
inputCsvFile = "survey.csv"
separator = ','

# read the input file into an RDD of split lines
rdd = sc.textFile(inputCsvFile).map(lambda line: line.split(separator))
header = rdd.first()

# build the schema (some basic functions to create a StructType object with string as the default type)
schema = dictSchemaFromColumnsList(header)
schemaDf = dictSchemaToDFSchema(schema)

# create the DataFrame:
df = sqlContext.createDataFrame(rdd, schemaDf)
pprint(rdd.first())
print('\ndf.count()=' + str(df.count()))

# display
df.show()
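For context, the two schema helpers used above are not shown in the script. A minimal sketch of what they might look like, assuming the header is a list of column names and every column defaults to a nullable string (these definitions are my own illustration, not the original code):

from collections import OrderedDict
from pyspark.sql.types import StructType, StructField, StringType

# hypothetical sketch of the helpers referenced in the question; assumes all columns are strings
def dictSchemaFromColumnsList(columns):
    # map every column name to a default type name, preserving column order
    return OrderedDict((col, 'string') for col in columns)

def dictSchemaToDFSchema(schemaDict):
    # turn the {column: type} mapping into a StructType of nullable StringType fields
    return StructType([StructField(name, StringType(), True) for name in schemaDict])

Launching the full script from PyCharm then produces: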

16/06/23 11:46:32 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:410)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
16/06/23 11:46:32 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset by peer: socket write error
    ... (same stack trace as above)
16/06/23 11:46:32 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
16/06/23 11:46:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/06/23 11:46:32 INFO TaskSchedulerImpl: Cancelling stage 1
16/06/23 11:46:32 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:361) failed in 0.792 s
16/06/23 11:46:32 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:361, took 0.802922 s
Traceback (most recent call last):
  File "C:/Users/home/PycharmProjects/pySpark_analysis/Survey_2011-2016_Analysis.py", line 38, in <module>
    df = sqlContext.createDataFrame(rdd, schemaDf)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 296, in _createFromRDD
    rows = rdd.take(10)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\context.py", line 916, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
    return f(*a, **kw)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset by peer: socket write error
    ... (same worker-side stack trace as above)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:361)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    ... (same worker-side stack trace as above)
16/06/23 11:46:32 INFO SparkContext: Invoking stop() from shutdown hook

The strange thing is that if I run the code in debug mode and add a basic instruction such as:

People = ["1,Maj,123", "2,Pvt,333", "3,Col,999"]
rrd1 = sc.parallelize(People)
rrd1.first()

then the code works, which makes the behaviour inconsistent from run to run. Any suggestion would be greatly appreciated.

Update: after looking back at the problem, it seems to be exactly the behaviour Matei describes below. Apparently the problem goes away when the input CSV file is shortened.
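For anyone trying to reproduce this: rather than editing the CSV by hand, one rough way to test the "shorter input" hypothesis is to cap the number of lines fed into the DataFrame. This is only an illustrative sketch; the 2000-line cutoff and the variable names are my own, not from the original script:

# illustrative only: cap the input at N lines (arbitrary value) before splitting and building the DataFrame
N = 2000
lines = sc.textFile(inputCsvFile)
truncated = sc.parallelize(lines.take(N))
rdd = truncated.map(lambda line: line.split(separator))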

1 Answer


    I ran into the same problem when working with (say) large files (20,000 rows) and trying to filter them with a regular expression:

    import re
    pattern = re.compile("...")
    rdd.filter(lambda x: pattern.search(x) is not None)
    

    The behaviour you describe was also intermittent. After truncating the file to ~2000 lines, it works fine.
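
    A rough sketch of truncating the RDD itself instead of the file (the 2000-line cutoff is only an illustrative value, not something from the original answer):

    # illustrative only: keep roughly the first 2000 lines via zipWithIndex, then drop the index
    truncated = rdd.zipWithIndex().filter(lambda kv: kv[1] < 2000).map(lambda kv: kv[0])
    truncated.filter(lambda x: pattern.search(x) is not None).count()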
