首页 文章

从pyspark工作人员访问HDFS / Hadoop api

提问于
浏览
-1

我需要从pyspark worker 中读取/扫描/写入hdfs文件 .

请注意,以下api不适用,因为它们用于 driver

sc.textFile()
sc.saveAsParquetFile()

等等

最好不要涉及其他第三方库(例如pyhadoop) .

一种选择是掏出例如

os.system('hdfs dfs -ls %(hdfsPath)s' %locals())

但有没有更原生的pyspark方式来实现这一目标?

UPDATE 这不是广播数据的情况,因为每个工作人员将从hdfs读取不同的数据 . 其中一个用例是在每个worker中读取一些大的二进制文件(显然不是广播的情况) . 另一种情况是阅读包含指令的"command"文件 . 我已经在原生hadoop和scala spark中成功使用了这个模式 .

2 回答

  • 1

    解决方案似乎是子进程(没有直接的python访问) . 拼凑接受的答案和其中一条评论:Python read file as stream from HDFS

    cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
    for line in iter(cat.stdout.readline, ''): 
        print line,   # include the comma
    
  • -1

    更原生的PySpark方法是使用 sc.textFile() 或其他读取方法读取驱动程序中的数据,并将其作为RDD或广播变量传递给worker,如果它足够小以适应每个执行程序的内存 .

    你能描述一下你的情况,我怀疑你真的需要阅读 Worker 的文件

    更新:

    简短的摘要:

    • 直接从大型集群上的worker中读取文件集可能会终止namenode

    • 在大多数情况下,直接从 Worker 那里读取单独的文件并不是必需的 . 您可以只为通配符 textFile() 方法的文件集或使用 wholeTextFiles()binaryFiles() 方法读取文件集及其名称

    • 在具有千兆字节图像的图像处理的特定情况下,只需将它们放入序列文件并使用 sequenceFile() 方法读取它

    • 可以通过直接查询WebHDFS REST API来实现直接从HSFS使用Python直接读取而无需额外的库,这是一种过度杀伤,因为这正是库实现的 . 另一种选择可能是使用 pipe() Spark方法调用Java程序读取HDFS文件并将它们以序列化形式返回到stdout . 另一种选择是通过转义到shell将文件从HDFS复制到临时空间,然后使用标准读取文件功能读取此文件 . 就个人而言,我会解雇我的开发人员来实现我在这里提出的任何方法

相关问题