首页 文章

Bag.to_avro失败因为大型数据集上的Killed / MemoryError

提问于
浏览
0

我正在尝试处理大量的文本文件,这些文件由新行分隔 . 文件被gzip压缩,我将文件分成小块,未压缩的文件大约为100mb左右 . 我总共有296个单独的压缩文件,总的未压缩大小约为30Gb .

行是NQuads,我使用 Bag 将行映射为可以导入数据库的格式 . 行按键折叠,以便我可以组合与单个页面相关的行 .

这是我用来读取文件并折叠它们的代码 .

with dask.config.set(num_workers=2):
  n_quads_bag = dask.bag.\
    read_text(files)

  uri_nquads_bag = n_quads_bag.\
    map(parser.parse).\
    filter(lambda x: x is not None).\
    map(nquad_tuple_to_page_dict).\
    foldby('uri', binop=binop).\
    pluck(1).\
    map(lang_extract)

然后我'm normalizing the data into pages and entities. I'm通过map函数执行此操作,该函数将事物拆分为带有 (page, entities) 的元组 . 我正在采集数据,然后将其写入Avro中的两组独立文件 .

pages_entities_bag = uri_nquads_bag.\
      map(map_page_entities)

  pages_bag = pages_entities_bag.\
    pluck(0).\
    map(page_extractor).\
    map(extract_uri_details).\
    map(ntriples_to_dict)

  entities_bag = pages_entities_bag.\
    pluck(1) .\
    flatten().\
    map(entity_extractor).\
    map(ntriples_to_dict)

  with ProgressBar():
    pages_bag.to_avro(
      os.path.join(output_folder, 'pages.*.avro'),
      schema=page_avro_scheme,
      codec='snappy',
      compute=True)
    entities_bag.to_avro(
      os.path.join(output_folder, 'entities.*.avro'),
      schema=entities_avro_schema,
      codec='snappy',
      compute=True)

代码在 pages_bag.to_avro(... compute=True) 上失败 Killed/MemoryError . 我已经玩过减少分区大小并将处理器数量减少到2 .

我在设置 compute=True 时错了吗?这是整个数据集被带入内存的原因吗?如果是这样,我怎么能得到要写的文件?

或者是否有可能页面或实体的分区对于计算机来说太大了?

我遇到的另一个问题是我错误地使用了 Bags ,这是我想要解决的问题的正确方法吗?

机器的规格我正在运行:

  • 4 CPU

  • 16GB的Ram

  • 375 Scratch Disk

1 回答

  • 0

    让内存耗尽的方法是保持文件大约100MB未压缩并使用 groupby . 正如Dask文档所述,您可以强制它在磁盘上随机播放 . groupby 支持在输出上设置多个分区 .

    with dask.config.set(num_workers=2):
      n_quads_bag = dask.bag.\
        read_text(files)
    
      uri_nquads_bag = n_quads_bag.\
        map(parser.parse).\
        filter(lambda x: x is not None).\
        map(nquad_tuple_to_page_dict).\
        groupby(lambda x: x[3], shuffle='disk', npartitions=n_quads_bag.npartitions).\        
        map(grouped_nquads_to_dict).\
        map(lang_extract)
    

相关问题