I have a PySpark DataFrame as follows:
Stock | open_price | list_price
A | 100 | 1
B | 200 | 2
C | 300 | 3
Using map over the RDD, I am trying to print, for each individual row: the stock, open_price * list_price, and the sum of the entire open_price column:
(A, 100 , 600)
(B, 400, 600)
(C, 900, 600)
So, using the table above, the first row for example is: A, 100 * 1, 100 + 200 + 300.
I can get the first two columns using the code below:
stockNames = sqlDF.rdd.map(lambda p: (p.stock, p.open_price * p.list_price)).collect()
for name in stockNames:
    print(name)
However, when I try sum(p.open_price) as below:
stockNames = sqlDF.rdd.map(lambda p: (p.stock, p.open_price * p.list_price, sum(p.open_price))).collect()
for name in stockNames:
    print(name)
it gives me the error below:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 75.0 failed 1 times, most recent failure: Lost task 0.0 in stage 75.0 (TID 518, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-48-f08584cc31c6>", line 19, in <lambda>
TypeError: 'int' object is not iterable
How can I add the sum of open_price in my map over the RDD?
Thank you in advance, as I am still new to RDDs and map.
1 Answer
Compute the sum separately:
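For example, a minimal sketch (assuming the DataFrame is named sqlDF, as in the question; the original answer's code block was not preserved):

from pyspark.sql.functions import sum as sum_  # aliased to avoid shadowing the built-in sum

# Aggregate the whole open_price column down to a single driver-side value.
total = sqlDF.agg(sum_("open_price")).first()[0]  # 600 for the sample data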
and add it as a column:
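For instance, reusing total from above (with_total is a hypothetical name; lit wraps a Python value as a constant Column):

from pyspark.sql.functions import lit

# Attach the precomputed total as a constant column on every row.
with_total = sqlDF.withColumn("total", lit(total))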
or crossJoin:
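A sketch that stays entirely in DataFrame operations (total_df and joined are hypothetical names):

from pyspark.sql.functions import sum as sum_

# Build a one-row DataFrame holding the total, then join it onto every row.
total_df = sqlDF.agg(sum_("open_price").alias("total"))
joined = sqlDF.crossJoin(total_df)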
RDD.map does not apply here (you could use it in place of withColumn, but it is inefficient and I don't recommend it).
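That said, to get exactly the tuples from the question, a sketch of that map-based variant (reusing total computed above; column names follow the question's code):

# Close over the driver-side total in the lambda instead of calling sum() per row.
stockNames = sqlDF.rdd.map(lambda p: (p.stock, p.open_price * p.list_price, total)).collect()
for name in stockNames:
    print(name)  # ('A', 100, 600), ('B', 400, 600), ('C', 900, 600)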