首页 文章

Spark Streaming:HDFS

提问于
浏览
3
  • 我无法让我的Spark作业从HDFS流式传输"old"文件 .

如果我的Spark作业由于某种原因(例如演示,部署)而关闭,但是写入/移动到HDFS目录是连续的,我可能会在启动Spark Streaming Job后跳过这些文件 .

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )

输出 - >此批次中的记录数:0

  • Spark Streaming有没有办法将“读取”文件移动到另一个文件夹?或者我们必须手动编程?因此,它将避免读取已经“读取”的文件 .

  • Spark Streaming与在CRON中运行spark job(sc.textFile)相同吗?

3 回答

  • 3

    您是否期望Spark读取目录中已有的文件?如果是这样,这是一种常见的误解,令我感到意外 . textFileStream 监视目录以显示新文件,然后读取它们 . 当您启动或已经读取文件时,它会忽略目录中已有的文件 .

    理由是你希望Spark能够阅读它们 . 请注意,这些文件很多是以原子方式显示的,例如,它们在其他地方慢慢写入,然后移动到监视目录 . 这是因为HDFS无法正确处理同时读写文件 .

  • 0

    正如Dean所提到的,textFileStream使用的默认值仅使用新文件 .

    def textFileStream(directory: String): DStream[String] = {
        fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
      }
    

    所以,它正在做的就是调用 fileStream 的这个变体

    def fileStream[
        K: ClassTag,
        V: ClassTag,
        F <: NewInputFormat[K, V]: ClassTag
      ] (directory: String): InputDStream[(K, V)] = {
        new FileInputDStream[K, V, F](this, directory)
      }
    

    并且,查看 FileInputDStream 类,我们将看到它确实可以查找现有文件,但默认为仅新建:

    newFilesOnly: Boolean = true,
    

    所以,回到 StreamingContext 代码,我们可以看到直接调用 fileStream 方法可以使用和重载:

    def fileStream[
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag] 
    (directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
      new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    }
    

    所以,TL; DR;是

    ssc.fileStream[LongWritable, Text, TextInputFormat]
        (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
    
  • 7
    val filterF = new Function[Path, Boolean] {
        def apply(x: Path): Boolean = {
          println("looking if "+x+" to be consider or not")
          val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
           else{ false }
          return flag
        }
    }
    

    此过滤器函数用于确定每个路径是否实际上是您首选的路径 . 因此,应根据您的要求定制apply中的功能 .

    val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}
    

    现在你必须将filestream函数的第三个变量设置为false,这不仅要确保新文件,还要考虑流式目录中的旧现有文件 .

相关问题