PySpark执行在每次迭代时都会减慢

我对PySpark很新 . 我写了一段片段来输入7个预测变量 . 我正在使用tfidf转换描述列来训练NB模型并预测NaN .

to_predict_cols = ['col1', 'col2', ... 'col7']
current_x = ['features_tfidf']
for iter_no, to_predict_col in enumerate(to_predict_cols):
    temp_data_imputed_level1 = data_imputed_level1.select(["_c0"] + current_x + [to_predict_col])

    # create test
    test = temp_data_imputed_level1.where(col(to_predict_col).isNull()).select(["_c0"] + current_x)

    # create train
    train = temp_data_imputed_level1.dropna()

    # create label indexing of the response
    label_stringIndexer = StringIndexer(inputCol = to_predict_col, outputCol = "label")
    stringIndexer_model = label_stringIndexer.fit(train)
    train = stringIndexer_model.transform(train)
    print("label encoding.. [OK]")

    # fit the NB classifier
    nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
    model = nb.fit(train)
    print("model building.. [OK]")

    # predict output
    predictions = model.transform(test)

    # inverse label encoding of the predictions
    labelReverse = IndexToString(inputCol='prediction', outputCol='labeled_prediction', labels=stringIndexer_model.labels)
    predictions = labelReverse.transform(predictions)
    print("model predictions.. [OK]")

我正在使用一些连接来保存我未包含的预测 . 但是当循环进入第3-4次迭代时,它从一开始就变得太慢,即连接不会影响速度 . 此外,数据在执行循环之前被缓存 .

我已经尝试了 sc._jvm.System.gc() 并删除了手动启动的所有变量 . 但仍然没有运气 .

我在一台256GB RAM和48核的服务器上 . 目前我没有连接到任何群集 . 数据大小为1.5GB .

那么,有什么想法/建议我缺少什么或如何加快这个过程?任何帮助表示赞赏 . 提前致谢 .

回答(0)