首页 文章

使用boto3 lib和AWS Lambda从位于S3存储桶中的压缩文件中获取数据流

提问于
浏览
6

我正在尝试为我的chron作业创建一个无服务器处理器 . 在这个作业中,我从我的一个客户端收到我的S3存储桶中的压缩文件,文件大小约为 50MB 但是一旦你解压缩它,它就变成 1.5GB 了由于我无法从S3存储桶下载此文件并将其解压缩到我的Lambda上,因此AWS Lambda上可用空间的硬限制是 500MB ,我已成功解压缩文件并使用 funzip 逐行流式传输内容在unix脚本中 .

for x in $files ; do echo -n "$x: " ; timeout 5 aws s3 cp $monkeydir/$x - | funzip

我的名字: MonkeyBusiness 密钥: /Daily/Business/Banana/{current-date} 对象: banana.zip

但是现在因为我试图使用boto3实现相同的输出,我如何将压缩内容流式传输到sys i / o并解压缩流将内容保存在单独的文件中,每行除以10000行,然后将分块文件上传回S3 . 需要指导,因为我是AWS和boto3的新手 .

如果您需要有关该工作的更多详细信息,请与我们联系 .

下面给出的建议解决方案在这里不适用,因为zlib文档明确指出所述lib与gzip文件格式兼容,我的问题是zip文件格式 .

import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            yield rv

2 回答

  • 3

    所以我使用BytesIO将压缩文件读入缓冲区对象,然后我使用zipfile打开解压缩的流作为未压缩的数据,我能够逐行获取基准 .

    import io
    import zipfile
    import boto3
    import sys
    
    s3 = boto3.resource('s3', 'us-east-1')
    
    
    def stream_zip_file():
        count = 0
        obj = s3.Object(
            bucket_name='MonkeyBusiness',
            key='/Daily/Business/Banana/{current-date}/banana.zip'
        )
        buffer = io.BytesIO(obj.get()["Body"].read())
        print (buffer)
        z = zipfile.ZipFile(buffer)
        foo2 = z.open(z.infolist()[0])
        print(sys.getsizeof(foo2))
        line_counter = 0
        for _ in foo2:
            line_counter += 1
        print (line_counter)
        z.close()
    
    
    if __name__ == '__main__':
        stream_zip_file()
    
  • 1

    这不是确切的答案 . 但你可以尝试一下 .

    首先,请调整answer that mentioned about gzip file with limited memory,这个方法允许一个按块传输文件块 . 并且boto3 S3 put_object()和upload_fileobj似乎允许流式传输 .

    您需要使用以下解压缩来混合和调整上述代码 .

    stream = cStringIO.StringIO()
    stream.write(s3_data)
    stream.seek(0)
    blocksize = 1 << 16  #64kb
    with gzip.GzipFile(fileobj=stream) as decompressor:
        while True:
            boto3.client.upload_fileobj(decompressor.read(blocksize), "bucket", "key")
    

    我无法保证上面的代码能够正常工作,它只是给你一个解压缩文件并通过块重新上传它的想法 . 您甚至可能需要将解压缩数据传输到ByteIo并将管道传递给upload_fileobj . 有很多测试 .

    如果您不需要尽快解压缩文件,我的建议是使用lambda将文件放入SQS队列 . 当有“足够”的文件时,触发将读取队列并处理文件的SPOT实例(这将非常便宜) .

相关问题