pySpark - troubleshooting a DataFrame groupBy traceback

I have Spark 1.3 set up in a VirtualBox Ubuntu 14 32-bit VM. I have loaded a csv file into a Spark DataFrame and tried some operations that give me error messages I cannot troubleshoot.

The pySpark code is as follows:

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import *
from dateutil.parser import parse

sqlContext = SQLContext(sc)

# Load the CSV as a plain text RDD and grab the header row
elevFile = sc.textFile('file:////sharefolder/Jones Lake.csv')
header = elevFile.first()

# Build a schema from the header: every column starts as StringType,
# then the three columns are overridden with their actual types
schemaString = header.replace('"','')
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
fields[0].dataType = IntegerType()
fields[1].dataType = TimestampType()
fields[2].dataType = FloatType()
schema = StructType(fields)

# Strip the header row out of the data
elevHeader = elevFile.filter(lambda l: "Hour" in l)
elevHeader.collect()
elevNoHeader = elevFile.subtract(elevHeader)
print elevNoHeader.take(5)

# Split each line, cast the fields, and convert to a DataFrame
elev_df = (elevNoHeader.map(lambda k: k.split(","))
            .map(lambda p: (int(p[0]), parse(p[1]), float(p[2])))
            .toDF(schema))

So far, so good. It prints the first 5 rows of the new DataFrame without a problem:

print elev_df.head(5)
[Row(Hour=6, Date=datetime.datetime(1989, 9, 19, 0, 0), Value=641.6890258789062), Row(Hour=20, Date=datetime.datetime(1992, 4, 30, 0, 0), Value=633.7100219726562), Row(Hour=10, Date=datetime.datetime(1987, 7, 26, 0, 0), Value=638.6920166015625), Row(Hour=1, Date=datetime.datetime(1991, 2, 26, 0, 0), Value=634.2100219726562), Row(Hour=2, Date=datetime.datetime(1984, 7, 28, 0, 0), Value=639.8779907226562)]

However, when I try to do a simple group and count, I get an error that I cannot troubleshoot.

elev_df.groupBy("Hour").count().show()

gives this error (first lines of the traceback):

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-209-6533c596fac9> in <module>()
----> 1 elev_df.groupBy("Hour").count().show()

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n)
    271         5   Bob
    272         """
--> 273         print self._jdf.showString(n).encode('utf8', 'ignore')
    274 
    275     def __repr__(self):

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Any ideas for further troubleshooting?

1 Answer


It looks like your CSV has some blank values. I can see that you are handling the blank values, but I believe groupBy does not accept them. An easy way to handle the blank values in your csv with Spark DataFrames is fillna:

    fillna(value, subset=None)
    Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.
    
    Parameters: 
    value – int, long, float, string, or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.
    subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
    
