首页 文章

处理Spark Streaming rdd并存储到单个HDFS文件

提问于
浏览
2
  • 我正在使用Kafka Spark Streaming来获取流数据 .
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
  • 我正在使用这个DStream并处理RDD
val output = lines.foreachRDD(rdd => 
        rdd.foreachPartition { partition => 
            partition.foreach { file => runConfigParser(file)}
})
  • runConfigParser 是一个JAVA方法,它解析文件并生成一个我必须保存在HDFS中的输出 . 因此,多个节点将处理RDD并将输出写入单个HDFS文件 . 因为我想在HIVE中加载这个fie .

我应该输出 runConfigParser 的结果并使用 sc.parallze(output).saveAsTextFile(path) 这样我的所有节点都会将RDD输出写入单个HDFS文件 . 这种设计有效吗?

我将在HIVE中加载这个单独的HDFS文件(将不断更新为其流数据)并使用Impala进行查询 .

2 回答

  • 1

    不是 . 因为您需要一个HDFS文件 saveAsTextFile ,它为RDD分区创建了许多HDFS文件,所以不能满足您的要求 .

    为了获得一个HDFS文件,输出 reduce / collect 并调用HDFS Java API来创建HDFS文件 . 这种方法效率很低,因为所有输出都需要在最后一次Spark动作时来到Spark驱动程序 .

  • 1

    您可以使用函数"merge" saveAsTextFile 的结果 . 像这样:

    import org.apache.hadoop.fs._
    
    def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
        val sourceFile = hdfsServer + "/tmp/" 
        rdd.saveAsTextFile(sourceFile)
        val dstPath = hdfsServer + "/final/" 
        merge(sourceFile, dstPath, fileName)
      }
    
      def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
        val hadoopConfig = new Configuration()
        val hdfs = FileSystem.get(hadoopConfig)
        val destinationPath = new Path(dstPath)
        if (!hdfs.exists(destinationPath)) {
          hdfs.mkdirs(destinationPath)
        }
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
      }
    

相关问题