
How to convert type <class 'pyspark.sql.types.Row'> into Vector


I am new to Spark, and I am currently trying to write a simple piece of Python code that runs KMeans on a set of data.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import re
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg import SparseVector
from numpy import array
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

import pandas as pd
import numpy
df = pd.read_csv("/<path>/Wholesale_customers_data.csv")
sql_sc = SQLContext(sc)
cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"]
s_df = sql_sc.createDataFrame(df)
vectorAss = VectorAssembler(inputCols=cols, outputCol="feature")
vdf = vectorAss.transform(s_df)
km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||")
model = kmeans.fit(vdf)
cluster = model.clusterCenters()
print(cluster)

I typed this into the pyspark shell, and when it reaches model = kmeans.fit(vdf) I get the following error:

TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/02/26 23:31:58 ERROR Executor: Exception in task 6.0 in stage 23.0 (TID 113)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 77, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

The data I am using comes from: https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv

Can someone tell me what is going wrong here and what I am missing? Any help is appreciated.

Thanks!

Update: @Garren, the error I am getting is:

>>> kmm = kmeans.fit(s_df)
17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 5
17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 4
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py", line 69, in fit
    return self._fit(dataset)
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given input columns: [Channel, Grocery, Fresh, Frozen, Detergents_Paper, Region, Delicassen, Milk];"

1 Answer


    Use the Spark 2.x ML package exclusively, instead of the [soon to be deprecated] Spark MLlib package:

    from pyspark.ml.clustering import KMeans
    from pyspark.ml.feature import VectorAssembler

    # Let Spark infer the schema so every column is read as a numeric type
    df = spark.read.option("inferSchema", "true").option("header", "true").csv("whole_customers_data.csv")
    cols = df.columns
    # Assemble all columns into a single "features" vector column, which is
    # the default features column that the ML KMeans estimator expects
    vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
    vdf = vectorAss.transform(df)
    kmeans = KMeans(k=2, maxIter=10, seed=1)
    kmm = kmeans.fit(vdf)
    kmm.clusterCenters()
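
    As a minimal follow-up sketch (not part of the original answer, assuming the vdf assembled above): once the model is fit, you can label each row with its cluster via transform(), which appends a "prediction" column holding the cluster index.

    # Follow-up sketch: assign each row to a cluster (assumes kmm and vdf from above)
    clusters = kmm.transform(vdf).select("features", "prediction")
    clusters.show(5)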
    
