
Using Keras with multiprocessing


This is basically a duplicate of: Keras + Tensorflow and Multiprocessing in Python. But my setup is a bit different, and their solution doesn't work for me.

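I need to train a Keras model against predictions made by another model. The predictions are tied to some CPU-heavy code, so I would like to parallelize them and have the code run in worker processes. This is the code I would like to execute: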

import numpy as np

from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam

def create_model():
    input_layer = Input((10,))
    dense = Dense(10)(input_layer)

    return Model(inputs=input_layer, outputs=dense)

model_outside = create_model()
model_outside.compile(Adam(1e-3), "mse")

def subprocess_routine(weights):
    model_inside = create_model()
    model_inside.set_weights(weights)

    while True:
        # lots of CPU
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)

        yield batch, prediction

weights = model_outside.get_weights()

model_outside.fit_generator(subprocess_routine(weights),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)

This produces the error:

E tensorflow/core/grappler/clusters/utils.cc:81] Failed to get device properties, error code: 3

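I found the question above, where the answer is to import Keras inside the subprocess. I have moved all imports into subprocess_routine, but that does not change the error. It may be necessary to eliminate the Keras imports from the main process altogether, but in my setup that would mean a huge refactoring.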
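One detail that may explain why moving the imports has no effect: the body of a generator function, including any imports at its top, does not run when the function is called. It runs on the first next() call, in whichever process makes that call. The sketch below shows only this plain-Python behavior; routine and events are hypothetical names, and whether Keras advances the generator in the worker process is an assumption here. Note also that the main process has already imported Keras to build model_outside, so a forked worker would inherit that state regardless of where the generator's own imports sit.

```python
import os


def routine(events):
    # Everything before the first `yield` runs on the first next() call,
    # in the process that makes that call, not when routine() is called.
    events.append(("body started in pid", os.getpid()))
    while True:
        yield "batch"


events = []
gen = routine(events)               # only creates the generator object
started_at_creation = len(events)   # nothing in the body has run yet
first = next(gen)                   # now the body runs up to the first yield
```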

Keras with multithreading seems to work. In this issue, scroll down to the very last comment: https://github.com/keras-team/keras/issues/5640 In my code, it looks like this:

import tensorflow as tf

model_inside = create_model()
model_inside._make_predict_function()

graph = tf.get_default_graph()

def subprocess_routine(model_inside, graph):

    while True:
        batch = np.random.rand(10, 10)

        with graph.as_default():
            prediction = model_inside.predict(batch)

        yield batch, prediction

model_outside.fit_generator(subprocess_routine(model_inside, graph),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)

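But the error message is the same.

Since the problem is apparently related to the initialization of the subprocesses, I tried creating a new session in each subprocess: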

def subprocess_routine(weights):

    import keras.backend as K
    import tensorflow as tf
    sess = tf.Session()
    K.set_session(sess)

    model_inside = create_model()
    model_inside.set_weights(weights)

    while True:
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)

        yield batch, prediction

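This produces a variation of the same error message:

E tensorflow/stream_executor/cuda/cuda_driver.cc:1300] could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED

Again, the initialization seems to be broken.

How can I run Keras in both my main process and the subprocesses spawned by multiprocessing?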
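A likely reading of both errors (an assumption, not something the logs confirm) is that the worker is created with fork after the parent has already initialized the GPU, and the CUDA runtime does not support being used in a forked child under those conditions. The sketch below uses only the standard library to show the underlying mechanism on POSIX systems: a forked child starts with a copy of whatever state the parent had set up. The names runtime_state, initialize_runtime and owner_seen_by_forked_child are hypothetical, and the dictionary is only a stand-in for real driver state.

```python
import os

# Stand-in for state created when the parent first touches the GPU.
# (Hypothetical: real CUDA state lives in the driver, not in a Python dict.)
runtime_state = {"initialized_by_pid": None}


def initialize_runtime():
    runtime_state["initialized_by_pid"] = os.getpid()


def owner_seen_by_forked_child():
    """Fork once and return (owner pid as seen by the child, child's own pid)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()  # POSIX only
    if pid == 0:
        # Child: it sees a copy of the parent's state, not a fresh one.
        try:
            os.close(read_fd)
            message = "%d,%d" % (runtime_state["initialized_by_pid"], os.getpid())
            os.write(write_fd, message.encode())
        finally:
            os._exit(0)  # never fall through into the parent's code path
    os.close(write_fd)
    data = os.read(read_fd, 64).decode()
    os.close(read_fd)
    os.waitpid(pid, 0)
    owner, child = (int(part) for part in data.split(","))
    return owner, child
```

After initialize_runtime() in the parent, the child reports the parent's pid as the owner of the state even though it runs under a different pid. This is the kind of mismatch that creating a new session inside the child would not undo.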

1 Answer


    The good news is that TensorFlow sessions are thread-safe: Is it thread-safe when using tf.Session in inference service?

    To use Keras models in multiple processes, you have to do the following:

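    • Set up the model

    • Call _make_predict_function()

    • Set up a session and use it to get the TensorFlow graph

    • Finalize this graph

    • Every time you predict something, make this graph the default with graph.as_default()

    Here is some example code: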

    # the usual imports
    import numpy as np
    import tensorflow as tf

    from keras.layers import Dense, Input
    from keras.models import Model

    # set up the model
    i = Input(shape=(10,))
    b = Dense(1)(i)
    model = Model(inputs=i, outputs=b)

    # now to use it in multiprocessing, the following is necessary:
    # build the predict function up front, then freeze the graph
    model._make_predict_function()
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    default_graph = tf.get_default_graph()
    default_graph.finalize()

    # now you share the model and graph between processes
    # in each process you can call this:
    def predict(something):
        with default_graph.as_default():
            return model.predict(something)
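Since the session is thread-safe, one way to apply this is to produce the (batch, prediction) pairs in background threads and hand them to training through a generator. The following is a minimal sketch using only the standard library. threaded_generator, make_batch and stand_in_predict are hypothetical names, and stand_in_predict takes the place of a real model.predict call, which would sit inside the default_graph.as_default() block.

```python
import queue
import threading


def stand_in_predict(batch):
    # Hypothetical placeholder for model.predict(batch).
    return [2.0 * x for x in batch]


def threaded_generator(make_batch, predict, n_workers=2, max_queued=8):
    """Yield (batch, prediction) pairs produced by background threads."""
    results = queue.Queue(maxsize=max_queued)
    stop = threading.Event()

    def worker():
        while not stop.is_set():
            batch = make_batch()
            item = (batch, predict(batch))
            # Retry with a timeout so the thread notices `stop`
            # even while the queue is full.
            while not stop.is_set():
                try:
                    results.put(item, timeout=0.1)
                    break
                except queue.Full:
                    continue

    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(n_workers)]
    for thread in threads:
        thread.start()
    try:
        while True:
            yield results.get()
    finally:
        stop.set()  # runs when the generator is closed or garbage collected
```

A generator like this could be passed to fit_generator with use_multiprocessing=False. How much it helps depends on whether the CPU-heavy part releases the GIL, which native NumPy and TensorFlow code often does but pure Python code does not.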
    
