首页 文章

有没有办法提高PySpark输出的效率?

提问于
浏览
3

我试图测试PySpark迭代一些非常大(10s的GB到1s的TB)数据的能力 . 对于大多数脚本,我发现PySpark具有与Scala代码大致相同的效率 . 在其他情况下(如下面的代码),我会遇到严重的速度问题,速度要慢10到12倍 .

path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext   

df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))\
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))\
 .filter(lambda r: r[-1] != None)\
 .countByValue()

print([((x[0], x[1], x[2]), y) for x, y in rddx.items()])

我们认为我们已将问题隔离到.countByValue()(返回defaultdict),但应用countItems()或reduceByKey()会产生几乎相同的结果 . 我们也99%确定问题不在于ExtractDomain或CreateAVertexFromSourceUrlAndContent(不是函数的真实名称,只是伪代码使其可理解) .

所以我的问题是第一个

  • 这段代码中有什么我可以做的来减少时间吗?

  • PySpark基本上比Scala对手慢得多吗?

  • 有没有办法使用PySpark数据帧复制flatmap(理解数据帧通常比Pyspark中的RDD更快)?

1 回答

  • 2

    这里最大的问题可能是通信 - Spark SQL(列式格式) - >普通的Scala对象 - > pickle(Pyrolite) - > socket - > unpickle - >普通的Python对象 . 这是很多复制,转换和移动的东西 .

    有一种方法可以使用PySpark数据帧复制flatmap

    是 . 它被称为explode - 但公平地说它也很慢 .

    了解数据框通常比Pyspark中的RDD更快

    这通常是正确的(Scala和Python都是),但是你可能需要 udf 来实现 ExtractDomainCreateAVertexFromSourceUrlAndContent - 这是另一个慢的事情 . 只是从您可能使用的名称parse_url_tuple .

    PySpark基本上比Scala对手慢得多吗?

    它有点慢 . 通常情况下,调优良好的代码不会那么慢 . 但是实现细节是不同的 - Scala和Python中的相同操作集可以以不同的方式实现 .

    这段代码中有什么我可以做的来减少时间吗?

    我建议先进行分析 . 一旦确定哪个部分负责(转换,合并),您就可以尝试定位它 .

相关问题