rdd.mapPartitions(iter=>{
//Spin up something expensive that you only want to do once per node
for(item<-iter) yield {
//do stuff to the items using your expensive item
}
})
0
如果您的数据集很小(可以加载它并在一个工作人员上训练),您可以执行以下操作:
def trainModel[T](modelId: Int, trainingSet: List[T]) = {
//trains model with modelId and returns it
}
//fake data
val data = List()
val numberOfModels = 100
val broadcastedData = sc.broadcast(data)
val trainedModels = sc.parallelize(Range(0, numberOfModels))
.map(modelId => (modelId, trainModel(modelId, broadcastedData.value)))
3 回答
实际上,我们有一个库就是这样做的 . 我们有几个sklearn转换器和预测器启动和运行 . 它的名字是sparkit-learn .
从我们的例子:
你可以找到here .
我不是100%确定我正在关注,但是有很多分区方法,例如
mapPartitions
. 这些操作符会在每个节点上为您提供Iterator
,您可以对数据执行任何操作并将其传递回新的Iterator
如果您的数据集很小(可以加载它并在一个工作人员上训练),您可以执行以下操作:
我假设你有一些模型列表(或一些参数化模型),你可以给它们ID . 然后在函数trainModel中根据id选择一个 . 因此,您将获得成对的训练模型及其ID的rdd .