首页 文章

如何在Estimator训练期间动态加载数据集的新部分?

提问于
浏览
3

我有一个有趣的问题 .

我正在使用 tf.Estimator 对大型数据集(15M行,16列)进行回归,并使用常用方法将数据加载到_1770710中:

def input_fn_train(features, labels, batch_size, repeat_count):

    dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
    return dataset

featureslabels 是pandas DataFrames . input_fn 适用于较小的数据(最多可达数百万行),但在包含整个数据集时,它会引发:

[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.

导致此错误的原因是,在显式数据(而非占位符)上调用 .from_tensor_slices() 时,TensorFlow会为每个数据点创建 tf.constant() . TensorFlow中图形的大小存在固有的局限性,而且我的数据太大了 .

在tensorflow文档中,他们提到了这一点,并提到了解决这个问题的方法:

“作为替代方案,您可以根据 tf.placeholder() 张量定义数据集,并在数据集上初始化迭代器时提供NumPy数组 . ”

这个方法可以解决我的问题,但问题在于初始化,事实上,我无法访问它 . 在运行数据集迭代器的初始化操作时,无法将实际值提供给占位符 .

使用以下钩子在 tf.Estimator 内初始化数据集:

class _DatasetInitializerHook(training.SessionRunHook):

    def __init__(self, iterator):
        self._iterator = iterator

    def begin(self):
        self._initializer = self._iterator.initializer

    def after_create_session(self, session, coord):
        del coord
        session.run(self._initializer)

如您所见,它在创建会话后立即被调用 . 问题是初始化程序会话运行独立于所有挂钩,因此在初始化会话运行时不会调用挂钩,因此,无法通过 feed_dict 来填充占位符 .

我无法自己初始化迭代器,因为没有办法将迭代器传递给 Estimator . 迭代器在之后被初始化

解决这个问题的方法是明确地将我的数据分成 TFRecord 文件并使用TensorFlow函数直接加载它们,但是,这是一个非常不受欢迎的解决方案 . 在我公司的代码库中,我们拥有自己的优化二进制数据格式,使用其他文件会占用大量空间和IO事务时间,这一点至关重要 .

我认为我的问题有多种解决方案,但是,我仍然没有提出任何解决方案 . 如果您有任何想法或建议,如何做到这一点,请分享,谢谢!

1 回答

  • 2

    好的,我找到了解决问题的方法 . 可以使用 Dataset.from_generator() 函数完成 . 我的解决方案使用一个生成器生成DataFrames,第二个生成行,同时迭代这些DataFrame .

    a = arange(20).reshape(10,2)
    df = DataFrame(a, columns=['x1','y1'])
    
    
    def gen_partition():
        for i in range(2):
            df_partition = df.iloc[i * 5 : (i + 1) * 5]
            yield df_partition
    
    
    def gen_fields():
        for partition in gen_partition(): # type: DataFrame
            for row in partition.itertuples():
                yield {'x1': row[1]}, row[2]
    
    
    def input_fn_gen():
        dataset = Dataset.from_generator(
            gen_fields,
            ({'x1': tf.float32}, tf.float32),
            ({'x1': tf.TensorShape([])}, tf.TensorShape([])))
    
        dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
        return dataset
    
    
    feature_columns = [tf.feature_column.numeric_column('x1')]
    
    dir = get_model_dir('linreg_test')
    
    tf.logging.set_verbosity('INFO')
    
    estimator = tf.estimator.LinearRegressor(
        feature_columns=feature_columns,
        model_dir=dir,
        label_dimension=1
    )
    
    estimator.train(input_fn=lambda: input_fn_gen())
    

相关问题