首页 文章

如何用pyspark跳过损坏的gzips?

提问于
浏览
0

我需要从hdfs中读取很多gzip,比如:sc.textFile('* .gz'),而其中一些gzips已损坏,引发

java.io.IOException:gzip stream CRC失败

停止整个处理运行 .

我读了辩论here,有人有同样的需要,但没有明确的解决方案 . 因为在spark中实现这个功能是不合适的(根据链接),有没有什么方法可以粗略地跳过损坏的文件?似乎有scala用户的提示,不知道如何在python中处理它 .

或者我只能先检测损坏的文件,然后删除它们?

如果我有大量的gzip怎么办?经过一天的运行后,找出它们中的最后一个已损坏 . 整整一天都浪费了 . 有损坏的gzips很常见 .

1 回答

  • 0

    您可以手动列出所有文件,然后读取 Map UDF中的文件 . 然后,UDF可以使用try / except块来处理损坏的文件 .

    代码看起来像

    import gzip
    from pyspark.sql import Row
    
    def readGzips(fileLoc):
        try:
            ...
            code to read file
            ...
            return record
        except:
            return Row(failed=fileLoc)
    
    from os import listdir
    from os.path import isfile, join
    fileList = [f for f in listdir(mypath) if isfile(join(mypath, f))]
    
    pFileList = sc.parallelize(fileList)
    dataRdd = pFileList.map(readGzips).filter((lambda x: 'failed' not in x.asDict()))
    

相关问题