I'm new to Spark. I'm using PySpark to process a list of users who signed in to my application with Facebook login.
The cluster runs Spark 2.3.1, with 5 instances plus a master, and I installed the "facebook-sdk" library on each of them. When I run a single command interactively, I get a response from the Facebook GraphAPI and can use it for whatever I need. The problem starts when I call the GraphAPI inside a function and try to run it with mapPartitions.
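For context, mapPartitions calls the function once per whole partition, passing an iterator over that partition's rows, and the function must return an iterable. A minimal local sketch of that contract (plain Python, no Spark; the names are mine):

```python
# Minimal sketch of the mapPartitions contract: the function receives an
# iterator over one partition's rows and must return an iterable of results.
def per_partition(rows):
    out = []
    for row in rows:
        out.append(row * 2)  # placeholder for the real per-row work
    return out

# Locally this behaves like processing a single partition:
result = per_partition(iter([1, 2, 3]))
print(result)  # [2, 4, 6]
```

The important part is that anything the function body needs (like an imported module) must be available on every executor, since Spark pickles the function and runs it remotely.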
Here is my code:
import facebook
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def fb_func(rows):
    fields = 'id,first_name,middle_name,last_name,gender,birthday'
    ret = []
    for row in rows:
        token = row[6]
        id = row[7]
        graph = facebook.GraphAPI(access_token=token, version=3.0)
        user = graph.get_object(id=id, fields=fields)
        ret.append(str(id) + "," + str(token) + "," + str(user['first_name']) + "," + str(user['middle_name']) + "," + str(user['last_name']) + "," + str(user['gender']) + "," + str(user['birthday']))
    return ret

file = spark.read.parquet("s3://parquet-files/fb_data/*")
fb_file = file.repartition(5)

fb_schema = StructType([
    StructField('user_id', LongType(), True),
    StructField('token', StringType(), True),
    StructField('first_name', StringType(), True),
    StructField('middle_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('birthday', StringType(), True)
])

fb_rdd = fb_file.rdd\
    .mapPartitions(fb_func)\
    .toDF(fb_schema)\
    .repartition(2)\
    .collect()
And here is the error I get:
Py4JJavaError: An error occurred while calling o98.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 29, ip-185-16-116-233.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 217, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 59, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/cloudpickle.py", line 929, in subimport
__import__(name)
ImportError: ('No module named facebook', <function subimport at 0x7feea89b26e0>, ('facebook',))
I made sure the FB library is installed on every instance, so I don't understand why I'm getting this error... If anyone knows why, I'd really appreciate any help!
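One thing I tried to narrow it down (a minimal sketch; `check_import` is a name I made up) is having each task report which Python interpreter it runs under and whether it can import the module, since the executors might be using a different Python than the one the library was installed into:

```python
# Hypothetical diagnostic: report the Python interpreter a task runs under
# and whether the 'facebook' module is importable there.
import sys

def check_import(_):
    try:
        import facebook  # noqa: F401 -- the module the job fails to find
        importable = True
    except ImportError:
        importable = False
    yield (sys.executable, importable)

# On the cluster this would run one check per partition, e.g.:
#   spark.sparkContext.parallelize(range(5), 5).mapPartitions(check_import).collect()

# Run locally, it just reports on the driver's own interpreter:
print(list(check_import(None)))
```

If any executor reports a different interpreter path than the one where pip installed facebook-sdk, that would explain the ImportError even though the library "is installed" on every machine.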