I am following the "Learning PySpark" tutorial (at this link). When I run

import pyspark.mllib.feature as ft

# births_train: the RDD of LabeledPoints prepared earlier in the tutorial
selector = ft.ChiSqSelector(4).fit(births_train)

s1 = births_train.map(lambda row: row.label)
s2 = selector.transform(births_train.map(lambda row: row.features))

print(s1.take(1))
print(s2.take(1))
print(type(s1))
print(type(s2))

I get this output:

[0.0]
[DenseVector([0.0, 99.0, 99.0, 999.0])]
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.RDD'>

When I try to combine the results with zip, as the tutorial suggests:

s3 = s1.zip(s2)
print(type(s3))
print(s3.collect())

I get this error:

<class 'pyspark.rdd.RDD'>
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in ()
      1 s3 = s1.zip(s2)
      2 print(type(s3))
----> 3 print(s3.collect())

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)
    832         """
    833         with SCCallSiteSync(self.context) as css:
--> 834             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    835         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    836

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 308.0 failed 1 times, most recent failure: Lost task 0.0 in stage 308.0 (TID 8596, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "", line 1, in
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 75, in
    return lambda *a: f(*a)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "", line 9, in recode
KeyError: '1'
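For comparison, here is a minimal sketch of the same pattern on hand-made data (my own toy example, not the tutorial's births data; it assumes a live SparkContext available as sc and the standard pyspark.mllib classes), which I would expect to zip without this error:

# Toy sketch, not the tutorial's code: a tiny hand-made RDD of LabeledPoints,
# assuming a live SparkContext `sc`.
from pyspark.mllib.feature import ChiSqSelector
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

toy = sc.parallelize([
    LabeledPoint(0.0, Vectors.dense([0.0, 1.0, 2.0, 3.0, 4.0])),
    LabeledPoint(1.0, Vectors.dense([5.0, 6.0, 7.0, 8.0, 9.0])),
    LabeledPoint(0.0, Vectors.dense([0.0, 1.0, 2.0, 3.0, 4.0])),
])

# Same pattern as above: fit the selector, split labels from features, zip back together.
toy_selector = ChiSqSelector(4).fit(toy)
t1 = toy.map(lambda row: row.label)
t2 = toy_selector.transform(toy.map(lambda row: row.features))
print(t1.zip(t2).collect())

As far as I can tell, the zip pattern itself should be valid; the failure only appears with the births data.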

Why does this error happen?