I'm using PySpark on Amazon EMR, and part of the process is essentially an ETL step.

I have a DataFrame with several columns, one of which, named "Report", is a string containing some XML. I need to create a second DataFrame in which a new string column represents that XML as JSON.

To do this I want to use the org.json Java implementation, which seems to work well.

I start a PySpark session like this:

pyspark --packages org.json:json:20171018

I can successfully use the Java library's XML class and its toJSONObject() function like this:

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"org.json.XML")
xml = sc._gateway.jvm.XML

For example, this works:

>>> print xml.toJSONObject('<xml>thing</xml>')
{"xml":"thing"}

What I can't manage is calling this function from .withColumn. Here is a sample DataFrame to reproduce the problem:

columns = ['id', 'Report']
vals = [ (1, '<xml>thing1</xml>'), (2, '<xml>thing2</xml>') ]
df = spark.createDataFrame(vals, columns)
df.printSchema()

which gives:

root
 |-- id: long (nullable = true)
 |-- Report: string (nullable = true)

I try this:

df.withColumn("ReportJson", xml.toJSONObject(df["Report"])).show()

and get this:

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1124, in __call__
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1088, in _build_args
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1075, in _get_args
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_collections.py", line 512, in convert
TypeError: 'Column' object is not callable
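
As far as I can tell, the problem here is that xml.toJSONObject is just a py4j-wrapped static Java method: it expects an actual string, and py4j has no way to convert a PySpark Column object, so the call fails before Spark even gets involved. On the driver, where plain Python strings are available, calling it row by row on collected data is one workaround; a minimal sketch (only sensible for small data, since it collects everything to the driver):

rows = df.collect()
# Call the Java method on plain strings; JSONObject.toString() gives the JSON text
converted = [(r['id'], xml.toJSONObject(r['Report']).toString()) for r in rows]
df_json = spark.createDataFrame(converted, ['id', 'ReportJson'])
df_json.show(truncate=False)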

Having browsed around this site, I tried wrapping it in a UDF:

from pyspark.sql import functions as F
from pyspark.sql.types import *
def myfunc(myxml):
    return xml.toJSONObject(myxml)
xml2json = F.udf(myfunc, StringType())

But that last command errors out:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1801, in udf
    return UserDefinedFunction(f, returnType)
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name)
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1765, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1745, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 428, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 657, in dumps
    cp.dump(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 107, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 198, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 246, in save_function_tuple
    save(f_globals)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 692, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 553, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
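
If I'm reading this traceback correctly, the failure happens when F.udf tries to pickle myfunc: the closure captures xml, which is a py4j JavaObject holding a live gateway connection (hence the thread.lock), and that can never be shipped to the executors. One way around it that avoids the JVM entirely would be a pure-Python UDF; a minimal sketch, assuming the xmltodict package (not part of Spark) is installed on every node:

import json
import xmltodict
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def xml_to_json(myxml):
    # Pure Python: parse the XML and re-serialise it as JSON, so the
    # closure contains nothing from the JVM and pickles cleanly
    return json.dumps(xmltodict.parse(myxml))

xml2json = F.udf(xml_to_json, StringType())
df.withColumn("ReportJson", xml2json(df["Report"])).show(truncate=False)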

As you may have guessed, I'm new to Spark/EMR, but despite several days of googling and trial and error I can't work out how to achieve this.

Any help would be much appreciated!

Edit: thanks for the suggested links. Following the link "Using a Scala UDF in PySpark", I tried the following:

>>> def udf_test(col):
...     from pyspark.sql.column import Column, _to_java_column, _to_seq
...     sc = SparkContext._active_spark_context
...     _f = sc._jvm.org.json.XML.toJSONObject().apply
...     return Column(_f(_to_seq(sc, [col], _to_java_column)))
... 
>>> xml_data = '<xml>thing</xml>'
>>> print udf_test(xml_data)

but just got:

py4j.protocol.Py4JError: An error occurred while calling z:org.json.XML.toJSONObject. Trace:
py4j.Py4JException: Method toJSONObject([]) does not exist

The same error occurs with these variations:

from pyspark.sql.column import Column, _to_java_column, _to_seq
jvm = sc._gateway.jvm
java_import(jvm, "org.json.XML")
def udf_f(col):
    return Column(jvm.org.json.XML.toJSONObject().apply(col))

java_import(sc._gateway.jvm,"org.json.XML")
xml = sc._gateway.jvm.XML
def udf_f(col):
    return Column(xml.toJSONObject().apply(col))
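
From what I can tell, the pattern in that answer assumes the JVM side exposes a method that returns a Spark UserDefinedFunction, i.e. something whose apply() takes Columns. org.json.XML.toJSONObject is just a static String method, so toJSONObject() with no arguments simply doesn't exist, which is what the "Method toJSONObject([]) does not exist" error is saying. I suspect making this pattern work would need a small Scala/Java wrapper jar on the classpath; the Python side would then look roughly like this (com.example.XmlUdfs and its xmlToJson() method are hypothetical, not something org.json provides):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def xml_to_json_col(col):
    sc = SparkContext._active_spark_context
    # Hypothetical wrapper: xmlToJson() would have to return an
    # org.apache.spark.sql.expressions.UserDefinedFunction built around org.json
    _f = sc._jvm.com.example.XmlUdfs.xmlToJson().apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))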

Also, because the link "wrapping a java function in pyspark" seemed to hint at a CLASSPATH problem, I downloaded the org.json jar file and launched pyspark with

--jars /home/hadoop/json-20171018.jar

I also edited /etc/spark/conf/spark-defaults.conf to append that jar path to spark.driver.extraClassPath and spark.executor.extraClassPath. That made no difference.
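
For reference, the lines I appended looked roughly like this (keeping EMR's existing classpath entries after the colon):

spark.driver.extraClassPath      /home/hadoop/json-20171018.jar:<existing entries>
spark.executor.extraClassPath    /home/hadoop/json-20171018.jar:<existing entries>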

Following the link "Spark: How to map Python with Scala or Java User Defined Functions?", I tried:

>>> sqlContext.registerJavaFunction("x2j", "org.json.XML.toJSONObject")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'SQLContext' object has no attribute 'registerJavaFunction'
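
My understanding is that registerJavaFunction only exists in a newer PySpark than the one on this cluster (hence the AttributeError), and even where it does exist it takes the name of a class implementing org.apache.spark.sql.api.java.UDF1, not a reference to a static method like org.json.XML.toJSONObject. On a newer Spark, and with a hypothetical wrapper class, I imagine it would look something like:

from pyspark.sql.types import StringType

# Hypothetical: com.example.XmlToJsonUDF would implement UDF1<String, String>
# and call org.json.XML.toJSONObject internally
sqlContext.registerJavaFunction("x2j", "com.example.XmlToJsonUDF", StringType())
df.createOrReplaceTempView("reports")
spark.sql("SELECT id, x2j(Report) AS ReportJson FROM reports").show(truncate=False)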

Completely stuck.