我目前正在基于ID列表生成DataFrames - 每个基于一个ID的查询都会返回一个非常大的PostgreSQL表的可管理子集 . 然后我根据我需要写出的文件结构对输出进行分区 . 问题是我正在达到速度限制并且主要利用我的 Actuator 资源 .
我不确定这是否需要重新考虑我的架构,或者是否有一些简单的方法可以解决这个问题,但基本上我希望得到更多的任务并行化但是我没有让所有16位执行者都忙着尽快做这个ETL工作 .
所以...这就是我认为我可以做的事情来加速这个:
-
并行化列表 .
-
然后,列表中的每个元素,在执行程序上,通过jdbc选择(相对较小的)DataFrame .
-
然后foreachPartition(其中必然很少),我需要做一些动作(包括来自每个分区的原子数据写入),并且这些分区动作也可以分支到工作节点/ Actuator .
当前代码看起来像这样,但当然抛出“py4j.Py4JException:方法 getnewargs ([])不存在”因为火花会话上下文不能传递到foreach闭包中,这将允许它保留在执行程序上:
spark = SparkSession \
.builder \
.appName
... etc
# the list, distributed to workers
idsAndRegionsToProcess = sc.parallelize(idList)
# the final thing that needs to be done with the data
# (each partition written to a file and sent somewhere)
def transformAndLoad(iterator, someField, someOtherField):
for row in iterator:
...do stuff
...write a file to S3
# !! The issue is here (well, at least with my current approach)!!
# In theory these are the first operations that really need to be
# running on various nodes.
def dataMove(idAndRegion, spark):
# now pull dataFrames from Postgres foreach id
postgresDF = spark.read \
.format("jdbc") \
.option("url" …
.option("dbtable", "(select id, someField, someOtherField from table_region_“ + idAndRegion[1] + ” where id = ‘“ + idAndRegion[0] + ”') as \history") \
… more setup
postgresDF.repartition('someOtherField')
postgresDF.persist(StorageLevel.MEMORY_AND_DISK)
postgresDF.foreachPartition(lambda iterator: transformAndLoad(iterator, someField, someOtherField))
# invoking the problematic code on the parallelized list
idsAndRegionsToProcess.foreach(lambda idAndRegion: dataMove(idAndRegion, spark))
我知道这种方式不太可能,但也许我错过了一个可以使这成为可能的微妙之处?这似乎比选择1TB数据然后处理它更有效,但也许有一些我不知道的潜在分页 .
我有一个非常相似的工作代码,在收集的列表上使用几乎这个确切的代码操作常规循环,但这很痛苦,并且不会接近使用执行程序 .
对于一些额外的上下文,我在EMR和YARN上,我的spark-submit(来自主节点)看起来像这样:spark-submit --packages org.postgresql:postgresql:9.4.1207.jre7 --deploy-mode cluster - -num-executors 16 --executor-memory 3g --master yarn DataMove.py
此外,选择这些DataFrames不成问题,因为结果是数据的一小部分并且数据库被正确索引,但是选择每个整个表似乎是绝对不可能的,因为在某些数据库中可能存在高达TB的数据 . 他们 . 此外,重新分区除以需要写入到s3的每个(单个和特定命名的)文件中的内容 .
我会接受任何建议,即使它只是意味着使用我的工作代码,并以某种方式让它开始尽可能多的工作,而其他的东西仍在运行 . 但首先,我的方法可以在这里起作用吗?