- 我无法让我的Spark作业从HDFS流式传输"old"文件 .
如果我的Spark作业由于某种原因(例如演示,部署)而关闭,但是写入/移动到HDFS目录是连续的,我可能会在启动Spark Streaming Job后跳过这些文件 .
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")
hdfsDStream.foreachRDD(
rdd => logInfo("Number of records in this batch: " + rdd.count())
)
输出 - >此批次中的记录数:0
-
Spark Streaming有没有办法将“读取”文件移动到另一个文件夹?或者我们必须手动编程?因此,它将避免读取已经“读取”的文件 .
-
Spark Streaming与在CRON中运行spark job(sc.textFile)相同吗?
3 回答
您是否期望Spark读取目录中已有的文件?如果是这样,这是一种常见的误解,令我感到意外 .
textFileStream
监视目录以显示新文件,然后读取它们 . 当您启动或已经读取文件时,它会忽略目录中已有的文件 .理由是你希望Spark能够阅读它们 . 请注意,这些文件很多是以原子方式显示的,例如,它们在其他地方慢慢写入,然后移动到监视目录 . 这是因为HDFS无法正确处理同时读写文件 .
正如Dean所提到的,textFileStream使用的默认值仅使用新文件 .
所以,它正在做的就是调用
fileStream
的这个变体并且,查看
FileInputDStream
类,我们将看到它确实可以查找现有文件,但默认为仅新建:所以,回到
StreamingContext
代码,我们可以看到直接调用fileStream
方法可以使用和重载:所以,TL; DR;是
此过滤器函数用于确定每个路径是否实际上是您首选的路径 . 因此,应根据您的要求定制apply中的功能 .
现在你必须将filestream函数的第三个变量设置为false,这不仅要确保新文件,还要考虑流式目录中的旧现有文件 .