首页 文章

使用read()方法从Amazon S3读取大型JSON文件时的MemoryError

提问于
浏览
1

我正在尝试使用Python将大量的JSON FILE从Amazon S3导入AWS RDS-PostgreSQL . 但是,这些错误发生了,

回溯(最近一次调用最后一次):文件“my_code.py”,第67行,在file_content = obj ['Body'] . read() . decode('utf-8') . splitlines(True)File“/ home /user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py“,第76行,读取chunk = self._raw_stream.read(amt) )文件“/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py”,line 239,在读取数据= self._fp.read()文件“/usr/lib64/python3.6/http/client.py”,第462行,读取s = self._safe_read(self.length)文件“/ usr /lib64/python3.6/http/client.py“,第617行,在_safe_read中返回b”“ . join(s)MemoryError

// my_code.py

import sys
import boto3
import psycopg2
import zipfile
import io
import json

s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()

bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)

def insert_query(data):
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))


if key.endswith('.zip'):
    zip_files = obj['Body'].read()
    with io.BytesIO(zip_files) as zf:
        zf.seek(0)
        with zipfile.ZipFile(zf, mode='r') as z:
            for filename in z.namelist():
                with z.open(filename) as f:
                    for line in f:
                        insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for line in file_content:
        insert_query(json.loads(line))


connection.commit()
connection.close()

这些问题有解决方案吗?任何帮助都可以,非常感谢你!

1 回答

  • 2

    通过避免将整个输入文件作为一条线路输入内存,可以节省大量成本 .

    具体来说,这些行在内存使用方面非常糟糕,因为它们涉及整个文件大小的 bytes 对象的峰值内存使用量,以及带有文件完整内容的 list 行:

    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for line in file_content:
    

    对于具有500万行的1 GB ASCII文本文件,在64位Python 3.3上,对于 bytes 对象, listlist 中的单个 str ,这是大约2.3 GB的峰值内存要求 . 一个程序需要的内存量是其处理文件大小的2.3倍,不能扩展到大文件 .

    要修复,请将原始代码更改为:

    file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
    for line in file_content:
    

    鉴于obj['Body'] appears to be usable for lazy streaming这应该从内存中删除完整文件数据的两个副本 . 使用 TextIOWrapper 意味着 obj['Body'] 被懒散地读取并以块(一次几KB)解码,并且这些行也是懒惰地迭代;无论文件大小如何,这都会将内存需求降低到很小的固定数量(峰值内存成本将取决于最长行的长度) .

    Update:

    看起来 StreamingBody 没有实现 io.BufferedIOBase ABC . 它确实有its own documented API,它可以用于类似的目的 . 如果你不能让 TextIOWrapper 为你做的工作(如果可以使它更有效和简单),另一种方法是:

    file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
    for line in file_content:
    

    与使用 TextIOWrapper 不同,它不会受益于块的批量解码(每条线都是单独解码的),但除此之外,它还应该在减少内存使用方面实现相同的好处 .

相关问题