首页 文章

Spark Scala - textFile()和sequenceFile()RDD

提问于
浏览
0

我正在成功地将我的序列文件加载到DataFrame中,其代码如下:

val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sc.sequenceFile[LongWritable,String](src)
val jsonRecs = file.map((record: (String, String)) => new String(record._2))
val df = sqlContext.read.json(jsonRecs)

我想对一些文本文件做同样的事情 . 文本文件的格式与序列文件类似(时间戳,制表符char,然后是json) . 但问题是textFile()返回RDD [String]而不是像sequenceFile()方法那样返回RDD [LongWritable,String] .

我的目标是能够使用序列文件或文本文件作为输入来测试程序 .

我如何将来自textFile()的RDD [String]转换为RDD [LongWritable,String]?或者有更好的解决方案吗?

2 回答

  • 0

    假设您的文本文件是CSV文件,您可以使用以下代码读取Dataframe中的CSV文件,其中 spark 是SparkSession:

    val df = spark.read.option("header", "false").csv("file.txt")
    

    与 Headers 选项一样,您可以根据需要提供多种选项 . 检查this了解更多详情 .

  • 1

    谢谢你的回复 . 它不是CSV,但我猜它可能是 . 它只是在HDFS中的序列文件上执行此操作的文本输出:

    hdfs dfs -text /path/to/my/file > myFile.txt
    

    无论如何,我找到了一个适用于我的用例的序列和文本文件的解决方案 . 在这两种情况下,此代码最终将变量'file'设置为RDD [String,String],我可以使用它 .

    var file = if (inputType.equalsIgnoreCase("text")) {
          sc.textFile(src).map(line => (line.split("\t")(0), line.split("\t")(1)))
    } else { // Default to assuming sequence files are input
          sc.sequenceFile[String,String](src)
    }
    

相关问题