首页 文章

Apache Spark:在分区上应用sklearn parallel中的函数

提问于
浏览
3

我是Big Data和Apache Spark的新手(以及在主管下工作的本科生) .

是否可以将函数(即样条曲线)仅应用于RDD的分区?我正在努力实现论文here中的一些工作 .

“学习星火”这本书似乎表明这是可能的,但并没有解释如何 .

“如果您有许多小型数据集,您希望在其上训练不同的学习模型,那么最好在每个节点上使用单节点学习库(例如,Weka或SciKit-Learn),也许并行调用它使用Spark map()的节点 . “

3 回答

  • 3

    实际上,我们有一个库就是这样做的 . 我们有几个sklearn转换器和预测器启动和运行 . 它的名字是sparkit-learn .
    从我们的例子:

    from splearn.rdd import DictRDD  
    from splearn.feature_extraction.text import SparkHashingVectorizer  
    from splearn.feature_extraction.text import SparkTfidfTransformer  
    from splearn.svm import SparkLinearSVC  
    from splearn.pipeline import SparkPipeline  
    
    from sklearn.feature_extraction.text import HashingVectorizer  
    from sklearn.feature_extraction.text import TfidfTransformer  
    from sklearn.svm import LinearSVC  
    from sklearn.pipeline import Pipeline  
    
    X = [...]  # list of texts  
    y = [...]  # list of labels  
    X_rdd = sc.parallelize(X, 4)
    y_rdd = sc.parralelize(y, 4)
    Z = DictRDD((X_rdd, y_rdd),
                columns=('X', 'y'),
                dtype=[np.ndarray, np.ndarray])
    
    local_pipeline = Pipeline((
        ('vect', HashingVectorizer()),
        ('tfidf', TfidfTransformer()),
        ('clf', LinearSVC())
    ))
    dist_pipeline = SparkPipeline((
        ('vect', SparkHashingVectorizer()),
        ('tfidf', SparkTfidfTransformer()),
        ('clf', SparkLinearSVC())
    ))
    
    local_pipeline.fit(X, y)
    dist_pipeline.fit(Z, clf__classes=np.unique(y))
    
    y_pred_local = local_pipeline.predict(X)
    y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
    

    你可以找到here .

  • 0

    我不是100%确定我正在关注,但是有很多分区方法,例如 mapPartitions . 这些操作符会在每个节点上为您提供 Iterator ,您可以对数据执行任何操作并将其传递回新的 Iterator

    rdd.mapPartitions(iter=>{
      //Spin up something expensive that you only want to do once per node
      for(item<-iter) yield {
        //do stuff to the items using your expensive item
      }
    })
    
  • 0

    如果您的数据集很小(可以加载它并在一个工作人员上训练),您可以执行以下操作:

    def trainModel[T](modelId: Int, trainingSet: List[T]) = {
      //trains model with modelId and returns it
    }
    
    //fake data
    val data = List()
    val numberOfModels = 100
    val broadcastedData = sc.broadcast(data)
    val trainedModels = sc.parallelize(Range(0, numberOfModels))
      .map(modelId => (modelId, trainModel(modelId, broadcastedData.value)))
    

    我假设你有一些模型列表(或一些参数化模型),你可以给它们ID . 然后在函数trainModel中根据id选择一个 . 因此,您将获得成对的训练模型及其ID的rdd .

相关问题