首页 文章

使用spark进行多处理(PySpark)[重复]

提问于
浏览
5

这个问题在这里已有答案:

用例如下:

我有一个大型数据框,其中包含一个'user_id'列(每个user_id可以出现在很多行中) . 我有一个用户列表 my_users 我需要分析 .

Groupbyfilteraggregate 可能是一个好主意,但pyspark中包含的可用聚合函数不符合我的需要 . 在pyspark ver中, user defined aggregation functions 仍未得到完全支持,我现在决定离开它 .

相反,我只是迭代 my_users 列表,过滤数据帧中的每个用户,然后进行分析 . 为了优化这个过程,我决定为 my_users 中的每个用户使用 python multiprocessing pool

执行分析(并传递给池)的函数有两个参数: user_idpath to the main dataframe ,我在其上执行所有计算( PARQUET 格式) . 在方法中,我加载数据帧,并对其进行处理(DataFrame不能作为参数本身传递)

我得到各种奇怪的错误,在一些进程(每次运行中不同),看起来像:

  • PythonUtils does not exist in the JVM (读取'parquet'数据帧时)

print screen of the error message

  • KeyError: 'c' not found (同样,在阅读'parquet'数据帧时 . 无论如何'c'是什么?)

当我在没有任何多处理的情况下运行它时,一切都运行顺畅,但速度很慢..

这些错误来自哪里?

我会提供一些代码示例,以使事情更清晰:

PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant

# ....

def users_worker(df_path, user_id):
    df = spark.read.parquet(df_path) # The problem is here!
    ## the analysis of user_id in df is here

def user_worker_wrapper(args):
    users_worker(*args)

def analyse():
    # ...
    users_worker_args = [(df_path, user_id) for user_id in my_users]
    users_pool = Pool(processes=len(my_users))
    users_pool.map(users_worker_wrapper, users_worker_args)
    users_pool.close()
    users_pool.join()

1 回答

  • 6

    实际上,正如@ user6910411评论的那样,当我将Pool更改为threadPool(multiprocessing.pool.ThreadPool包)时,一切都按预期工作,这些错误消失了 .

    错误本身的根本原因现在也很清楚,如果您希望我分享它们,请在下面发表评论 .

相关问题