我正在尝试基于 accountId
对输入文件进行分区 . 但是,仅当dataFrames包含超过1000条记录时,才会执行此分区 . accountId
是一个动态整数,不可能是未知的 . 请考虑以下代码
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val df = sqlContext.read.json(rdd)
val filteredDF = df.filter(df("accountId")==="3")
if (filteredDF.count() > 1000) {
df.write.partitionBy("accountId").format("json").save("output")
}
}
}
ssc.start()
ssc.awaitTermination()
但是上面的代码分区了所有不需要的accountId .
-
我想在数据帧中找到每个
accountId
的计数 . -
如果每个accountId的记录超过1000,则将分区信息写入输出源 .
例如,如果输入文件具有1500个accountId = 1的记录和10个accountId = 2的记录,则将基于accountId = 1的过滤数据帧分区为输出源,并将memIory中的accountId = 2个记录分区 .
如何使用spark-streaming实现这一目标?
1 回答
你应该这样做吗?
代替