How to read partitioned parquet files from S3 using pyarrow in Python

I am looking for a way to read data from multiple partitioned directories on S3 using Python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module has the ability to read from partitions, so I tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It raised the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Following pyarrow's documentation, I then tried using s3fs as the filesystem, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

which raises the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

I only have access to an ECS cluster, so spark/pyspark is not an option.

Is there a way to easily read parquet files from these partitioned directories in S3 with Python? I feel that listing all the directories and then reading each one is not a good practice, as suggested in this link. I need to convert the data into a pandas dataframe for further processing, so I would prefer options related to fastparquet or pyarrow, but I am open to other options in Python as well.

3 Answers

  • 7

I managed to get this working with the latest release of fastparquet & s3fs. Below is the code for the same:

    import s3fs
    import fastparquet as fp

    # one S3FileSystem instance is enough for both globbing and opening files
    fs = s3fs.S3FileSystem()

    # e.g. mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
    s3_path = "mybucket/data_folder/*/*/*.parquet"
    all_paths_from_s3 = fs.glob(path=s3_path)

    # use s3fs as the filesystem
    myopen = fs.open
    fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
    # convert to a pandas dataframe
    df = fp_obj.to_pandas()
    

    Credit to Martin for pointing me in the right direction via our conversation.

    NB: This would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.
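
    For readers on a recent pyarrow (roughly 2.0 or later), the native S3 support anticipated by ARROW-1213 has since landed; a minimal sketch using the pyarrow.dataset API, assuming hive-style key=value partitioning as in the question:

    import pyarrow.dataset as ds
    from pyarrow import fs

    # pyarrow's built-in S3 filesystem (no s3fs required)
    s3 = fs.S3FileSystem()

    # "hive" partitioning turns serial_number=1/cur_date=20-12-2012 path
    # segments into columns of the resulting table
    dataset = ds.dataset(
        "my_bucker/path/to/data_folder",  # bucket/prefix from the question
        filesystem=s3,
        format="parquet",
        partitioning="hive",
    )
    df = dataset.to_table().to_pandas()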

    I did a quick benchmark on individual iterations with pyarrow, and with the list of files sent as a glob to fastparquet. fastparquet with s3fs was faster than pyarrow plus my hackish code. But I reckon pyarrow with s3fs will be faster once that support is implemented.

    The code and benchmarks are below:

    >>> # assumes list_parquet_files, list_, date_partition ('cur_date') and
    >>> # dma_partition ('serial_number') were defined earlier in the session
    >>> def test_pq():
    ...     for current_file in list_parquet_files:
    ...         f = fs.open(current_file)
    ...         df = pq.read_table(f).to_pandas()
    ...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
    ...         #probably not the best way to split :)
    ...         elements_list=current_file.split('/')
    ...         for item in elements_list:
    ...             if item.find(date_partition) != -1:
    ...                 current_date = item.split('=')[1]
    ...             elif item.find(dma_partition) != -1:
    ...                 current_dma = item.split('=')[1]
    ...         df['serial_number'] = current_dma
    ...         df['cur_date'] = current_date
    ...         list_.append(df)
    ...     frame = pd.concat(list_)
    ...
    >>> timeit.timeit('test_pq()', number=10, globals=globals())
    12.078817503992468
    
    >>> def test_fp():
    ...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
    ...     df = fp_obj.to_pandas()
    
    >>> timeit.timeit('test_fp', number=10, globals=globals())
    2.0100269466638565e-06
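
    Caveat: the second timeit call above passes 'test_fp' without parentheses, so it only times looking up the name and never actually runs the function, which is why it reports about two microseconds. A like-for-like rerun would call it:

    >>> timeit.timeit('test_fp()', number=10, globals=globals())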
    

    Reference:

  • 1

    Let's discuss in https://issues.apache.org/jira/browse/ARROW-1213 and in https://issues.apache.org/jira/browse/ARROW-1119. We must add some glue code to allow pyarrow to recognize the s3fs filesystem, and add a shim / compatibility class to conform s3fs's slightly different filesystem API to pyarrow's.
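
    To illustrate the idea (a hypothetical sketch only, not the code that eventually landed in pyarrow), such a shim maps the method names pyarrow expects onto s3fs's API:

    import s3fs

    class S3FSShim:
        """Hypothetical adapter: map pyarrow-expected methods onto s3fs."""
        def __init__(self, fs):
            self._fs = fs  # an s3fs.S3FileSystem instance

        def isdir(self, path):
            return self._fs.info(path)["type"] == "directory"

        def isfile(self, path):
            return self._fs.info(path)["type"] == "file"

        def open(self, path, mode="rb"):
            return self._fs.open(path, mode=mode)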

  • 2

    This issue was resolved in this pull request in 2017.

    For those who want to read parquet files from S3 using only pyarrow, here is an example:

    import s3fs
    import pyarrow.parquet as pq
    from pyarrow.filesystem import S3FSWrapper
    
    fs = s3fs.S3FileSystem()
    bucket = "your-bucket"
    path = "your-path"
    
    # Python 3.6 or later
    p_dataset = pq.ParquetDataset(
        f"s3://{bucket}/{path}",
        filesystem=fs
    )
    df = p_dataset.read().to_pandas()
    
    # Pre-python 3.6
    p_dataset = pq.ParquetDataset(
        "s3://{0}/{1}".format(bucket, path),
        filesystem=fs
    )
    df = p_dataset.read().to_pandas()
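
    Since the dataset is hive-partitioned, the key=value path segments (serial_number, cur_date) come back as columns of df. Depending on your pyarrow version, you can also prune partitions at read time with the filters argument; a sketch, assuming the layout from the question:

    # read only the serial_number=1 partition (partition values are strings)
    p_dataset = pq.ParquetDataset(
        "s3://{0}/{1}".format(bucket, path),
        filesystem=fs,
        filters=[("serial_number", "=", "1")],
    )
    df = p_dataset.read().to_pandas()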
    
