我想在Spark中对MongoDB提供的数据运行k-means . 我有一个针对flatfile的工作示例:
sc = SparkContext(appName="KMeansExample") # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
这是flatfile的格式是:
0 0 1
1 1 1
2 2 2
9 9 6
现在我想用MongoDB替换flatfile:
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()
# <<<< Here I am missing the parsing >>>>>
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
I like to understand how to map data from the df so that it can be used as input for kmeans.
数据库的"layout"是:
根
| - _id:string(nullable = true)
| - field0:binary(nullable = true)
| - field1:binary(nullable = true)
| - field2:binary(nullable = true)
| - field3:binary(nullable = true)
| - field4:binary(nullable = true)
| - field5:binary(nullable = true)
| - field6:binary(nullable = true)
| - field7:binary(nullable = true)
| - field8:binary(nullable = true)
| - field9:binary(nullable = true)
1 回答
基于你的片段,我假设你正在使用PySpark .
如果您查看clustering.KMeans Python API文档,您可以看到第一个参数需要
RDD of Vector or convertible sequence types
执行下面的代码后,使用MongoDB Spark Connector从MongoDB加载数据
您在
df
中拥有的是一个DataFrame,因此我们需要将其转换为可转换为Vector类型的内容 .由于您在文本文件示例中使用numpy.array,我们可以继续使用此数组类型进行学习转换 .
根据提供的
layout
,首先我们需要删除_id
列,因为群集训练不需要它 . 有关更多信息,另请参见Vector数据类型 .有了上述信息,让我们进入它:
如果要保留布尔值(True / False)而不是整数(1/0),则可以删除
int
部分 . 如下:将所有这些放在一起: