当我在EMR上运行我的spark作业(版本2.1.1)时,每次运行会在数据帧上计算不同的行数 . 我首先从s3读取数据到4个不同的数据帧,这些计数总是一致的,然后在加入数据帧之后,连接的结果具有不同的计数 . 之后我也会过滤结果,每次运行时也有不同的计数 . 变化很小,1-5行差异,但它仍然是我想要了解的东西 .
这是加入的代码:
val impJoinKey = Seq("iid", "globalVisitorKey", "date")
val impressionsJoined: DataFrame = impressionDsNoDuplicates
.join(realUrlDSwithDatenoDuplicates, impJoinKey, "outer")
.join(impressionParamterDSwithDateNoDuplicates, impJoinKey, "left")
.join(chartSiteInstance, impJoinKey, "left")
.withColumn("timestamp", coalesce($"timestampImp", $"timestampReal", $"timestampParam"))
.withColumn("url", coalesce($"realUrl", $"url"))
这是过滤器:
val impressionsJoined: Dataset[ImpressionJoined] = impressionsJoinedFullDay.where($"timestamp".geq(new Timestamp(start.getMillis))).cache()
我也尝试过使用filter方法而不是where,但结果相同
任何想法?
谢谢你
1 回答
是否有可能其中一个数据源随时间变化?
由于
impressionsJoined
未被缓存,因此spark会在每个操作上从头开始重新评估它,包括再次从源中读取数据 .尝试在加入后缓存
impressionsJoined
.