首页 文章

如何基于dataFram中的行计数为列值执行动态分区

提问于
浏览
0

我正在尝试基于 accountId 对输入文件进行分区 . 但是,仅当dataFrames包含超过1000条记录时,才会执行此分区 . accountId 是一个动态整数,不可能是未知的 . 请考虑以下代码

val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val df = sqlContext.read.json(rdd)
    val filteredDF = df.filter(df("accountId")==="3")
    if (filteredDF.count() > 1000) {
      df.write.partitionBy("accountId").format("json").save("output")
    }
  }
}

ssc.start()
ssc.awaitTermination()

但是上面的代码分区了所有不需要的accountId .

  • 我想在数据帧中找到每个 accountId 的计数 .

  • 如果每个accountId的记录超过1000,则将分区信息写入输出源 .

例如,如果输入文件具有1500个accountId = 1的记录和10个accountId = 2的记录,则将基于accountId = 1的过滤数据帧分区为输出源,并将memIory中的accountId = 2个记录分区 .

如何使用spark-streaming实现这一目标?

1 回答

  • 1

    你应该这样做吗?

    filteredDF.write.partitionBy("accountId").format("json").save("output")
    

    代替

    df.write.partitionBy("accountId").format("json").save("output")
    

相关问题