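I set up Spark 1.3 in a VirtualBox Ubuntu 14 32-bit VM. I loaded a CSV file into a Spark DataFrame and tried a few operations on it, but they produce error messages that I can't troubleshoot.
The pySpark code is as follows: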
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import *
from dateutil.parser import parse
sqlContext = SQLContext(sc)
elevFile = sc.textFile('file:////sharefolder/Jones Lake.csv')
header = elevFile.first()
schemaString = header.replace('"','')
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
fields[0].dataType = IntegerType()
fields[1].dataType = TimestampType()
fields[2].dataType = FloatType()
schema = StructType(fields)
elevHeader = elevFile.filter(lambda l: "Hour" in l)
elevHeader.collect()
elevNoHeader = elevFile.subtract(elevHeader)
print elevNoHeader.take(5)
elev_df = (elevNoHeader.map(lambda k: k.split(","))
           .map(lambda p: (int(p[0]), parse(p[1]), float(p[2])))
           .toDF(schema))
Up to this point everything works. Printing the first 5 rows of the new DataFrame is no problem:
print elev_df.head(5)
[Row(Hour=6, Date=datetime.datetime(1989, 9, 19, 0, 0), Value=641.6890258789062), Row(Hour=20, Date=datetime.datetime(1992, 4, 30, 0, 0), Value=633.7100219726562), Row(Hour=10, Date=datetime.datetime(1987, 7, 26, 0, 0), Value=638.6920166015625), Row(Hour=1, Date=datetime.datetime(1991, 2, 26, 0, 0), Value=634.2100219726562), Row(Hour=2, Date=datetime.datetime(1984, 7, 28, 0, 0), Value=639.8779907226562)]
However, when I try a simple group-by and count, I get an error that I can't troubleshoot.
elev_df.groupBy("Hour").count().show()
This gives the following error (first few lines of the error shown):
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-209-6533c596fac9> in <module>()
----> 1 elev_df.groupBy("Hour").count().show()
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n)
271 5 Bob
272 """
--> 273 print self._jdf.showString(n).encode('utf8', 'ignore')
274
275 def __repr__(self):
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
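Any ideas for troubleshooting this further?
1 Answer
It looks like your CSV has some blank values. I can see that you are replacing blank values, but I don't believe groupBy accepts them. A simple approach is to handle the blank values in your CSV with Spark DataFrame operations.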
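As a minimal sketch of one way to guard against blank fields, the parsing step can skip incomplete rows before the DataFrame is built. This is done in plain Python at the RDD stage, not with the DataFrame-based handling suggested above, and whether blanks are really the cause is unconfirmed. The helper name `parse_elev_row` and the `%m/%d/%Y` date format are assumptions for illustration; adjust the format to match the actual file.

```python
from datetime import datetime


def parse_elev_row(line, date_format="%m/%d/%Y"):
    """Parse one CSV line into (hour, date, value).

    Hypothetical helper: returns None when any of the first three
    fields is blank or cannot be converted. The date format is an
    assumption and must match the real data.
    """
    # Split on commas and drop surrounding whitespace and double quotes.
    parts = [p.strip().strip('"') for p in line.split(",")]
    # Reject rows that are too short or contain an empty field.
    if len(parts) < 3 or any(p == "" for p in parts[:3]):
        return None
    try:
        return (int(parts[0]),
                datetime.strptime(parts[1], date_format),
                float(parts[2]))
    except ValueError:
        # A field was present but not convertible.
        return None


# Possible use in the question's pipeline (requires a running SparkContext):
# elev_df = (elevNoHeader.map(parse_elev_row)
#            .filter(lambda r: r is not None)
#            .toDF(schema))
```

Because the conversion is lazy, a bad row in the original pipeline would only surface when an action such as `count().show()` scans the whole dataset, which is consistent with `head(5)` succeeding while the group-by fails.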