首页 文章

pyspark - 使用自定义分隔符将文件读取到RDD?

提问于
浏览
1

我是pyspark的新手,我正在尝试将RDD行读取并合并为一行 .

假设我有以下文本文件:

A1 B1 C1
A2 B2 C2 D3
A3 X1 YY1
DELIMITER_ROW
Z1 B1 C1 Z4
X2 V2 XC2 D3
DELIMITER_ROW
T1 R1
M2 MB2 NC2
S3 BB1 
AQ3 Q1 P1"

现在,我想将每个部分中出现的所有行(在DELIMITER_ROW之间)组合成一行,并返回这些合并行的列表 .

我想创建这种列表:

[[A1 B1 C1 A2 B2 C2 D3 A3 X1 YY1]
 [Z1 B1 C1 Z4 X2 V2 XC2 D3]
 [T1 R1 M2 MB2 NC2 S3 BB1 AQ3 Q1 P1]]

如何在使用RDD的pyspark中完成?

现在我知道如何读取文件并过滤掉分隔符行:

sc.textFile(pathToFile).filter(lambda line: DELIMITER_ROW not in line).collect()

但我不知道如何将每个部分中的行缩减/合并/组合/分组为一行 .

谢谢 .

1 回答

  • 3

    您可以使用 hadoopConfiguration.set 来设置分隔行然后拆分行的分隔符,而不是读取和拆分 .

    spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", "DELIMITER_ROW")

    希望这可以帮助!

相关问题