我需要从pyspark worker 中读取/扫描/写入hdfs文件 .
请注意,以下api不适用,因为它们用于 driver :
sc.textFile()
sc.saveAsParquetFile()
等等
最好不要涉及其他第三方库(例如pyhadoop) .
一种选择是掏出例如
os.system('hdfs dfs -ls %(hdfsPath)s' %locals())
但有没有更原生的pyspark方式来实现这一目标?
UPDATE 这不是广播数据的情况,因为每个工作人员将从hdfs读取不同的数据 . 其中一个用例是在每个worker中读取一些大的二进制文件(显然不是广播的情况) . 另一种情况是阅读包含指令的"command"文件 . 我已经在原生hadoop和scala spark中成功使用了这个模式 .
2 回答
解决方案似乎是子进程(没有直接的python访问) . 拼凑接受的答案和其中一条评论:Python read file as stream from HDFS
更原生的PySpark方法是使用
sc.textFile()
或其他读取方法读取驱动程序中的数据,并将其作为RDD或广播变量传递给worker,如果它足够小以适应每个执行程序的内存 .你能描述一下你的情况,我怀疑你真的需要阅读 Worker 的文件
更新:
简短的摘要:
直接从大型集群上的worker中读取文件集可能会终止namenode
在大多数情况下,直接从 Worker 那里读取单独的文件并不是必需的 . 您可以只为通配符
textFile()
方法的文件集或使用wholeTextFiles()
或binaryFiles()
方法读取文件集及其名称在具有千兆字节图像的图像处理的特定情况下,只需将它们放入序列文件并使用
sequenceFile()
方法读取它可以通过直接查询WebHDFS REST API来实现直接从HSFS使用Python直接读取而无需额外的库,这是一种过度杀伤,因为这正是库实现的 . 另一种选择可能是使用
pipe()
Spark方法调用Java程序读取HDFS文件并将它们以序列化形式返回到stdout . 另一种选择是通过转义到shell将文件从HDFS复制到临时空间,然后使用标准读取文件功能读取此文件 . 就个人而言,我会解雇我的开发人员来实现我在这里提出的任何方法