I am trying to use PySpark to join different datasets (Hadoop export files) and then call a custom function that converts a pandas DataFrame into an XML file.

Here is what I am doing:

from pyspark.sql.types import StructField, StructType, StringType

def get_schema(header, col_num, file_name):
    # One nullable string column per header name
    fields = [StructField(field_name, StringType(), True) for field_name in header]
    schema = StructType(fields)
    new_file = sc.textFile(file_name)
    # Split each \x01-delimited line and keep only the requested column indices
    temp = new_file.map(lambda k: k.split("\x01")) \
                   .map(lambda p: [p[ent] for ent in col_num])
    df = sqlContext.createDataFrame(temp, schema)
    return df

pow_header = ["a", "b", "c"]
chan_header = ["a", "b", "d", "e", "f"]
df_pow = get_schema(pow_header, [0, 1, 3], "pow_sample")
df_chan = get_schema(chan_header, [0, 3, 5, 6, 7], "chan_sample")
df_pow.registerTempTable("power")
df_chan.registerTempTable("chan")
# Pad each select with NULL columns so both sides line up for UNION ALL
query = "select a, b, c, NULL as d, NULL as e, NULL as f from power p " \
        "UNION ALL " \
        "select a, b, NULL as c, d, e, f from chan c"
result = sqlContext.sql(query)
test = result.sort("a", "b").rdd
data = test.flatMap(lambda x: my_func(x, dt)).collect()

What I am trying to do here: once the datasets are joined, I need to write them out as XML files, and that part is done by my custom function my_func.
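
For reference, this is roughly the shape of my_func (a simplified sketch, not the real implementation; dt is a parameter defined elsewhere in my driver script):

import xml.etree.ElementTree as ET

def my_func(row, dt):
    # Turn one Row into a serialized XML record; flatMap then yields
    # one XML string per input row
    record = ET.Element("record", attrib={"dt": str(dt)})
    for name, value in row.asDict().items():
        child = ET.SubElement(record, name)
        child.text = "" if value is None else str(value)
    return [ET.tostring(record)]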

I cannot get this to work, and I get the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
ImportError: No module named my_func
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
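
From the traceback, it looks like the Python workers cannot import the module that defines my_func when they unpickle the task. A sketch of the kind of setup I believe this calls for, assuming my_func lives in a separate file my_func.py (hypothetical path) that has to be shipped to the executors:

# Distribute the module containing my_func to every executor
# (the path is hypothetical; it depends on where the file actually lives)
sc.addPyFile("/path/to/my_func.py")

from my_func import my_func  # import after the file has been distributed
data = test.flatMap(lambda x: my_func(x, dt)).collect()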