I'm fairly new to Spark, and I'm using PySpark to run over the list of users who connected to my app through Facebook Login.

The cluster has Spark 2.3.1 installed and consists of a master and 5 instances, and I installed the "facebook-sdk" library on every one of them. When I run a single command on its own, I get a response from the Facebook GraphAPI and can use it for whatever I need. The problem is when I use the GraphAPI call inside a function and try to run it with mapPartitions.
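For reference, a single call like this works fine when I run it on its own from the driver (a minimal sketch; the token and user id here are placeholders, not real values):

import facebook

token = "EAAB..."      # placeholder access token
user_id = 1234567890   # placeholder Facebook user id

graph = facebook.GraphAPI(access_token=token, version=3.0)
user = graph.get_object(id=user_id, fields='id,first_name,last_name')
print(user['first_name'])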

Here is my code:

import facebook
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def fb_func(rows):
    fields = 'id,first_name,middle_name,last_name,gender,birthday'
    ret = []
    for row in rows:
        # In my input parquet the access token and Facebook user id
        # sit at columns 6 and 7.
        token = row[6]
        user_id = row[7]
        graph = facebook.GraphAPI(access_token=token, version=3.0)
        user = graph.get_object(id=user_id, fields=fields)

        # One tuple per user, in the same field order as fb_schema below.
        ret.append((user_id, token, user['first_name'], user['middle_name'],
                    user['last_name'], user['gender'], user['birthday']))

    return ret

# 'spark' is the SparkSession provided by the PySpark shell.
fb_data = spark.read.parquet("s3://parquet-files/fb_data/*")
fb_file = fb_data.repartition(5)

fb_schema = StructType([
    StructField('user_id', LongType(), True), 
    StructField('token', StringType(), True), 
    StructField('first_name', StringType(), True), 
    StructField('middle_name', StringType(), True), 
    StructField('last_name', StringType(), True), 
    StructField('gender', StringType(), True), 
    StructField('birthday', StringType(), True)
])

# mapPartitions runs fb_func once per partition; the rows it returns
# are turned into a DataFrame and collected back to the driver.
fb_rdd = fb_file.rdd\
    .mapPartitions(fb_func)\
    .toDF(fb_schema)\
    .repartition(2)\
    .collect()

And this is the error I get back:

Py4JJavaError: An error occurred while calling o98.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 29, ip-185-16-116-233.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 217, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 59, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/cloudpickle.py", line 929, in subimport
    __import__(name)
ImportError: ('No module named facebook', <function subimport at 0x7feea89b26e0>, ('facebook',))

I made sure the FB library is installed on all of the instances, so I don't understand why I'm getting this error... If anyone knows why, I'd really appreciate any help!
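In case it helps with debugging, this is the kind of sanity check I could run to see which Python the executors use and whether they can import the module at all (a sketch, assuming spark is the SparkSession from the PySpark shell):

import sys

def check_import(_):
    # Runs on the executors: report the worker's Python binary and
    # either the location of the facebook module or the import error.
    try:
        import facebook
        yield (sys.executable, facebook.__file__)
    except ImportError as e:
        yield (sys.executable, str(e))

print(spark.sparkContext.parallelize(range(5), 5).mapPartitions(check_import).collect())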