我是pyspark的新手,我正在尝试将RDD行读取并合并为一行 .
假设我有以下文本文件:
A1 B1 C1
A2 B2 C2 D3
A3 X1 YY1
DELIMITER_ROW
Z1 B1 C1 Z4
X2 V2 XC2 D3
DELIMITER_ROW
T1 R1
M2 MB2 NC2
S3 BB1
AQ3 Q1 P1"
现在,我想将每个部分中出现的所有行(在DELIMITER_ROW之间)组合成一行,并返回这些合并行的列表 .
我想创建这种列表:
[[A1 B1 C1 A2 B2 C2 D3 A3 X1 YY1]
[Z1 B1 C1 Z4 X2 V2 XC2 D3]
[T1 R1 M2 MB2 NC2 S3 BB1 AQ3 Q1 P1]]
如何在使用RDD的pyspark中完成?
现在我知道如何读取文件并过滤掉分隔符行:
sc.textFile(pathToFile).filter(lambda line: DELIMITER_ROW not in line).collect()
但我不知道如何将每个部分中的行缩减/合并/组合/分组为一行 .
谢谢 .
1 回答
您可以使用
hadoopConfiguration.set
来设置分隔行然后拆分行的分隔符,而不是读取和拆分 .spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", "DELIMITER_ROW")
希望这可以帮助!