我试图测试PySpark迭代一些非常大(10s的GB到1s的TB)数据的能力 . 对于大多数脚本,我发现PySpark具有与Scala代码大致相同的效率 . 在其他情况下(如下面的代码),我会遇到严重的速度问题,速度要慢10到12倍 .
path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext
df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))\
.flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))\
.filter(lambda r: r[-1] != None)\
.countByValue()
print([((x[0], x[1], x[2]), y) for x, y in rddx.items()])
我们认为我们已将问题隔离到.countByValue()(返回defaultdict),但应用countItems()或reduceByKey()会产生几乎相同的结果 . 我们也99%确定问题不在于ExtractDomain或CreateAVertexFromSourceUrlAndContent(不是函数的真实名称,只是伪代码使其可理解) .
所以我的问题是第一个
-
这段代码中有什么我可以做的来减少时间吗?
-
PySpark基本上比Scala对手慢得多吗?
-
有没有办法使用PySpark数据帧复制flatmap(理解数据帧通常比Pyspark中的RDD更快)?
1 回答
这里最大的问题可能是通信 - Spark SQL(列式格式) - >普通的Scala对象 - > pickle(Pyrolite) - > socket - > unpickle - >普通的Python对象 . 这是很多复制,转换和移动的东西 .
是 . 它被称为explode - 但公平地说它也很慢 .
这通常是正确的(Scala和Python都是),但是你可能需要
udf
来实现ExtractDomain
或CreateAVertexFromSourceUrlAndContent
- 这是另一个慢的事情 . 只是从您可能使用的名称parse_url_tuple .它有点慢 . 通常情况下,调优良好的代码不会那么慢 . 但是实现细节是不同的 - Scala和Python中的相同操作集可以以不同的方式实现 .
我建议先进行分析 . 一旦确定哪个部分负责(转换,合并),您就可以尝试定位它 .