- 我正在使用Kafka Spark Streaming来获取流数据 .
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
- 我正在使用这个DStream并处理RDD
val output = lines.foreachRDD(rdd =>
rdd.foreachPartition { partition =>
partition.foreach { file => runConfigParser(file)}
})
runConfigParser
是一个JAVA方法,它解析文件并生成一个我必须保存在HDFS中的输出 . 因此,多个节点将处理RDD并将输出写入单个HDFS文件 . 因为我想在HIVE中加载这个fie .
我应该输出 runConfigParser
的结果并使用 sc.parallze(output).saveAsTextFile(path)
这样我的所有节点都会将RDD输出写入单个HDFS文件 . 这种设计有效吗?
我将在HIVE中加载这个单独的HDFS文件(将不断更新为其流数据)并使用Impala进行查询 .
2 回答
不是 . 因为您需要一个HDFS文件
saveAsTextFile
,它为RDD分区创建了许多HDFS文件,所以不能满足您的要求 .为了获得一个HDFS文件,输出
reduce
/collect
并调用HDFS Java API来创建HDFS文件 . 这种方法效率很低,因为所有输出都需要在最后一次Spark动作时来到Spark驱动程序 .您可以使用函数"merge"
saveAsTextFile
的结果 . 像这样: