首页 文章

Pytables / Pandas:组合(读取?)按行分割的多个HDF5存储

提问于
浏览
6

在“一次编写,多次读取”工作流程中,我经常使用FastExport实用程序解析从Teradata转储的大型文本文件(20GB-60GB),并使用Pandas将它们加载到Pytables中 . 我正在使用多处理来分块文本文件并将它们分发到不同的进程,以便根据每行大约5MM的行数来编写.H5文件,以支持并行写入 . 对于并行编写多个hdf5文件大约12分钟这相当快,相比之下,为25MM行x64列编写单个hdf5文件需要22分钟 .

%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop

对于按行分割多个h5文件的情况,我最终会有多个文件具有相同的结构,我希望在单个h5file根/ data / table中组合

要测试组合功能,以下是代码段:

import tables as tb
import pandas as pd

tb.setBloscMaxThreads(15)
store =pd.HDFStore('temp15.h5',complib='blosc')

filenames=['part_1.h5','part_2.h5','part_3.h5','part_4.h5','part_5.h5']

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    store.append(key='data',value=df,format='t',chunksize=200000)

store.close()

这是%timeit结果:

1 loops, best of 3: 8min 22s per loop

这基本上占用了我通过并行编写多个h5文件获得的大部分时间 . 我有两个问题:

  • 有没有办法更有效地组合(追加)具有相同表格格式的h5文件?(SQL联盟就像功能一样) . 我试过this SO但是无法让它附加表格 .

  • 如果没有,在大多数查询从所有列的位置中选择时,在行上拆分是否合理?我正在考虑编写一个map / combine函数,它将查看表的所有部分,以便从查询中进行选择 . Pandas select_as_multiple()函数执行此操作以基于列进行拆分 .


Update Based on Jeff's Suggestions:

在合并前文件写入过程中删除索引和压缩的很棒的调用 . 删除索引,压缩并将每个预合并文件的最大行数设置为1MM行:

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 9min 37s per loop

这比以前快2分多一点,速度和解析数据的速度差不多 . 将数据列设置为所需的字段后(在我的情况下为3):

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    dc=df.columns[1:4]
    store.append(key='data',value=df,format='t',data_columns=dc)

这比以前慢了大约2分钟: 1 loops, best of 3: 10min 23s per loop . 从上面的代码中删除压缩后,我得到 1 loops, best of 3: 8min 48s per loop (几乎与第一次尝试压缩而没有数据列索引相同) . 为了让您了解压缩的效果,未压缩存储大约为13.5GB,而使用 blosc 的压缩版本大约为3.7GB .

总之,我的进程需要 18 minutes 15 seconds 来创建一个合并的未压缩的hdf5文件 . 与单个文件写入(压缩)相比,这个速度大约为 4 minutes 7 seconds .

这让我想到了我的问题的第二部分,如果我不合并文件并使用合并前文件以 Map /组合方式处理,那么这可能是一种合理的方法吗?我应该如何考虑实施这个?

对于完整的披露,我在Pandas版本 0.12.0 ,Pytables版本 3.0.0 和我的数据处理工作流程如下(伪代码):

def generate_chunks_from_text_file(reader,chunksize=50000):
    """ generator that yields processed text chunks """

    for i, line in enumerate(reader.readlines()):
        ----process data and yield chunk -----


def data_reader(reader,queue):
    """ read data from file and put it into a queue for multiprocessing """

    for chunk in self.generate_chunks_from_text_file(reader):
        queue.put(chunk) # put data in the queue for the writer

def data_processor(queue,filename,dtype,min_size):
    """" subprocess that reads the next value in the queue and writes hdf store. """

    store=pd.HDFStore(filename)

    while True:

        results = queue.get()
        array=np.array(results,dtype=dt) # convert to numpy array
        df = pd.DataFrame(array) #covert to pandas array

        store.append(key='data', value=df, format='t', min_itemsize=dict(min_size), data_columns=[],index=False)
    store.close()
        ----when queue exhausts - break-----

1 回答

  • 7

    我做了一个非常相似的split-process-combine方法,使用多个进程创建中间文件,然后使用单个进程合并生成的文件 . 以下是获得更好性能的一些提示:

    • 通过传递 index=False 来编写文件时关闭索引,请参阅文档的here . 我相信 PyTables 会逐步更新索引,在这种情况下是完全不必要的(因为您将在之后合并它们) . 仅索引最终文件 . 这应该会加快写作速度 .

    • 您可以考虑更改默认索引方案/级别,具体取决于您的查询(假设您遵循以下几点建议而不创建太多数据列) .

    • 与此类似,在编写预合并文件时不要创建压缩文件,而是在写入索引文件后(在未压缩状态下)创建它,因此这最终成为您的最后一步 . 请参阅文档here . 此外,在使用重新计算PyTables块的 ptrepack 时传递 --chunkshape=auto 非常重要(例如,在一个块中读取/写入多少数据),因为它将考虑整个表 .

    • RE压缩,YMMV可能会有所不同,具体取决于您的数据实际压缩的程度以及您正在进行的查询类型 . 我有一些类型的数据,我发现它根本不会压缩更快,即使理论上它应该更好 . 你必须进行实验(尽管我总是使用 blosc ) . Blosc仅具有 one 压缩级别(对于1-9级或关闭,它可以打开等级0) . 所以改变这一点不会改变任何事情 .

    • 我以索引顺序合并文件,基本上是通过将预合并文件的子集读入内存(常数只使用恒定数量的内存),然后将它们逐个附加到最终文件中 . (不是100%肯定这有所不同,但似乎运作良好) .

    • 你会发现你的绝大部分时间都花在了索引上 .

    • 此外,只索引您实际需要的列!通过确保在写入每个文件时指定 data_columns=a_small_subset_of_columns .

    • 我发现编写很多小文件比较好,然后合并创建一个较大的文件,而不是写几个大文件,而是YMMV . (例如说100个100MB的合并前文件产生10GB文件,而不是5个2GB文件) . 虽然这可能是我的处理管道的一个功能,因为我倾向于处理处理而不是实际写入 .

    • 我没有使用过,但听到使用SSD(售卖状态驱动器)的惊人事情,即使它对于这种事情来说相对较小 . 您可以使用一个加速度获得一个数量级的速度(压缩可能会改变此结果) .

相关问题