首页 文章

如何在Spark中为kmeans映射MongoDB数据?

提问于
浏览
0

我想在Spark中对MongoDB提供的数据运行k-means . 我有一个针对flatfile的工作示例:

sc = SparkContext(appName="KMeansExample")  # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

这是flatfile的格式是:

0 0 1
1 1 1
2 2 2
9 9 6

现在我想用MongoDB替换flatfile:

spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()

# <<<< Here I am missing the parsing >>>>>

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

I like to understand how to map data from the df so that it can be used as input for kmeans.

数据库的"layout"是:

| - _id:string(nullable = true)
| - field0:binary(nullable = true)
| - field1:binary(nullable = true)
| - field2:binary(nullable = true)
| - field3:binary(nullable = true)
| - field4:binary(nullable = true)
| - field5:binary(nullable = true)
| - field6:binary(nullable = true)
| - field7:binary(nullable = true)
| - field8:binary(nullable = true)
| - field9:binary(nullable = true)

1 回答

  • 1

    我想了解如何映射来自df的数据,以便它可以用作kmeans的输入 .

    基于你的片段,我假设你正在使用PySpark .

    如果您查看clustering.KMeans Python API文档,您可以看到第一个参数需要 RDD of Vector or convertible sequence types

    执行下面的代码后,使用MongoDB Spark Connector从MongoDB加载数据

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource")
                   .option("uri","mongodb://127.0.0.1/ycsb.usertable")
                   .load()
    

    您在 df 中拥有的是一个DataFrame,因此我们需要将其转换为可转换为Vector类型的内容 .

    由于您在文本文件示例中使用numpy.array,我们可以继续使用此数组类型进行学习转换 .

    根据提供的 layout ,首先我们需要删除 _id 列,因为群集训练不需要它 . 有关更多信息,另请参见Vector数据类型 .

    有了上述信息,让我们进入它:

    # Drop _id column and get RDD representation of the DataFrame
    rowRDD = df.drop("_id").rdd
    
    # Convert RDD of Row into RDD of numpy.array
    parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
    
    # Feed into KMeans 
    clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
    

    如果要保留布尔值(True / False)而不是整数(1/0),则可以删除 int 部分 . 如下:

    parsedRdd = rowRDD.map(lambda row: array([x for x in row]))
    

    将所有这些放在一起:

    from numpy import array 
    from pyspark.mllib.clustering import KMeans
    import org.apache.spark.sql.SparkSession
    
    spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
    .getOrCreate()
    
    df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
    
    rowRDD = df.drop("_id").rdd
    parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
    
    clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
    clusters.clusterCenters
    

相关问题