首页 文章

在火花流式上下文中将RDD写入HDFS

提问于
浏览
7

我有一个火花流媒体环境与火花1.2.0,我从本地文件夹中检索数据,每次我发现一个新文件添加到文件夹,我执行一些转换 .

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)

为了对DStream数据执行分析,我必须将其转换为数组

var arr = new ArrayBuffer[String]();
   data.foreachRDD {
   arr ++= _.collect()
}

然后我使用获得的数据来提取我想要的信息并将它们保存在HDFS上 .

val myRDD  = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")

由于我真的需要使用数组操作数据,因此不可能使用 DStream.saveAsTextFiles("...") (这可以正常工作)在HDFS上保存数据而且我必须保存RDD但是通过这种预处理,我终于有了名为part-00000等的空输出文件...

使用 arr.foreach(println) ,我能够看到转换的正确结果 .

我的怀疑是,spark会尝试在每个批处理中将数据写入相同的文件,删除以前写的内容 . 我试图保存在动态命名文件夹中,如 myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) ,但始终只创建一个folds,输出文件仍为空 .

如何在Spark-streaming上下文中将RDD写入HDFS?

2 回答

  • 2

    您正在以未设计的方式使用Spark Streaming . 我建议您使用Spark作为用例,或者调整代码以使其适用于Spark方式 . 将阵列收集到驱动程序会破坏使用分布式引擎的目的,并使您的应用程序有效地实现单机(两台机器也会比仅在一台机器上处理数据产生更多的开销) .

    你可以用数组做的一切,你可以用Spark做 . 因此,只需在流中运行计算,分布在worker上,然后使用 DStream.saveAsTextFiles() 编写输出 . 您可以使用 foreachRDD saveAsParquet(path, overwrite = true) 写入单个文件 .

  • 6

    @vzamboni:Spark 1.5 dataframes api具有以下功能:

    dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);
    

相关问题