我有一组文件 . 文件的路径保存在文件中,例如“all_files.txt” . 使用apache spark,我需要对所有文件进行操作并对结果进行处理 .
我想要做的步骤是:
-
通过阅读"all_files.txt"创建RDD
-
对于"all_files.txt"中的每一行(每行是某个文件的路径),将每个文件的内容读入单个RDD
-
然后对所有内容进行操作
这是我为此写的代码:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
这是抛出错误:
第323行,在get_return_value中py4j.protocol.Py4JError:调用o25.getnewargs时发生错误 . 跟踪:py4j.Py4JException:方法getnewargs([])在py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)的py4j.Gateway上不存在 . 在py4j.commands.CallCommand.exe执行(CallCommand.java:79)py4j.GatewayConnection.run(GatewayConnection.java:214)的py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上调用(Gateway.java:272) )在java.lang.Thread.run(Thread.java:745)
有人可以告诉我我做错了什么以及我应该如何进一步 . 提前致谢 .
1 回答
在
flatMap
内使用spark
或不允许在执行程序上发生任何转换(spark
会话仅在驱动程序上可用) . 也无法创建RDD的RDD(参见:Is it possible to create nested RDDs in Apache Spark?)但是你可以用另一种方式实现这种转换 - 将
all_files.txt
的所有内容读入数据帧,使用 localmap
使它们成为数据帧,并使用 localreduce
来联合所有,参见示例: