首页 文章

Spark DataFrame行数在运行之间不一致

提问于
浏览
1

当我在EMR上运行我的spark作业(版本2.1.1)时,每次运行会在数据帧上计算不同的行数 . 我首先从s3读取数据到4个不同的数据帧,这些计数总是一致的,然后在加入数据帧之后,连接的结果具有不同的计数 . 之后我也会过滤结果,每次运行时也有不同的计数 . 变化很小,1-5行差异,但它仍然是我想要了解的东西 .

这是加入的代码:

val impJoinKey = Seq("iid", "globalVisitorKey", "date")

val impressionsJoined: DataFrame = impressionDsNoDuplicates
  .join(realUrlDSwithDatenoDuplicates, impJoinKey, "outer")
  .join(impressionParamterDSwithDateNoDuplicates, impJoinKey, "left")
  .join(chartSiteInstance, impJoinKey, "left")
  .withColumn("timestamp", coalesce($"timestampImp", $"timestampReal", $"timestampParam"))
  .withColumn("url", coalesce($"realUrl", $"url"))

这是过滤器:

val impressionsJoined: Dataset[ImpressionJoined] = impressionsJoinedFullDay.where($"timestamp".geq(new Timestamp(start.getMillis))).cache()

我也尝试过使用filter方法而不是where,但结果相同

任何想法?

谢谢你

1 回答

  • 0

    是否有可能其中一个数据源随时间变化?
    由于 impressionsJoined 未被缓存,因此spark会在每个操作上从头开始重新评估它,包括再次从源中读取数据 .

    尝试在加入后缓存 impressionsJoined .

相关问题