Context

A Keras model (link here, for the MWE) has to run predictions on a large amount of test data in parallel.

I define a cube as a uint numpy.ndarray. Each vertical slice of it is a column, with npixels = 128 (height) and nbins = 128 (depth).

Each prediction transforms one column into a denoised column of the same size.
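To make the shapes concrete: __denoise reshapes each (npixels, nbins) column to __shape_nn = (1, npixels, nbins, 1), i.e. a batch containing one single-channel image. The minimal NumPy sketch below (the 10-column cube is hypothetical test data) shows that this per-column input is exactly one slice of the whole cube viewed as a batch. The comment about passing the full batch to model.predict with a batch_size argument is an assumption about how the model could be called, not something tested here.

```python
import numpy as np

npixels, nbins = 128, 128
# Hypothetical small cube: 10 columns of npixels x nbins values
cube = np.random.rand(10, npixels, nbins)

# One column, shaped the way __denoise feeds it to the model:
# (batch=1, height=npixels, width=nbins, channels=1)
col_nn = np.reshape(cube[0], (1, npixels, nbins, 1))

# The whole cube as one batch of shape (ncols, npixels, nbins, 1).
# Assumption: model.predict(cube_nn, batch_size=32) would return an
# array of this same shape in a single call.
cube_nn = cube[..., np.newaxis]

# Reshaping back gives the original cube layout
restored = np.reshape(cube_nn, cube.shape)
```

The per-column batch col_nn is identical to cube_nn[0:1], so the two call styles feed the model the same data.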

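I provide three approaches: single-threaded, multiprocessing, and multiprocessing via the pathos package. Neither of the two multiprocessing approaches works, and I can't see why.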

Code

import keras
import numpy as np
import threading
import pathos.multiprocessing
import multiprocessing


def __res_sum_squares(y_true, y_pred):
    squared_diff = (y_true - y_pred) ** 2
    return keras.backend.sum(squared_diff)


__npixels, __nbins = 128, 128
__shape_col = (__npixels, __nbins)
__shape_nn = (1, __npixels, __nbins, 1)
__model = keras.models.load_model('./model.h5', compile=True, custom_objects={'res_sum_squares': __res_sum_squares})

__max_parallel_predictions = 4
__sema = threading.BoundedSemaphore(value=__max_parallel_predictions)


def __mt_pathos_manager(col_ratio):
    return __denoise(col_ratio[0], col_ratio[1])


def __denoise_frame_mt_pathos(frame_ratios):
    results = pathos.multiprocessing.ProcessingPool().map(__mt_pathos_manager, frame_ratios)
    return results


def __denoise_frame_mt_multiprocessing(frame_ratios):
    pool = multiprocessing.Pool()
    results = pool.map(__denoise, frame_ratios)
    pool.close()
    pool.join()
    return results


def __denoise(col, ratio=None):
    """
        :param col: the source column
        :param ratio: logging purposes
        :return: the denoised column
    """
    really_predict = True
    if type(col) is tuple:
        col, ratio = col[0], col[1]
    col_denoise = np.reshape(col, __shape_nn)

    print("{} acquiring".format(ratio))
    __sema.acquire()
    print("{} acquired".format(ratio))
    #  ~    ~  ~  ~  ~  ~  ~  ~  ~  ~ CRITICAL SECTION START ~  ~  ~  ~  ~  ~  ~  ~  ~  ~
    col_denoise = __model.predict(col_denoise) if really_predict else col_denoise
    #  ~    ~  ~  ~  ~  ~  ~  ~  ~  ~ CRITICAL SECTION END   ~  ~  ~  ~  ~  ~  ~  ~  ~  ~
    print("{} releasing".format(ratio))
    __sema.release()
    print("{} released".format(ratio))

    return np.reshape(col_denoise, __shape_col)


def denoise_cube(cube, mp=False, mp_pathos=False):
    """
        :param cube: a numpy 3D array of ncols * npixels * nbins
        :param mp: use multiprocessing
        :param mp_pathos: use pathos multiprocessing
        :return: the denoised cube
    """
    ncol = cube.shape[0]
    ratios = [(ic * 100.0) / ncol for ic in range(0, ncol)]
    frame_ratios = zip(cube, ratios)

    if mp:
        if mp_pathos:
            l_cols_denoised = __denoise_frame_mt_pathos(frame_ratios)
        else:
            l_cols_denoised = __denoise_frame_mt_multiprocessing(frame_ratios)
    else:
        l_cols_denoised = [__denoise(col, ratio) for col, ratio in frame_ratios]
    return l_cols_denoised


if __name__ == "__main__":

    test_cube = np.random.rand(1000, __npixels, __nbins)

    # Single threaded impl: works fine
    denoise_cube(test_cube, mp=False)
    # Multiprocessing Pool: blocks at the eighth "acquired" print
    denoise_cube(test_cube, mp=True, mp_pathos=False)
    # Pathos multiprocessing Pool: blocks at the eighth "acquired" print
    denoise_cube(test_cube, mp=True, mp_pathos=True)

Analysis

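My first guess was that __model.predict() somehow blocks after 8 calls (= the number of CPU cores on the test machine). So I put a threading.BoundedSemaphore in front of it that allows fewer than 8 concurrent accesses. It made no difference.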

The single-threaded version works as expected:

0.0 acquiring
0.0 acquired
0.0 releasing
0.0 released
< ............ >
99.9 acquiring
99.9 acquired
99.9 releasing
99.9 released

Neither multiprocessing version does:

0.0 acquiring
0.0 acquired
3.2 acquiring
3.2 acquired
6.4 acquiring
6.4 acquired
9.6 acquiring
9.6 acquired
12.8 acquiring
12.8 acquired
16.0 acquiring
16.0 acquired
19.2 acquiring
19.2 acquired
22.4 acquiring
22.4 acquired
< hangs >
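The stride of 3.2 between the ratios in this log is consistent with how Pool.map splits work: when chunksize is not given, CPython's Pool.map picks it with a divmod heuristic, and Pool() defaults to os.cpu_count() workers (8 here). The sketch below reproduces that arithmetic, assuming CPython's heuristic and that the pathos pool chunks the same way (not verified). Under those assumptions each of the 8 workers entered predict() on the first column of its own 32-column chunk and none of them returned; the order in which workers pick up chunks is not guaranteed, though.

```python
def default_chunksize(n_items, n_workers):
    # Same heuristic CPython's Pool.map uses when chunksize is None
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


n_cols, n_workers = 1000, 8  # 1000 test columns, 8 CPU cores (Pool() default)
chunk = default_chunksize(n_cols, n_workers)

# Ratio of the first column in each of the first n_workers chunks,
# computed exactly like `ratios` in denoise_cube
first_ratios = [(start * 100.0) / n_cols
                for start in range(0, n_cols, chunk)][:n_workers]

print(chunk)         # 32
print(first_ratios)  # [0.0, 3.2, 6.4, 9.6, 12.8, 16.0, 19.2, 22.4]
```

These are exactly the eight ratios printed before the hang.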

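Wait, where are the "releasing" prints? It looks as if the semaphore is never hit, or as if it is copied for every call and always starts out freshly initialized. Hmm.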
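The "copied for every call" suspicion fits the log: 8 "acquired" lines appear even though __sema was created with value=4. A threading.BoundedSemaphore only coordinates threads inside one process; each worker process gets its own copy. The sketch below demonstrates this with a semaphore of value 1 that four child processes all acquire successfully. It uses an explicit "fork" context (POSIX only), the start method Linux has traditionally used by default; the names _try_acquire and run_demo are illustrative only.

```python
import multiprocessing
import threading

# A *thread* semaphore that admits a single holder, like __sema in the question
_sema = threading.BoundedSemaphore(value=1)


def _try_acquire(queue):
    # Runs in a child process, which works on its own copy of _sema.
    # The semaphore is deliberately never released.
    queue.put(_sema.acquire(blocking=False))


def run_demo(n_procs=4):
    # Explicit "fork" start method (POSIX only)
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    procs = [ctx.Process(target=_try_acquire, args=(queue,)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    child_results = [queue.get() for _ in procs]  # drain the queue before join
    for p in procs:
        p.join()
    # The parent's semaphore was never touched by the children
    parent_ok = _sema.acquire(blocking=False)
    if parent_ok:
        _sema.release()
    return child_results, parent_ok


if __name__ == "__main__":
    print(run_demo())  # ([True, True, True, True], True)
```

A limit that actually spans processes would need multiprocessing.BoundedSemaphore, handed to the workers (for example through the Pool initializer) rather than recreated in each one. Note that this only explains why the semaphore never limited anything; it does not by itself explain the hang.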

So let's find really_predict = True and flip its value: that way the predict() call is never reached.

...and this works fine, great! So the problem is not multiprocessing by itself, but some strange interaction between Keras prediction and the multiprocessing pool. Any suggestions?
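One commonly suggested direction, not verified here against this model, is to avoid loading the model in the parent before the pool forks (TensorFlow's runtime is generally reported not to be fork-safe once initialized) and instead load it once per worker through the initializer argument of multiprocessing.Pool. The sketch below shows only that pattern: _StubModel and _load_model are hypothetical stand-ins so the code runs without Keras, and the real call in their place would be the keras.models.load_model line from the question.

```python
import multiprocessing

_model = None  # one instance per worker process, set by _init_worker


class _StubModel:
    """Hypothetical stand-in for a Keras model; only .predict() is mimicked."""

    def predict(self, x):
        return [2 * v for v in x]


def _load_model():
    # Assumption: in the real code this would be
    # keras.models.load_model('./model.h5', compile=True,
    #                         custom_objects={'res_sum_squares': __res_sum_squares})
    return _StubModel()


def _init_worker():
    # Runs once inside each worker process, after that process has started
    global _model
    _model = _load_model()


def _denoise_in_worker(col):
    # Uses the model owned by the current worker process
    return _model.predict(col)


def denoise_all(cols, processes=4):
    with multiprocessing.Pool(processes=processes, initializer=_init_worker) as pool:
        return pool.map(_denoise_in_worker, cols)


if __name__ == "__main__":
    print(denoise_all([[1, 2], [3, 4]]))  # [[2, 4], [6, 8]]
```

The design choice here is that the parent never holds a model at all, so nothing model-related is inherited across the fork; each worker pays the loading cost once instead of once per column.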