我目前正在基于ID列表生成DataFrames - 每个基于一个ID的查询都会返回一个非常大的PostgreSQL表的可管理子集 . 然后我根据我需要写出的文件结构对输出进行分区 . 问题是我正在达到速度限制并且主要利用我的 Actuator 资源 .

我不确定这是否需要重新考虑我的架构,或者是否有一些简单的方法可以解决这个问题,但基本上我希望得到更多的任务并行化但是我没有让所有16位执行者都忙着尽快做这个ETL工作 .

所以...这就是我认为我可以做的事情来加速这个:

  • 并行化列表 .

  • 然后,列表中的每个元素,在执行程序上,通过jdbc选择(相对较小的)DataFrame .

  • 然后foreachPartition(其中必然很少),我需要做一些动作(包括来自每个分区的原子数据写入),并且这些分区动作也可以分支到工作节点/ Actuator .

当前代码看起来像这样,但当然抛出“py4j.Py4JException:方法 getnewargs ([])不存在”因为火花会话上下文不能传递到foreach闭包中,这将允许它保留在执行程序上:

spark = SparkSession \
    .builder \
    .appName
    ... etc

# the list, distributed to workers
idsAndRegionsToProcess = sc.parallelize(idList)

# the final thing that needs to be done with the data
# (each partition written to a file and sent somewhere)
def transformAndLoad(iterator, someField, someOtherField):
    for row in iterator:
        ...do stuff
    ...write a file to S3

# !! The issue is here (well, at least with my current approach)!!
# In theory these are the first operations that really need to be
# running on various nodes.
def dataMove(idAndRegion, spark):
        # now pull dataFrames from Postgres foreach id
        postgresDF = spark.read \
            .format("jdbc") \
            .option("url" …
        .option("dbtable", "(select id, someField, someOtherField from table_region_“ + idAndRegion[1] + ” where id = ‘“ + idAndRegion[0] + ”') as \history") \
        … more setup        

    postgresDF.repartition('someOtherField')
    postgresDF.persist(StorageLevel.MEMORY_AND_DISK)
    postgresDF.foreachPartition(lambda iterator: transformAndLoad(iterator, someField, someOtherField))

# invoking the problematic code on the parallelized list
idsAndRegionsToProcess.foreach(lambda idAndRegion: dataMove(idAndRegion, spark))

我知道这种方式不太可能,但也许我错过了一个可以使这成为可能的微妙之处?这似乎比选择1TB数据然后处理它更有效,但也许有一些我不知道的潜在分页 .

我有一个非常相似的工作代码,在收集的列表上使用几乎这个确切的代码操作常规循环,但这很痛苦,并且不会接近使用执行程序 .

对于一些额外的上下文,我在EMR和YARN上,我的spark-submit(来自主节点)看起来像这样:spark-submit --packages org.postgresql:postgresql:9.4.1207.jre7 --deploy-mode cluster - -num-executors 16 --executor-memory 3g --master yarn DataMove.py

此外,选择这些DataFrames不成问题,因为结果是数据的一小部分并且数据库被正确索引,但是选择每个整个表似乎是绝对不可能的,因为在某些数据库中可能存在高达TB的数据 . 他们 . 此外,重新分区除以需要写入到s3的每个(单个和特定命名的)文件中的内容 .

我会接受任何建议,即使它只是意味着使用我的工作代码,并以某种方式让它开始尽可能多的工作,而其他的东西仍在运行 . 但首先,我的方法可以在这里起作用吗?