首页 文章

在spark数据帧上应用操作时出错

提问于
浏览
0

我是Spark框架的新手,并在我的本地机器上处理一些(〜)小任务来练习 . 我的任务如下:我在S3中存储了365个压缩的csv文件,其中包含每日日志 . 我想构建一整年的数据框架 . 我的方法是从存储桶中检索密钥,构建每日数据帧,将它们统一到月份数据帧中,为它们执行相同操作,并获得一整年的数据帧作为回报 .

这对我检索的一些样本数据起了作用 . 在构建DataFrame之前,我解压缩了文件,将未压缩的csv文件写入磁盘,并使用它来创建DataFrame .

问题:如果我从磁盘中删除csv文件(使其成为临时文件),在创建数据框后,我无法对数据框执行任何操作(例如year_df.count()) . 抛出Spark.exception:

“由于阶段失败导致作业中止:.... java.io.FileNotFoundException:文件.csv不存在”

经过对SO的一些搜索之后,我发现原因可能是Spark在DataFrames上应用SQL查询时使用的MetaData(External Table not getting updated from parquet files written by spark streaming) . 我改变了

spark.sql.parquet.cacheMetadata

通过运行 spark = SparkSession.builder.config("spark.sql.parquet.cacheMetadata", "false").getOrCreate() . 确保 spark.conf.get("spark.sql.parquet.cacheMetadata") 返回false ..

找不到任何解决方案 . 当然,将所有文件解压缩到S3都可以,但这对我没用 .

谢谢!

2 回答

  • 0

    Spark以懒惰的方式执行操作 .

    这意味着,如果您可以进行少量转换,但只有在您调用操作时才会读取文件 .

    它在数据集上的工作方式与在RDD中的工作方式相同,因为数据集由RDD支持

    考虑代码:

    val df = sqlContext.read // read file
    val query = df.groupBy('x).count()
    
    query.show() // here the data will be read
    

    因此,如果您在读取之前删除文件,那么Spark将抛出异常 . 您可以通过执行某些操作来强制阅读,即 takeshow . 如果你这样做,它会被缓存 cache()

    val df = sqlContext.read // read file
    val query = df.groupBy('x).count().cache()
    
    query.show() // here the data will be read and cached
    
  • 0

    Cache()仍然只是一个提示; Spark可能需要在出现故障时重新计算值,或者只是由于缓存压力而丢弃缓存的数据 . 如果要删除源数据,请确保已将结果写出来,并且实际上不再需要数据 .

    我实际上建议将CSV移到任何柱状格式(ORC,Parquet)并使用Snappy进行压缩 . 处理效率更高,特别是在谓词下推时

相关问题