这可能是一个很长的镜头,但认为它不能试图在pyspark中使用Elsevier's open-sourced spark-xml-utils package来使用XSLT转换一些XML记录 .

我通过一些探索性代码获得转换工作取得了一些成功:

# open XSLT processor from spark's jvm context
with open('/tmp/foo.xsl', 'r') as f:
    proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(f.read())


# transform XML record with 'proc' 
with open('/tmp/bar.xml','r') as f:
    transformed = proc.transform(f.read())

但是,在更现实的情况下,我无法将 proc.transform 放入 lambda map函数中,得到的错误类似于:

“调用o55.getstate时发生错误 . 跟踪:py4j.Py4JException:py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)py4j.reflection.ReflectionEngine.getMethod上的方法getstate([])不存在ReflectionEngine.java:326)py4j.Gateway.invoke(Gateway.java:272)py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)py4j.commands.CallCommand.execute(CallCommand.java:79)at java.lang.Thread.run中的py4j.GatewayConnection.run(GatewayConnection.java:214)(Thread.java:748)“

当我得到一个小例子来处理单个记录时,那是在pyspark shell中运行的,我假设它正在使用火花驱动器 . 但是在上面提到的map函数中,这是通过Livy和YARN在Spark中引入的工作者 . 这个SO question/answer暗示我可能无法在该上下文中使用jvm中的函数 .

现在, spark-xml-utils 库在scala中提供了一些示例,正是我想做的事情:

import com.elsevier.spark_xml_utils.xslt.XSLTProcessor

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val stylesheet = sc.textFile("s3n://spark-xml-utils/stylesheets/srctitle.xsl").collect.head

val srctitles = xmlKeyPair.mapPartitions(recsIter => { 
    val proc = XSLTProcessor.getInstance(stylesheet)
    recsIter.map(rec => proc.transform(rec._2))
})

我想知道,我怎样才能将其转换为pyspark代码,以便我可以在RDD上运行它?理想情况下,在具有以下输入和输出格式的RDD上:

id | document | other | columns
-----------------------------------------------------
sprog | <xml here...> | more | data
baz   | <xml here...> | more | data

可能会成为

id | document | other | columns
-----------------------------------------------------
sprog | <*transformed* xml here...> | more | data
baz   | <*transformed* xml here...> | more | data

任何帮助或建议将非常感激 .

Update 8/28/2018: 也试过贯穿 mapPartitions ,没有骰子 . __getstate__() 的错误相同